Skip to content

Commit feb38bc

Browse files
committed
Recorder enhancements
1 parent 7f1a127 commit feb38bc

File tree

10 files changed

+915
-92
lines changed

10 files changed

+915
-92
lines changed

lib/ex_webrtc/recorder.ex

Lines changed: 191 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,52 @@ defmodule ExWebRTC.Recorder do
22
@moduledoc """
33
Saves received RTP packets to a file for later processing/analysis.
44
5-
Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
6-
"""
5+
Dumps raw RTP packets fed to it in a custom format. Use `ExWebRTC.Recorder.Converter` to process them.
76
8-
use GenServer
7+
Can optionally upload the saved files to S3-compatible storage.
8+
See `ExWebRTC.Recorder.S3` and `t:options/0` for more info.
9+
"""
910

1011
alias ExWebRTC.MediaStreamTrack
12+
alias __MODULE__.S3
1113

1214
require Logger
1315

16+
use GenServer
17+
1418
@default_base_dir "./recordings"
1519

20+
@type recorder :: GenServer.server()
21+
1622
@typedoc """
1723
Options that can be passed to `start_link/1`.
1824
19-
* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
20-
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
21-
It should return the initial list of tracks to be added.
25+
* `:base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
26+
* `:on_start` - Callback that will be executed just after the Recorder is (re)started.
27+
It should return the initial list of tracks to be added.
28+
* `:controlling_process` - PID of a process where all messages will be sent. `self()` by default.
29+
* `:s3_upload_config` - If passed, finished recordings will be uploaded to S3-compatible storage.
30+
See `t:ExWebRTC.Recorder.S3.upload_config/0` for more info.
2231
"""
2332
@type option ::
2433
{:base_dir, String.t()}
2534
| {:on_start, (-> [MediaStreamTrack.t()])}
35+
| {:controlling_process, Process.dest()}
36+
| {:s3_upload_config, S3.upload_config()}
2637

2738
@type options :: [option()]
2839

40+
@typedoc """
41+
Messages sent by the `ExWebRTC.Recorder` process to its controlling process.
42+
43+
* `:upload_complete`, `:upload_failed` - Sent after the completion of the upload task, identified by its reference.
44+
Contains the updated manifest with `s3://` scheme URLs to uploaded files.
45+
"""
46+
@type message ::
47+
{:ex_webrtc_recorder, pid(),
48+
{:upload_complete, S3.upload_task_ref(), __MODULE__.Manifest.t()}
49+
| {:upload_failed, S3.upload_task_ref(), __MODULE__.Manifest.t()}}
50+
2951
# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
3052
@doc false
3153
@spec child_spec(list()) :: Supervisor.child_spec()
@@ -44,7 +66,11 @@ defmodule ExWebRTC.Recorder do
4466
"""
4567
@spec start(options(), GenServer.options()) :: GenServer.on_start()
4668
def start(recorder_opts \\ [], gen_server_opts \\ []) do
47-
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
69+
config =
70+
recorder_opts
71+
|> Keyword.put_new(:controlling_process, self())
72+
73+
GenServer.start(__MODULE__, config, gen_server_opts)
4874
end
4975

5076
@doc """
@@ -54,13 +80,20 @@ defmodule ExWebRTC.Recorder do
5480
"""
5581
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
5682
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
57-
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
83+
config =
84+
recorder_opts
85+
|> Keyword.put_new(:controlling_process, self())
86+
87+
GenServer.start_link(__MODULE__, config, gen_server_opts)
5888
end
5989

