From a81d316635ad373e701b4cdeeb761d80da15ede5 Mon Sep 17 00:00:00 2001 From: Jose134 Date: Tue, 13 Aug 2024 19:41:53 +0200 Subject: [PATCH] Structured the project in different files --- api/handler/searchvideo.go | 67 ++++++++ api/handler/streamvideo.go | 74 +++++++++ api/middleware/cors.go | 16 ++ cmd/video_server_backend/main.go | 180 ++-------------------- go.mod | 3 +- internal/bufferedsocket/bufferedsocket.go | 59 +++++++ 6 files changed, 229 insertions(+), 170 deletions(-) create mode 100644 api/handler/searchvideo.go create mode 100644 api/handler/streamvideo.go create mode 100644 api/middleware/cors.go create mode 100644 internal/bufferedsocket/bufferedsocket.go diff --git a/api/handler/searchvideo.go b/api/handler/searchvideo.go new file mode 100644 index 0000000..6cc7b10 --- /dev/null +++ b/api/handler/searchvideo.go @@ -0,0 +1,67 @@ +package handler + +import ( + "encoding/json" + "log" + "net/http" + "os" + "path/filepath" + "strings" + + "video_server_backend/internal/bufferedsocket" + + "github.com/gorilla/websocket" +) + +const WEBSOCKET_WRITE_BUFFER_SIZE = 4096 +const VIDEOS_DIR = "./videos" + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: WEBSOCKET_WRITE_BUFFER_SIZE, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type Video struct { + Filename string `json:"filename"` +} + +func SearchVideoHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query().Get("q") + if query == "" { + http.Error(w, "Query is required.", http.StatusBadRequest) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + http.Error(w, "Could not upgrade to websocket.", http.StatusInternalServerError) + return + } + socket := bufferedsocket.NewBufferedSocket(conn, WEBSOCKET_WRITE_BUFFER_SIZE) + defer socket.Close() + + err = filepath.Walk(VIDEOS_DIR, func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, ".mp4") && strings.Contains(path, query) { + jsonMsg, err := json.Marshal(Video{Filename: path}) + if err != nil { + log.Println(err) + } + jsonMsg = append(jsonMsg, '\n') + + socket.WriteMessage(jsonMsg) + } + return nil + }) + + if err != nil { + log.Println(err) + } + + err = socket.Flush() + if err != nil { + log.Println(err) + } +} diff --git a/api/handler/streamvideo.go b/api/handler/streamvideo.go new file mode 100644 index 0000000..8a420e3 --- /dev/null +++ b/api/handler/streamvideo.go @@ -0,0 +1,74 @@ +package handler + +import ( + "errors" + "fmt" + "net/http" + "os" + "strconv" + "strings" +) + +func StreamVideoHandler(w http.ResponseWriter, r *http.Request) { + videoPath := r.URL.Query().Get("v") + videoFile, err := os.Open(videoPath) + if err != nil { + http.Error(w, "Video file not found.", http.StatusNotFound) + return + } + defer videoFile.Close() + + fileInfo, err := videoFile.Stat() + if err != nil { + http.Error(w, "Could not obtain file info.", http.StatusInternalServerError) + return + } + + fileSize := fileInfo.Size() + rangeHeader := r.Header.Get("Range") + if rangeHeader == "" { + http.ServeFile(w, r, videoPath) + return + } + + start, end, err := getStartEndRange(rangeHeader, fileSize) + if err != nil || start > end || end >= fileSize { + http.Error(w, "Invalid range.", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "video/mp4") + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize)) + w.Header().Set("Accept-Ranges", "bytes") + w.WriteHeader(http.StatusPartialContent) + + videoFile.Seek(start, 0) + buf := make([]byte, end-start+1) + videoFile.Read(buf) + w.Write(buf) +} + +func getStartEndRange(rangeHeader string, fileSize int64) (int64, int64, error) { + rangeParts := strings.Split(rangeHeader, "=") + if len(rangeParts) != 2 || rangeParts[0] != "bytes" { + return 0, 0, errors.New("invalid range header") + } + + rangeSpec := strings.Split(rangeParts[1], "-") + start, err := strconv.ParseInt(rangeSpec[0], 10, 64) + if err != nil { + return 0, 0, errors.New("invalid range start") + } + + var end int64 + if len(rangeSpec) == 2 && rangeSpec[1] != "" { + end, err = strconv.ParseInt(rangeSpec[1], 10, 64) + if err != nil { + return 0, 0, errors.New("invalid range end") + } + } else { + end = fileSize - 1 + } + + return start, end, nil +} diff --git a/api/middleware/cors.go b/api/middleware/cors.go new file mode 100644 index 0000000..1636a12 --- /dev/null +++ b/api/middleware/cors.go @@ -0,0 +1,16 @@ +package middleware + +import "net/http" + +func AddCORSHeaders(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + next.ServeHTTP(w, r) + }) +} diff --git a/cmd/video_server_backend/main.go b/cmd/video_server_backend/main.go index 644ac08..134d4c4 100644 --- a/cmd/video_server_backend/main.go +++ b/cmd/video_server_backend/main.go @@ -1,186 +1,30 @@ package main import ( - "encoding/json" - "fmt" "log" "net/http" - "os" - "path/filepath" - "strconv" - "strings" - "github.com/gorilla/websocket" + "video_server_backend/api/handler" + "video_server_backend/api/middleware" ) -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 4096, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -const VIDEOS_DIR = "./videos" - -type Video struct { - Filename string `json:"filename"` -} - -func notFound(w http.ResponseWriter, r *http.Request) { +func notFoundHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "Endpoint not found", http.StatusNotFound) } -func connectSearchSocket(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query().Get("q") - if query == "" { - http.Error(w, "Query is required.", http.StatusBadRequest) - return - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - fmt.Println(err) - http.Error(w, "Could not upgrade to websocket.", http.StatusInternalServerError) - return - } - - defer func() { - err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - if err != nil { - log.Println("Error during websocket close: ", err) - } - conn.Close() - }() - - var msg = []byte{} - err = filepath.Walk(VIDEOS_DIR, func(path string, info os.FileInfo, err error) error { - fmt.Println("checking " + path) - if strings.HasSuffix(path, ".mp4") && strings.Contains(path, query) { - fmt.Println("found video " + path) - jsonMsg, err := json.Marshal(Video{Filename: path}) - if err != nil { - log.Println(err) - } - jsonMsg = append(jsonMsg, []byte("\n")...) - - if len(msg)+len(jsonMsg) > 4096 { - fmt.Println("sending message") - err = conn.WriteMessage(websocket.TextMessage, msg) - if err != nil { - log.Println(err) - return err - } - msg = jsonMsg - } else { - msg = append(msg, jsonMsg...) - } - } - return nil - }) - - if err != nil { - log.Println(err) - } - - // Send remaining messages - if len(msg) > 0 { - err = conn.WriteMessage(websocket.TextMessage, msg) - if err != nil { - log.Println(err) - } - } - - fmt.Println("done") - -} - -func streamVideo(w http.ResponseWriter, r *http.Request) { - videoPath := r.URL.Query().Get("v") - videoFile, err := os.Open(videoPath) - if err != nil { - http.Error(w, "Video file not found.", http.StatusNotFound) - return - } - defer videoFile.Close() - - fileInfo, err := videoFile.Stat() - if err != nil { - http.Error(w, "Could not obtain file info.", http.StatusInternalServerError) - return - } - - fileSize := fileInfo.Size() - rangeHeader := r.Header.Get("Range") - if rangeHeader == "" { - http.ServeFile(w, r, videoPath) - return - } - - start, end, err := getStartEndRange(rangeHeader, fileSize) - if err != nil || start > end || end >= fileSize { - http.Error(w, "Invalid range.", http.StatusBadRequest) - return - } - - w.Header().Set("Content-Type", "video/mp4") - w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize)) - w.Header().Set("Accept-Ranges", "bytes") - w.WriteHeader(http.StatusPartialContent) - - videoFile.Seek(start, 0) - buf := make([]byte, end-start+1) - videoFile.Read(buf) - w.Write(buf) -} - -func getStartEndRange(rangeHeader string, fileSize int64) (int64, int64, error) { - rangeParts := strings.Split(rangeHeader, "=") - if len(rangeParts) != 2 || rangeParts[0] != "bytes" { - return 0, 0, fmt.Errorf("invalid range header") - } - - rangeSpec := strings.Split(rangeParts[1], "-") - start, err := strconv.ParseInt(rangeSpec[0], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("invalid range start") - } - - var end int64 - if len(rangeSpec) == 2 && rangeSpec[1] != "" { - end, err = strconv.ParseInt(rangeSpec[1], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("invalid range end") - } - } else { - end = fileSize - 1 - } - - return start, end, nil -} - -func addCORSHeaders(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - next.ServeHTTP(w, r) - }) -} - func createMux() *http.ServeMux { var mux *http.ServeMux = http.NewServeMux() - mux.HandleFunc("/video", streamVideo) - mux.HandleFunc("/search", connectSearchSocket) - mux.HandleFunc("/", notFound) + mux.HandleFunc("/video", handler.StreamVideoHandler) + mux.HandleFunc("/search", handler.SearchVideoHandler) + mux.HandleFunc("/", notFoundHandler) return mux } -func main() { - log.Fatal(http.ListenAndServe(":8080", addCORSHeaders(createMux()))) +func createHandler() http.Handler { + return middleware.AddCORSHeaders(createMux()) +} + +func main() { + log.Fatal(http.ListenAndServe(":8080", createHandler())) } diff --git a/go.mod b/go.mod index 4cef24d..5f90b26 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,5 @@ module video_server_backend go 1.22.6 require ( - github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect + github.com/gorilla/websocket v1.5.3 // direct ) diff --git a/internal/bufferedsocket/bufferedsocket.go b/internal/bufferedsocket/bufferedsocket.go new file mode 100644 index 0000000..634838c --- /dev/null +++ b/internal/bufferedsocket/bufferedsocket.go @@ -0,0 +1,59 @@ +package bufferedsocket + +import ( + "errors" + + "github.com/gorilla/websocket" +) + +type BufferedSocket struct { + conn *websocket.Conn + bufferSize int + buffer []byte + bufferLen int +} + +func NewBufferedSocket(conn *websocket.Conn, bufferSize int) *BufferedSocket { + return &BufferedSocket{ + conn: conn, + bufferSize: bufferSize, + buffer: make([]byte, 0, bufferSize), + } +} + +func (bs *BufferedSocket) WriteMessage(message []byte) error { + if len(message) > bs.bufferSize { + return errors.New("message larger than buffer size") + } + + if bs.bufferLen+len(message) > bs.bufferSize { + err := bs.Flush() + if err != nil { + return err + } + } + bs.buffer = append(bs.buffer, message...) + bs.bufferLen += len(message) + return nil +} + +func (bs *BufferedSocket) Flush() error { + if bs.bufferLen > 0 { + err := bs.conn.WriteMessage(websocket.TextMessage, bs.buffer[:bs.bufferLen]) + if err != nil { + return err + } + bs.buffer = make([]byte, 0, bs.bufferSize) + bs.bufferLen = 0 + } + return nil +} + +func (bs *BufferedSocket) Close() error { + err := bs.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + return err + } + bs.conn.Close() + return nil +}