Skip to content

Commit 6f202ce

Browse files
committed
WIP
1 parent 2d8b67e commit 6f202ce

File tree

7 files changed

+795
-80
lines changed

7 files changed

+795
-80
lines changed

lib/ex_webrtc/recorder.ex

Lines changed: 202 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,68 @@ defmodule ExWebRTC.Recorder do
22
@moduledoc """
33
Saves received RTP packets to a file for later processing/analysis.
44
5-
Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
5+
Dumps raw RTP packets fed to it in a custom format. Use `ExWebRTC.Recorder.Converter` to process them.
6+
7+
Can optionally upload the saved files to S3-compatible storage.
8+
See `ExWebRTC.Recorder.S3` and `t:options/0` for more info.
69
"""
710

811
use GenServer
912

1013
alias ExWebRTC.MediaStreamTrack
14+
alias __MODULE__.S3.UploadHandler
15+
alias __MODULE__.S3
1116

1217
require Logger
1318

1419
@default_base_dir "./recordings"
1520

21+
@type recorder :: GenServer.server()
22+
23+
@typep track_manifest :: %{
24+
start_time: DateTime.t(),
25+
kind: :video | :audio,
26+
streams: [MediaStreamTrack.stream_id()],
27+
rid_map: %{MediaStreamTrack.rid() => integer()},
28+
location: String.t()
29+
}
30+
31+
# XXX really opaque?
32+
@typedoc """
33+
Contains metadata about the recordings.
34+
WRITEME
35+
"""
36+
@opaque manifest :: %{MediaStreamTrack.id() => track_manifest()}
37+
1638
@typedoc """
1739
Options that can be passed to `start_link/1`.
1840
19-
* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
20-
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
21-
It should return the initial list of tracks to be added.
41+
* `:base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
42+
* `:on_start` - Callback that will be executed just after the Recorder is (re)started.
43+
It should return the initial list of tracks to be added.
44+
* `:controlling_process` - PID of a process where all messages will be sent. `self()` by default.
45+
* `:s3_upload_config` - If passed, finished recordings will be uploaded to S3-compatible storage.
46+
See `t:ExWebRTC.Recorder.S3.upload_config/0` for more info.
2247
"""
2348
@type option ::
2449
{:base_dir, String.t()}
2550
| {:on_start, (-> [MediaStreamTrack.t()])}
51+
| {:controlling_process, Process.dest()}
52+
| {:s3_upload_config, S3.upload_config()}
2653

2754
@type options :: [option()]
2855