6090
@doc """
6191
Adds new tracks to the recording.
92+
93+
Returns the part of the recording manifest that's relevant to the freshly added tracks.
94+
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.
6295
"""
63-
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
96+
@spec add_tracks(recorder(), [MediaStreamTrack.t()]) :: {:ok, __MODULE__.Manifest.t()}
6497
def add_tracks(recorder, tracks) do
6598
GenServer.call(recorder, {:add_tracks, tracks})
6699
end
@@ -69,7 +102,7 @@ defmodule ExWebRTC.Recorder do
69102
Records a received packet on the given track.
70103
"""
71104
@spec record(
72-
GenServer.server(),
105+
recorder(),
73106
MediaStreamTrack.id(),
74107
MediaStreamTrack.rid() | nil,
75108
ExRTP.Packet.t()
@@ -79,6 +112,37 @@ defmodule ExWebRTC.Recorder do
79112
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
80113
end
81114

115+
@doc """
116+
Changes the controlling process of this `recorder` process.
117+
118+
Controlling process is a process that receives all of the messages (described
119+
by `t:message/0`) from this Recorder.
120+
"""
121+
@spec controlling_process(recorder(), Process.dest()) :: :ok
122+
def controlling_process(recorder, controlling_process) do
123+
GenServer.call(recorder, {:controlling_process, controlling_process})
124+
end
125+
126+
@doc """
127+
Finishes the recording for the given tracks and optionally uploads the result files.
128+
129+
Returns the part of the recording manifest that's relevant to the freshly ended tracks.
130+
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.
131+
132+
If uploads are configured:
133+
* Returns the reference to the upload task that was spawned.
134+
* Will send the `:upload_complete`/`:upload_failed` message with this reference
135+
to the controlling process when the task finishes.
136+
137+
Note that the manifest returned by this function always contains local paths to files.
138+
The updated manifest with `s3://` scheme URLs is sent in the aforementioned message.
139+
"""
140+
@spec end_tracks(recorder(), [MediaStreamTrack.id()]) ::
141+
{:ok, __MODULE__.Manifest.t(), S3.upload_task_ref() | nil} | {:error, :tracks_not_found}
142+
def end_tracks(recorder, track_ids) do
143+
GenServer.call(recorder, {:end_tracks, track_ids})
144+
end
145+
82146
@impl true
83147
def init(config) do
84148
base_dir =
@@ -89,9 +153,18 @@ defmodule ExWebRTC.Recorder do
89153
:ok = File.mkdir_p!(base_dir)
90154
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")
91155

156+
upload_handler =
157+
if config[:s3_upload_config] do
158+
Logger.info("Recordings will be uploaded to S3")
159+
S3.UploadHandler.new(config[:s3_upload_config])
160+
end
161+
92162
state = %{
163+
owner: config[:controlling_process],
93164
base_dir: base_dir,
94-
tracks: %{}
165+
manifest_path: Path.join(base_dir, "manifest.json"),
166+
track_data: %{},
167+
upload_handler: upload_handler
95168
}
96169

97170
case config[:on_start] do
@@ -110,30 +183,53 @@ defmodule ExWebRTC.Recorder do
110183
{:noreply, state}
111184

112185
tracks ->
113-
state = do_add_tracks(tracks, state)
186+
{_manifest_diff, state} = do_add_tracks(tracks, state)
114187
{:noreply, state}
115188
end
116189
end
117190

118191
@impl true
119-
def handle_call({:add_tracks, tracks}, _from, state) do
120-
state = do_add_tracks(tracks, state)
192+
def handle_call({:controlling_process, controlling_process}, _from, state) do
193+
state = %{state | owner: controlling_process}
121194
{:reply, :ok, state}
122195
end
123196

124197
@impl true
125-
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
126-
when is_map_key(state.tracks, track_id) do
127-
%{file: file, rid_map: rid_map} = state.tracks[track_id]
198+
def handle_call({:add_tracks, tracks}, _from, state) do
199+
{manifest_diff, state} = do_add_tracks(tracks, state)
200+
{:reply, {:ok, manifest_diff}, state}
201+
end
202+
203+
@impl true
204+
def handle_call({:end_tracks, track_ids}, _from, state) do
205+
case Enum.filter(track_ids, &Map.has_key?(state.track_data, &1)) do
206+
[] ->
207+
{:reply, {:error, :tracks_not_found}, state}
128208

129-
case rid_map do
130-
%{^rid => rid_idx} ->
131-
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
209+
track_ids ->
210+
{manifest_diff, ref, state} = do_end_tracks(track_ids, state)
211+
{:reply, {:ok, manifest_diff, ref}, state}
212+
end
213+
end
132214

133-
_other ->
215+
@impl true
216+
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
217+
when is_map_key(state.track_data, track_id) do
218+
%{file: file, rid_map: rid_map} = state.track_data[track_id]
219+
220+
with {:ok, rid_idx} <- Map.fetch(rid_map, rid),
221+
false <- is_nil(file) do
222+
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
223+
else
224+
:error ->
134225
Logger.warning("""
135226
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
136227
""")
228+
229+
true ->
230+
Logger.warning("""
231+
Tried to save packet for track which has been ended. Ignoring. Track id: #{inspect(track_id)} \
232+
""")
137233
end
138234

139235
{:noreply, state}
@@ -148,6 +244,29 @@ defmodule ExWebRTC.Recorder do
148244
{:noreply, state}
149245
end
150246

247+
@impl true
248+
def handle_info({ref, _res} = task_result, state) when is_reference(ref) do
249+
if state.upload_handler do
250+
{result, manifest, handler} =
251+
S3.UploadHandler.process_result(state.upload_handler, task_result)
252+
253+
case result do
254+
:ok ->
255+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_complete, ref, manifest}})
256+
257+
{:error, :upload_failed} ->
258+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_failed, ref, manifest}})
259+
260+
{:error, :unknown_task} ->
261+
raise "Upload handler encountered result of unknown task"
262+
end
263+
264+
{:noreply, %{state | upload_handler: handler}}
265+
else
266+
{:noreply, state}
267+
end
268+
end
269+
151270
@impl true
152271
def handle_info(_msg, state) do
153272
{:noreply, state}
@@ -156,28 +275,64 @@ defmodule ExWebRTC.Recorder do
156275
defp do_add_tracks(tracks, state) do
157276
start_time = DateTime.utc_now()
158277

159-
tracks =
278+
new_track_data =
160279
Map.new(tracks, fn track ->
161-
path = Path.join(state.base_dir, "#{track.id}.rtpx")
162-
file = File.open!(path, [:write])
163-
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()
164-
165-
{track.id,
166-
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
280+
file_path = Path.join(state.base_dir, "#{track.id}.rtpx")
281+
282+
track_entry = %{
283+
start_time: start_time,
284+
kind: track.kind,
285+
streams: track.streams,
286+
rid_map: (track.rids || [nil]) |> Enum.with_index() |> Map.new(),
287+
location: file_path,
288+
file: File.open!(file_path, [:write])
289+
}
290+
291+
{track.id, track_entry}
167292
end)
168293

169-
state = %{state | tracks: Map.merge(state.tracks, tracks)}
170-
report_path = Path.join(state.base_dir, "report.json")
294+
manifest_diff = to_manifest(new_track_data)
295+
296+
state = %{state | track_data: Map.merge(state.track_data, new_track_data)}
297+
298+
:ok = File.write!(state.manifest_path, state.track_data |> to_manifest() |> Jason.encode!())
171299

172-
report =
173-
Map.new(state.tracks, fn {id, track} ->
174-
track = Map.delete(track, :file)
175-
{id, track}
300+
{manifest_diff, state}
301+
end
302+
303+
defp do_end_tracks(track_ids, state) do
304+
# We're keeping entries from `track_data` for ended tracks
305+
# because they need to be present in the global manifest,
306+
# which gets recreated on each call to `add_tracks`
307+
state =
308+
Enum.reduce(track_ids, state, fn track_id, state ->
309+
%{file: file} = state.track_data[track_id]
310+
File.close(file)
311+
312+
put_in(state, [:track_data, track_id, :file], nil)
176313
end)
177314

178-
:ok = File.write!(report_path, Jason.encode!(report))
315+
manifest_diff = to_manifest(state.track_data, track_ids)
316+
317+
case state.upload_handler do
318+
nil ->
319+
{manifest_diff, nil, state}
320+
321+
handler ->
322+
{ref, handler} = S3.UploadHandler.spawn_task(handler, manifest_diff)
323+
324+
{manifest_diff, ref, %{state | upload_handler: handler}}
325+
end
326+
end
327+
328+
defp to_manifest(track_data, track_ids) do
329+
track_data |> Map.take(track_ids) |> to_manifest()
330+
end
179331

180-
state
332+
defp to_manifest(track_data) do
333+
Map.new(track_data, fn {id, track} ->
334+
{id, Map.delete(track, :file)}
335+
end)
181336
end
182337

183338
defp serialize_packet(packet, rid_idx, recv_time) do

0 commit comments

Comments
 (0)