diff --git a/README.md b/README.md index 261932e2f34..4af5397b1fc 100644 --- a/README.md +++ b/README.md @@ -5,14 +5,15 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/aler9/rtsp-simple-server)](https://goreportcard.com/report/github.com/aler9/rtsp-simple-server) [![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server) -_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish and read live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming method, then relays the publisher streams to the readers. +_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish and read live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both readers and publishers and relays the publisher streams to the readers. Features: * Read and publish live streams with UDP and TCP * Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MP3, AAC, Opus, PCM) -* Publish multiple streams at once, each in a separate path, that can be read by multiple users +* Serve multiple streams at once, each in a separate path, that can be read by multiple users * Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy) -* Provide separate authentication for reading and publishing +* Redirect to other RTSP servers (load balancing) +* Authenticate readers and publishers separately * Run custom commands when clients connect, disconnect, read or publish streams * Compatible with Linux, Windows and Mac, does not require any dependency or interpreter, it's a single executable diff --git a/client/client.go b/client/client.go index b90b7c2b0f0..9ddc0e43212 100644 --- a/client/client.go +++ b/client/client.go @@ -39,8 +39,9 @@ type streamTrack struct { } type describeData struct { - sdp []byte - err error + sdp []byte + redirect string + err error } type state int @@ -889,6 +890,17 @@ func (c *Client) runWaitingDescribe() bool { return true } + if res.redirect != "" { + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusMovedPermanently, + Header: base.Header{ + "CSeq": c.describeCSeq, + "Location": base.HeaderValue{res.redirect}, + }, + }) + return true + } + c.conn.WriteResponse(&base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1309,6 +1321,6 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by } } -func (c *Client) OnPathDescribeData(sdp []byte, err error) { - c.describeData <- describeData{sdp, err} +func (c *Client) OnPathDescribeData(sdp []byte, redirect string, err error) { + c.describeData <- describeData{sdp, redirect, err} } diff --git a/conf/pathconf.go b/conf/pathconf.go index d82ea9a9c1a..b284f3ffbb4 100644 --- a/conf/pathconf.go +++ b/conf/pathconf.go @@ -16,6 +16,7 @@ type PathConf struct { SourceProtocol string `yaml:"sourceProtocol"` SourceProtocolParsed gortsplib.StreamProtocol `yaml:"-" json:"-"` SourceOnDemand bool `yaml:"sourceOnDemand"` + SourceRedirect string `yaml:"sourceRedirect"` RunOnInit string `yaml:"runOnInit"` RunOnDemand string `yaml:"runOnDemand"` RunOnPublish string `yaml:"runOnPublish"` @@ -64,6 +65,7 @@ func (pconf *PathConf) fillAndCheck(name string) error { if err != nil { return fmt.Errorf("'%s' is not a valid url", pconf.Source) } + if u.User != nil { pass, _ := u.User.Password() user := u.User.Username() @@ -76,6 +78,7 @@ func (pconf *PathConf) fillAndCheck(name string) error { if pconf.SourceProtocol == "" { pconf.SourceProtocol = "udp" } + switch pconf.SourceProtocol { case "udp": pconf.SourceProtocolParsed = gortsplib.StreamProtocolUDP @@ -96,6 +99,7 @@ func (pconf *PathConf) fillAndCheck(name string) error { if err != nil { return fmt.Errorf("'%s' is not a valid url", pconf.Source) } + if u.User != nil { pass, _ := u.User.Password() user := u.User.Username() @@ -107,15 +111,25 @@ func (pconf *PathConf) fillAndCheck(name string) error { } else if pconf.Source == "record" { + } else if pconf.Source == "redirect" { + } else { return fmt.Errorf("unsupported source: '%s'", pconf.Source) } + if pconf.SourceRedirect != "" { + _, err := url.Parse(pconf.SourceRedirect) + if err != nil { + return fmt.Errorf("'%s' is not a valid url", pconf.SourceRedirect) + } + } + if pconf.PublishUser != "" { if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) { return fmt.Errorf("publish username must be alphanumeric") } } + if pconf.PublishPass != "" { if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishPass) { return fmt.Errorf("publish password must be alphanumeric") diff --git a/main_test.go b/main_test.go index fa32c1077fe..22a099007b6 100644 --- a/main_test.go +++ b/main_test.go @@ -233,7 +233,7 @@ func TestPublish(t *testing.T) { switch conf.publishSoft { case "ffmpeg": - cnt1, err := newContainer("ffmpeg", "publish", []string{ + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "/emptyvideo.ts", @@ -256,7 +256,7 @@ func TestPublish(t *testing.T) { time.Sleep(1 * time.Second) - cnt2, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", "-i", "rtsp://" + ownDockerIp + ":8554/teststream", "-vframes", "1", @@ -289,7 +289,7 @@ func TestRead(t *testing.T) { time.Sleep(1 * time.Second) - cnt1, err := newContainer("ffmpeg", "publish", []string{ + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "/emptyvideo.ts", @@ -305,7 +305,7 @@ func TestRead(t *testing.T) { switch conf.readSoft { case "ffmpeg": - cnt2, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", conf.readProto, "-i", "rtsp://" + ownDockerIp + ":8554/teststream", "-vframes", "1", @@ -337,14 +337,13 @@ func TestRead(t *testing.T) { } func TestTCPOnly(t *testing.T) { - conf := "protocols: [tcp]\n" - p, err := testProgram(conf) + p, err := testProgram("protocols: [tcp]\n") require.NoError(t, err) defer p.close() time.Sleep(1 * time.Second) - cnt1, err := newContainer("ffmpeg", "publish", []string{ + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "/emptyvideo.ts", @@ -356,7 +355,7 @@ func TestTCPOnly(t *testing.T) { require.NoError(t, err) defer cnt1.close() - cnt2, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "tcp", "-i", "rtsp://" + ownDockerIp + ":8554/teststream", "-vframes", "1", @@ -377,7 +376,7 @@ func TestPathWithSlash(t *testing.T) { time.Sleep(1 * time.Second) - cnt1, err := newContainer("ffmpeg", "publish", []string{ + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "/emptyvideo.ts", @@ -389,7 +388,7 @@ func TestPathWithSlash(t *testing.T) { require.NoError(t, err) defer cnt1.close() - cnt2, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", "-i", "rtsp://" + ownDockerIp + ":8554/test/stream", "-vframes", "1", @@ -410,7 +409,7 @@ func TestPathWithQuery(t *testing.T) { time.Sleep(1 * time.Second) - cnt1, err := newContainer("ffmpeg", "publish", []string{ + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "/emptyvideo.ts", @@ -422,7 +421,7 @@ func TestPathWithQuery(t *testing.T) { require.NoError(t, err) defer cnt1.close() - cnt2, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", "-i", "rtsp://" + ownDockerIp + ":8554/test?param3=otherval", "-vframes", "1", @@ -438,12 +437,11 @@ func TestPathWithQuery(t *testing.T) { func TestAuth(t *testing.T) { t.Run("publish", func(t *testing.T) { - conf := "paths:\n" + + p, err := testProgram("paths:\n" + " all:\n" + " publishUser: testuser\n" + " publishPass: testpass\n" + - " publishIps: [172.17.0.0/16]\n" - p, err := testProgram(conf) + " publishIps: [172.17.0.0/16]\n") require.NoError(t, err) defer p.close() @@ -482,12 +480,11 @@ func TestAuth(t *testing.T) { "vlc", } { t.Run("read_"+soft, func(t *testing.T) { - conf := "paths:\n" + + p, err := testProgram("paths:\n" + " all:\n" + " readUser: testuser\n" + " readPass: testpass\n" + - " readIps: [172.17.0.0/16]\n" - p, err := testProgram(conf) + " readIps: [172.17.0.0/16]\n") require.NoError(t, err) defer p.close() @@ -540,11 +537,10 @@ func TestSourceRtsp(t *testing.T) { "tcp", } { t.Run(proto, func(t *testing.T) { - conf := "paths:\n" + + p1, err := testProgram("paths:\n" + " all:\n" + " readUser: testuser\n" + - " readPass: testpass\n" - p1, err := testProgram(conf) + " readPass: testpass\n") require.NoError(t, err) defer p1.close() @@ -564,7 +560,7 @@ func TestSourceRtsp(t *testing.T) { time.Sleep(1 * time.Second) - conf = "rtspPort: 8555\n" + + p2, err := testProgram("rtspPort: 8555\n" + "rtpPort: 8100\n" + "rtcpPort: 8101\n" + "\n" + @@ -572,8 +568,7 @@ func TestSourceRtsp(t *testing.T) { " proxied:\n" + " source: rtsp://testuser:testpass@localhost:8554/teststream\n" + " sourceProtocol: " + proto + "\n" + - " sourceOnDemand: yes\n" - p2, err := testProgram(conf) + " sourceOnDemand: yes\n") require.NoError(t, err) defer p2.close() @@ -615,11 +610,10 @@ func TestSourceRtmp(t *testing.T) { time.Sleep(1 * time.Second) - conf := "paths:\n" + + p, err := testProgram("paths:\n" + " proxied:\n" + " source: rtmp://" + cnt1.ip() + "/stream/test\n" + - " sourceOnDemand: yes\n" - p, err := testProgram(conf) + " sourceOnDemand: yes\n") require.NoError(t, err) defer p.close() @@ -639,11 +633,49 @@ func TestSourceRtmp(t *testing.T) { require.Equal(t, 0, code) } +func TestRedirect(t *testing.T) { + p1, err := testProgram("paths:\n" + + " path1:\n" + + " source: redirect\n" + + " sourceRedirect: rtsp://" + ownDockerIp + ":8554/path2\n" + + " path2:\n") + require.NoError(t, err) + defer p1.close() + + time.Sleep(1 * time.Second) + + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "udp", + "rtsp://" + ownDockerIp + ":8554/path2", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "dest", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://" + ownDockerIp + ":8554/path1", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt2.close() + + code := cnt2.wait() + require.Equal(t, 0, code) +} + func TestRunOnDemand(t *testing.T) { - conf := "paths:\n" + + p1, err := testProgram("paths:\n" + " all:\n" + - " runOnDemand: ffmpeg -hide_banner -loglevel error -re -i testimages/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH\n" - p1, err := testProgram(conf) + " runOnDemand: ffmpeg -hide_banner -loglevel error -re -i testimages/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH\n") require.NoError(t, err) defer p1.close() diff --git a/path/path.go b/path/path.go index e27959db1e1..da97809b8f0 100644 --- a/path/path.go +++ b/path/path.go @@ -31,12 +31,18 @@ type Parent interface { OnPathClientClose(*client.Client) } -// a source is either a client.Client, a sourcertsp.Source or a sourcertmp.Source +// a source can be +// * client.Client +// * sourcertsp.Source +// * sourcertmp.Source +// * sourceRedirect type source interface { IsSource() } -// a sourceExternal is either a sourcertsp.Source or a sourcertmp.Source +// a sourceExternal can be +// * sourcertsp.Source +// * sourcertmp.Source type sourceExternal interface { IsSource() Close() @@ -44,6 +50,10 @@ type sourceExternal interface { SetRunning(bool) } +type sourceRedirect struct{} + +func (*sourceRedirect) IsSource() {} + type ClientDescribeRes struct { Path client.Path Err error @@ -205,8 +215,10 @@ func (pa *Path) run() { pa.Log("starting source") } - pa.source = sourcertmp.New(pa.conf.Source, state, - pa.stats, pa) + pa.source = sourcertmp.New(pa.conf.Source, state, pa.stats, pa) + + } else if pa.conf.Source == "redirect" { + pa.source = &sourceRedirect{} } if pa.conf.RunOnInit != "" { @@ -434,7 +446,7 @@ func (pa *Path) onCheck() bool { for c, state := range pa.clients { if state != clientStatePreRemove && state == clientStateWaitingDescribe { pa.clients[c] = clientStatePreRemove - c.OnPathDescribeData(nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)) + c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)) } } } @@ -477,7 +489,7 @@ func (pa *Path) onSourceSetReady() { for c, state := range pa.clients { if state == clientStateWaitingDescribe { pa.clients[c] = clientStatePreRemove - c.OnPathDescribeData(pa.sourceSdp, nil) + c.OnPathDescribeData(pa.sourceSdp, "", nil) } } } @@ -497,7 +509,7 @@ func (pa *Path) onSourceSetNotReady() { func (pa *Path) onClientDescribe(c *client.Client) { pa.lastDescribeReq = time.Now() - // publisher not found + // source not found if pa.source == nil { // on demand command is available: put the client on hold if pa.conf.RunOnDemand != "" { @@ -519,10 +531,17 @@ func (pa *Path) onClientDescribe(c *client.Client) { pa.clients[c] = clientStatePreRemove pa.clientsWg.Add(1) - c.OnPathDescribeData(nil, fmt.Errorf("no one is publishing to path '%s'", pa.name)) + c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name)) } - // publisher was found but is not ready: put the client on hold + // source found and is redirect + } else if _, ok := pa.source.(*sourceRedirect); ok { + pa.clients[c] = clientStatePreRemove + pa.clientsWg.Add(1) + + c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil) + + // source was found but is not ready: put the client on hold } else if !pa.sourceReady { // start source if needed if source, ok := pa.source.(sourceExternal); ok { @@ -536,12 +555,12 @@ func (pa *Path) onClientDescribe(c *client.Client) { pa.clients[c] = clientStateWaitingDescribe pa.clientsWg.Add(1) - // publisher was found and is ready + // source was found and is ready } else { pa.clients[c] = clientStatePreRemove pa.clientsWg.Add(1) - c.OnPathDescribeData(pa.sourceSdp, nil) + c.OnPathDescribeData(pa.sourceSdp, "", nil) } } diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 8bd9e2ae91a..97d42687673 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -1,39 +1,41 @@ -# supported stream protocols (the handshake is always performed with TCP) +# supported stream protocols (the handshake is always performed with TCP). protocols: [udp, tcp] -# port of the TCP RTSP listener +# port of the TCP RTSP listener. rtspPort: 8554 -# port of the UDP RTP listener (used only if udp is in protocols) +# port of the UDP RTP listener (used only if udp is in protocols). rtpPort: 8000 -# port of the UDP RTCP listener (used only if udp is in protocols) +# port of the UDP RTCP listener (used only if udp is in protocols). rtcpPort: 8001 -# timeout of read operations +# timeout of read operations. readTimeout: 10s -# timeout of write operations +# timeout of write operations. writeTimeout: 5s -# supported authentication methods (both are insecure, use RTSP inside a VPN to enforce security) +# supported authentication methods (both are insecure, use RTSP inside a VPN +# to enforce security). authMethods: [basic, digest] # command to run when a client connects. # this is terminated with SIGINT when a client disconnects. runOnConnect: -# enable Prometheus-compatible metrics on port 9998 +# enable Prometheus-compatible metrics on port 9998. metrics: no -# enable pprof on port 9999 to monitor performances +# enable pprof on port 9999 to monitor performances. pprof: no -# destinations of log messages; available options are 'stdout', 'file' and 'syslog' + +# destinations of log messages; available options are "stdout", "file" and "syslog". logDestinations: [stdout] -# if 'file' is in logDestinations, this is the file that will receive the logs +# if "file" is in logDestinations, this is the file that will receive the logs. logFile: rtsp-simple-server.log # these settings are path-dependent. -# It's possible to use regular expressions by using a tilde as prefix. -# for instance, '~^(test1|test2)$' will match both 'test1' and 'test2'. -# for instance, '~^prefix' will match all paths that start with 'prefix'. -# The settings under the path 'all' are applied to all paths that do not match +# it's possible to use regular expressions by using a tilde as prefix. +# for example, "~^(test1|test2)$" will match both "test1" and "test2". +# for example, "~^prefix" will match all paths that start with "prefix". +# the settings under the path "all" are applied to all paths that do not match # another entry. paths: all: @@ -41,45 +43,52 @@ paths: # * record -> the stream is provided by a RTSP client # * rtsp://existing-url -> the stream is pulled from another RTSP server # * rtmp://existing-url -> the stream is pulled from a RTMP server + # * redirect -> the stream is provided by another path or server source: record - # if the source is an RTSP url, this is the protocol that will be used to pull the stream + + # if the source is an RTSP url, this is the protocol that will be used to + # pull the stream. sourceProtocol: udp - # if the source is an RTSP or RTMP url, it will be pulled only when at least one reader - # is connected, saving bandwidth + # if the source is an RTSP or RTMP url, it will be pulled only when at least + # one reader is connected, saving bandwidth. sourceOnDemand: no + # if the source is "redirect", this is the RTSP url which clients will be + # redirected to. + sourceRedirect: + # command to run when this path is loaded by the program. # this can be used, for example, to publish a stream and keep it always opened. - # This is terminated with SIGINT when the program closes. - # The path name is available in the RTSP_SERVER_PATH variable + # this is terminated with SIGINT when the program closes. + # the path name is available in the RTSP_SERVER_PATH variable. runOnInit: # command to run when this path is requested. - # This can be used, for example, to publish a stream on demand. - # This is terminated with SIGINT when the path is not requested anymore. - # The path name is available in the RTSP_SERVER_PATH variable + # this can be used, for example, to publish a stream on demand. + # this is terminated with SIGINT when the path is not requested anymore. + # the path name is available in the RTSP_SERVER_PATH variable. runOnDemand: # command to run when a client starts publishing. - # This is terminated with SIGINT when a client stops publishing. - # The path name is available in the RTSP_SERVER_PATH variable + # this is terminated with SIGINT when a client stops publishing. + # the path name is available in the RTSP_SERVER_PATH variable. runOnPublish: # command to run when a clients starts reading. - # This is terminated with SIGINT when a client stops reading. - # The path name is available in the RTSP_SERVER_PATH variable + # this is terminated with SIGINT when a client stops reading. + # the path name is available in the RTSP_SERVER_PATH variable. runOnRead: - # username required to publish + # username required to publish. publishUser: - # password required to publish + # password required to publish. publishPass: - # IPs or networks (x.x.x.x/24) allowed to publish + # ips or networks (x.x.x.x/24) allowed to publish. publishIps: [] - # username required to read + # username required to read. readUser: - # password required to read + # password required to read. readPass: - # IPs or networks (x.x.x.x/24) allowed to read + # ips or networks (x.x.x.x/24) allowed to read. readIps: []