56+
@typedoc """
57+
Messages sent by the `ExWebRTC.Recorder` process to its controlling process.
58+
59+
* `:upload_complete`, `:upload_failed` - Sent after the completion of the upload task, identified by its reference.
60+
Contains the updated manifest with `s3://` schema URLs to uploaded files.
61+
"""
62+
@type message ::
63+
{:ex_webrtc_recorder, pid(),
64+
{:upload_complete, S3.upload_task_ref(), manifest()}
65+
| {:upload_failed, S3.upload_task_ref(), manifest()}}
66+
2967
# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
3068
@doc false
3169
@spec child_spec(list()) :: Supervisor.child_spec()
@@ -44,7 +82,11 @@ defmodule ExWebRTC.Recorder do
4482
"""
4583
@spec start(options(), GenServer.options()) :: GenServer.on_start()
4684
def start(recorder_opts \\ [], gen_server_opts \\ []) do
47-
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
85+
config =
86+
recorder_opts
87+
|> Keyword.put_new(:controlling_process, self())
88+
89+
GenServer.start(__MODULE__, config, gen_server_opts)
4890
end
4991

5092
@doc """
@@ -54,13 +96,20 @@ defmodule ExWebRTC.Recorder do
5496
"""
5597
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
5698
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
57-
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
99+
config =
100+
recorder_opts
101+
|> Keyword.put_new(:controlling_process, self())
102+
103+
GenServer.start_link(__MODULE__, config, gen_server_opts)
58104
end
59105

60106
@doc """
61107
Adds new tracks to the recording.
108+
109+
Returns the part of the recording manifest that's relevant to the freshly added tracks.
110+
See `t:manifest/0` for more info.
62111
"""
63-
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
112+
@spec add_tracks(recorder(), [MediaStreamTrack.t()]) :: {:ok, manifest()}
64113
def add_tracks(recorder, tracks) do
65114
GenServer.call(recorder, {:add_tracks, tracks})
66115
end
@@ -69,7 +118,7 @@ defmodule ExWebRTC.Recorder do
69118
Records a received packet on the given track.
70119
"""
71120
@spec record(
72-
GenServer.server(),
121+
recorder(),
73122
MediaStreamTrack.id(),
74123
MediaStreamTrack.rid() | nil,
75124
ExRTP.Packet.t()
@@ -79,6 +128,37 @@ defmodule ExWebRTC.Recorder do
79128
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
80129
end
81130

131+
@doc """
132+
Changes the controlling process of this `recorder` process.
133+
134+
Controlling process is a process that receives all of the messages (described
135+
by `t:message/0`) from this Recorder.
136+
"""
137+
@spec controlling_process(recorder(), Process.dest()) :: :ok
138+
def controlling_process(recorder, controlling_process) do
139+
GenServer.call(recorder, {:controlling_process, controlling_process})
140+
end
141+
142+
@doc """
143+
Finishes the recording for the given tracks and optionally uploads the result files.
144+
145+
Returns the part of the recording manifest that's relevant to the freshly ended tracks.
146+
See `t:manifest/0` for more info.
147+
148+
If uploads are configured:
149+
* Returns the reference to the upload task that was spawned.
150+
* Will send the `:upload_complete`/`:upload_failed` message with this reference
151+
to the controlling process when the task finishes.
152+
153+
Note that the manifest returned by this function always contains local paths to files.
154+
The updated manifest with `s3://` schema URLs is sent in the aforementioned message.
155+
"""
156+
@spec end_tracks(recorder(), [MediaStreamTrack.id()]) ::
157+
{:ok, manifest(), S3.upload_task_ref() | nil} | {:error, :tracks_not_found}
158+
def end_tracks(recorder, track_ids) do
159+
GenServer.call(recorder, {:end_tracks, track_ids})
160+
end
161+
82162
@impl true
83163
def init(config) do
84164
base_dir =
@@ -89,9 +169,18 @@ defmodule ExWebRTC.Recorder do
89169
:ok = File.mkdir_p!(base_dir)
90170
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")
91171

172+
upload_handler =
173+
if config[:s3_upload_config] do
174+
Logger.info("Recordings will be uploaded to S3")
175+
UploadHandler.new(config[:s3_upload_config])
176+
end
177+
92178
state = %{
179+
owner: config[:controlling_process],
93180
base_dir: base_dir,
94-
tracks: %{}
181+
manifest_path: Path.join(base_dir, "manifest.json"),
182+
track_data: %{},
183+
upload_handler: upload_handler
95184
}
96185

97186
case config[:on_start] do
@@ -110,30 +199,53 @@ defmodule ExWebRTC.Recorder do
110199
{:noreply, state}
111200

112201
tracks ->
113-
state = do_add_tracks(tracks, state)
202+
{_manifest_diff, state} = do_add_tracks(tracks, state)
114203
{:noreply, state}
115204
end
116205
end
117206

118207
@impl true
119-
def handle_call({:add_tracks, tracks}, _from, state) do
120-
state = do_add_tracks(tracks, state)
208+
def handle_call({:controlling_process, controlling_process}, _from, state) do
209+
state = %{state | owner: controlling_process}
121210
{:reply, :ok, state}
122211
end
123212

124213
@impl true
125-
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
126-
when is_map_key(state.tracks, track_id) do
127-
%{file: file, rid_map: rid_map} = state.tracks[track_id]
214+
def handle_call({:add_tracks, tracks}, _from, state) do
215+
{manifest_diff, state} = do_add_tracks(tracks, state)
216+
{:reply, {:ok, manifest_diff}, state}
217+
end
218+
219+
@impl true
220+
def handle_call({:end_tracks, track_ids}, _from, state) do
221+
case Enum.filter(track_ids, &Map.has_key?(state.track_data, &1)) do
222+
[] ->
223+
{:reply, {:error, :tracks_not_found}, state}
128224

129-
case rid_map do
130-
%{^rid => rid_idx} ->
131-
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
225+
track_ids ->
226+
{manifest_diff, ref, state} = do_end_tracks(track_ids, state)
227+
{:reply, {:ok, manifest_diff, ref}, state}
228+
end
229+
end
132230

133-
_other ->
231+
@impl true
232+
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
233+
when is_map_key(state.track_data, track_id) do
234+
%{file: file, rid_map: rid_map} = state.track_data[track_id]
235+
236+
with {:ok, rid_idx} <- Map.fetch(rid_map, rid),
237+
false <- is_nil(file) do
238+
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
239+
else
240+
:error ->
134241
Logger.warning("""
135242
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
136243
""")
244+
245+
true ->
246+
Logger.warning("""
247+
Tried to save packet for track which has been ended. Ignoring. Track id: #{inspect(track_id)} \
248+
""")
137249
end
138250

139251
{:noreply, state}
@@ -148,6 +260,29 @@ defmodule ExWebRTC.Recorder do
148260
{:noreply, state}
149261
end
150262

263+
@impl true
264+
def handle_info({ref, _res} = task_result, state) when is_reference(ref) do
265+
if state.upload_handler do
266+
{result, manifest, handler} =
267+
UploadHandler.process_result(state.upload_handler, task_result)
268+
269+
case result do
270+
:ok ->
271+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_complete, ref, manifest}})
272+
273+
{:error, :upload_failed} ->
274+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_failed, ref, manifest}})
275+
276+
{:error, :unknown_task} ->
277+
raise "Upload handler encountered result of unknown task"
278+
end
279+
280+
{:noreply, %{state | upload_handler: handler}}
281+
else
282+
{:noreply, state}
283+
end
284+
end
285+
151286
@impl true
152287
def handle_info(_msg, state) do
153288
{:noreply, state}
@@ -156,28 +291,61 @@ defmodule ExWebRTC.Recorder do
156291
defp do_add_tracks(tracks, state) do
157292
start_time = DateTime.utc_now()
158293

159-
tracks =
294+
new_track_data =
160295
Map.new(tracks, fn track ->
161-
path = Path.join(state.base_dir, "#{track.id}.rtpx")
162-
file = File.open!(path, [:write])
163-
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()
164-
165-
{track.id,
166-
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
296+
file_path = Path.join(state.base_dir, "#{track.id}.rtpx")
297+
298+
track_entry = %{
299+
start_time: start_time,
300+
kind: track.kind,
301+
streams: track.streams,
302+
rid_map: (track.rids || [nil]) |> Enum.with_index() |> Map.new(),
303+
location: file_path,
304+
file: File.open!(file_path, [:write])
305+
}
306+
307+
{track.id, track_entry}
167308
end)
168309

169-
state = %{state | tracks: Map.merge(state.tracks, tracks)}
170-
report_path = Path.join(state.base_dir, "report.json")
310+
manifest_diff = to_manifest(new_track_data)
311+
312+
state = %{state | track_data: Map.merge(state.track_data, new_track_data)}
313+
314+
:ok = File.write!(state.manifest_path, state.track_data |> to_manifest() |> Jason.encode!())
171315

172-
report =
173-
Map.new(state.tracks, fn {id, track} ->
174-
track = Map.delete(track, :file)
175-
{id, track}
316+
{manifest_diff, state}
317+
end
318+
319+
defp do_end_tracks(track_ids, state) do
320+
state =
321+
Enum.reduce(track_ids, state, fn track_id, state ->
322+
%{file: file} = state.track_data[track_id]
323+
File.close(file)
324+
325+
put_in(state, [:track_data, track_id, :file], nil)
176326
end)
177327

178-
:ok = File.write!(report_path, Jason.encode!(report))
328+
manifest_diff = to_manifest(state.track_data, track_ids)
329+
330+
case state.upload_handler do
331+
nil ->
332+
{manifest_diff, nil, state}
333+
334+
handler ->
335+
{ref, handler} = UploadHandler.spawn_task(handler, manifest_diff)
336+
337+
{manifest_diff, ref, %{state | upload_handler: handler}}
338+
end
339+
end
340+
341+
defp to_manifest(track_data, track_ids) do
342+
track_data |> Map.take(track_ids) |> to_manifest()
343+
end
179344

180-
%{state | tracks: tracks}
345+
defp to_manifest(track_data) do
346+
Map.new(track_data, fn {id, track} ->
347+
{id, Map.delete(track, :file)}
348+
end)
181349
end
182350

183351
defp serialize_packet(packet, rid_idx, recv_time) do

0 commit comments

Comments
 (0)