Skip to content

Commit 5ff53b8

Browse files
authored
Merge pull request #8 from jellyfish-dev/bump_compositor
Update compositor deps
2 parents 39e8550 + e84f298 commit 5ff53b8

File tree

6 files changed

+134
-136
lines changed

6 files changed

+134
-136
lines changed

.circleci/config.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ orbs:
55
executors:
66
docker-executor:
77
docker:
8-
- image: ghcr.io/jellyfish-dev/recording-converter-test:0.0.2
8+
- image: ghcr.io/jellyfish-dev/recording-converter-test:0.0.3
99
environment:
1010
MIX_ENV: test
1111
jobs:

lib/compositor_utils.ex

+37-70
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
defmodule RecordingConverter.Compositor do
22
@moduledoc false
33

4+
alias Membrane.LiveCompositor.Request
5+
46
@text_margin 10
57
@letter_width 12
68
@output_width 1280
@@ -37,60 +39,40 @@ defmodule RecordingConverter.Compositor do
3739
@spec video_output_id() :: String.t()
3840
def video_output_id(), do: @video_output_id
3941

40-
@spec generate_output_update(map(), number()) :: [tuple()]
42+
@spec generate_output_update(map(), number()) :: [struct()]
4143
def generate_output_update(tracks, timestamp),
4244
do: [
4345
generate_video_output_update(tracks, timestamp),
4446
generate_audio_output_update(tracks, timestamp)
4547
]
4648

47-
@spec schedule_unregister_video_output(number()) :: {:lc_request, map()}
49+
@spec schedule_unregister_video_output(number()) :: Request.UnregisterOutput.t() | struct()
4850
def schedule_unregister_video_output(schedule_time_ns),
49-
do: {
50-
:lc_request,
51-
%{
52-
type: :unregister,
53-
entity_type: :output_stream,
54-
output_id: @video_output_id,
55-
schedule_time_ms: from_ns_to_ms(schedule_time_ns)
56-
}
51+
do: %Request.UnregisterOutput{
52+
output_id: @video_output_id,
53+
schedule_time: Membrane.Time.nanoseconds(schedule_time_ns)
5754
}
5855

59-
@spec schedule_unregister_audio_output(number()) :: {:lc_request, map()}
56+
@spec schedule_unregister_audio_output(number()) :: Request.UnregisterOutput.t() | struct()
6057
def schedule_unregister_audio_output(schedule_time_ns),
61-
do: {
62-
:lc_request,
63-
%{
64-
type: :unregister,
65-
entity_type: :output_stream,
66-
output_id: @audio_output_id,
67-
schedule_time_ms: from_ns_to_ms(schedule_time_ns)
68-
}
58+
do: %Request.UnregisterOutput{
59+
output_id: @audio_output_id,
60+
schedule_time: Membrane.Time.nanoseconds(schedule_time_ns)
6961
}
7062

71-
@spec schedule_unregister_input(number(), binary()) :: {:lc_request, map()}
63+
@spec schedule_unregister_input(number(), binary()) :: Request.UnregisterInput.t() | struct()
7264
def schedule_unregister_input(schedule_time_ns, input_id),
73-
do: {
74-
:lc_request,
75-
%{
76-
type: :unregister,
77-
entity_type: :input_stream,
78-
input_id: input_id,
79-
schedule_time_ms: from_ns_to_ms(schedule_time_ns)
80-
}
65+
do: %Request.UnregisterInput{
66+
input_id: input_id,
67+
schedule_time: Membrane.Time.nanoseconds(schedule_time_ns)
8168
}
8269

83-
@spec register_image_action(String.t()) :: {:lc_request, map()}
70+
@spec register_image_action(String.t()) :: Request.RegisterImage.t() | struct()
8471
def register_image_action(image_url) do
85-
{
86-
:lc_request,
87-
%{
88-
type: "register",
89-
entity_type: "image",
90-
asset_type: "png",
91-
image_id: "avatar_png",
92-
url: image_url
93-
}
72+
%Request.RegisterImage{
73+
asset_type: "png",
74+
image_id: "avatar_png",
75+
url: image_url
9476
}
9577
end
9678

@@ -105,29 +87,19 @@ defmodule RecordingConverter.Compositor do
10587
avatars_config = Enum.map(avatar_tracks, &avatar_view/1)
10688
video_tracks_config = Enum.map(video_tracks, &video_input_source_view/1)
10789

108-
{
109-
:lc_request,
110-
%{
111-
type: :update_output,
112-
output_id: @video_output_id,
113-
schedule_time_ms: from_ns_to_ms(timestamp),
114-
video: scene(video_tracks_config ++ avatars_config)
115-
}
90+
%Request.UpdateVideoOutput{
91+
output_id: @video_output_id,
92+
schedule_time: Membrane.Time.nanoseconds(timestamp),
93+
root: scene(video_tracks_config ++ avatars_config)
11694
}
11795
end
11896

11997
defp generate_audio_output_update(%{"audio" => audio_tracks}, timestamp)
12098
when is_list(audio_tracks) do
121-
{
122-
:lc_request,
123-
%{
124-
type: :update_output,
125-
output_id: @audio_output_id,
126-
audio: %{
127-
inputs: Enum.map(audio_tracks, &%{input_id: &1.id})
128-
},
129-
schedule_time_ms: from_ns_to_ms(timestamp)
130-
}
99+
%Request.UpdateAudioOutput{
100+
output_id: @audio_output_id,
101+
inputs: Enum.map(audio_tracks, &%{input_id: &1.id}),
102+
schedule_time: Membrane.Time.nanoseconds(timestamp)
131103
}
132104
end
133105

@@ -139,7 +111,7 @@ defmodule RecordingConverter.Compositor do
139111
# TODO: fix after compositor update
140112
# unnecessary rescaler
141113
%{
142-
type: "rescaler",
114+
type: :rescaler,
143115
mode: "fit",
144116
child: %{
145117
type: :input_stream,
@@ -152,16 +124,16 @@ defmodule RecordingConverter.Compositor do
152124

153125
defp avatar_view(track) do
154126
%{
155-
type: "view",
127+
type: :view,
156128
children:
157129
[
158130
# TODO: fix after compositor update
159131
# unnecessary rescaler
160132
%{
161-
type: "rescaler",
133+
type: :rescaler,
162134
mode: "fit",
163135
child: %{
164-
type: "image",
136+
type: :image,
165137
image_id: "avatar_png"
166138
}
167139
}
@@ -174,31 +146,26 @@ defmodule RecordingConverter.Compositor do
174146

175147
[
176148
%{
177-
type: "view",
149+
type: :view,
178150
bottom: 20,
179151
right: 20,
180152
width: label_width,
181153
height: 20,
182154
background_color_rgba: "#000000FF",
183155
children: [
156+
%{type: :view},
184157
%{
185-
type: "text",
158+
type: :text,
186159
text: label,
187160
align: "center",
188161
width: label_width,
189162
font_size: 20.0
190-
}
163+
},
164+
%{type: :view}
191165
]
192166
}
193167
]
194168
end
195169

196170
defp text_view(_metadata), do: []
197-
198-
defp from_ns_to_ms(timestamp_ns) do
199-
rounded_ts =
200-
timestamp_ns |> Membrane.Time.nanoseconds() |> Membrane.Time.as_milliseconds(:round)
201-
202-
max(0, rounded_ts - 10)
203-
end
204171
end

lib/pipeline.ex

+60-49
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule RecordingConverter.Pipeline do
66

77
alias Membrane.AWS.S3.Source
88
alias Membrane.HTTPAdaptiveStream.{SinkBin, Storages}
9-
alias Membrane.Time
9+
alias Membrane.LiveCompositor
1010
alias RecordingConverter.{Compositor, ReportParser}
1111

1212
@segment_duration 3
@@ -17,7 +17,12 @@ defmodule RecordingConverter.Pipeline do
1717

1818
@impl true
1919
def handle_init(_ctx, opts) do
20-
output_directory = opts.output_directory
20+
{[], opts}
21+
end
22+
23+
@impl true
24+
def handle_setup(_ctx, state) do
25+
output_directory = state.output_directory
2126

2227
with {:ok, files} when files != [] <- File.ls(output_directory) do
2328
Logger.warning(
@@ -29,19 +34,6 @@ defmodule RecordingConverter.Pipeline do
2934

3035
File.mkdir_p!(output_directory)
3136

32-
main_spec =
33-
[
34-
generate_sink_bin(output_directory),
35-
generate_output_audio_branch(opts),
36-
generate_output_video_branch(opts)
37-
]
38-
|> Enum.reject(&is_nil(&1))
39-
40-
{[spec: main_spec], opts}
41-
end
42-
43-
@impl true
44-
def handle_setup(_ctx, state) do
4537
report_path = s3_file_path(@report_file, state)
4638

4739
tracks = ReportParser.get_tracks(state.bucket_name, report_path)
@@ -52,15 +44,21 @@ defmodule RecordingConverter.Pipeline do
5244

5345
tracks_spec = Enum.map(tracks, &create_branch(&1, state))
5446

55-
track_actions =
56-
tracks
57-
|> ReportParser.get_all_track_actions()
58-
|> Enum.map(&notify_compositor/1)
47+
track_actions = ReportParser.get_all_track_actions(tracks)
5948

60-
register_image_action =
61-
state.image_url |> Compositor.register_image_action() |> notify_compositor()
49+
register_image_action = Compositor.register_image_action(state.image_url)
6250

63-
actions = [{:spec, tracks_spec}, register_image_action | track_actions]
51+
all_compositor_actions = [register_image_action | track_actions]
52+
53+
main_spec =
54+
[
55+
generate_sink_bin(output_directory),
56+
generate_output_audio_branch(state, all_compositor_actions),
57+
generate_output_video_branch(state)
58+
]
59+
|> Enum.reject(&is_nil(&1))
60+
61+
actions = [{:spec, main_spec ++ tracks_spec}]
6462

6563
{actions,
6664
%{
@@ -90,24 +88,39 @@ defmodule RecordingConverter.Pipeline do
9088

9189
@impl true
9290
def handle_child_notification(
93-
{:lc_request_response, req, %Req.Response{status: response_code, body: response_body},
94-
_lc_ctx},
95-
_child,
91+
{:request_result, _req, {:error, error}},
92+
:video_compositor,
9693
_membrane_ctx,
9794
state
9895
) do
99-
if response_code != 200 do
100-
raise """
101-
Request failed.
102-
Request: `#{inspect(req)}.
103-
Response code: #{response_code}.
104-
Response body: #{inspect(response_body)}.
105-
"""
106-
end
96+
raise """
97+
Request failed with error: #{inspect(error)}
98+
"""
10799

108100
{[], state}
109101
end
110102

103+
@impl true
104+
def handle_child_notification(
105+
{atom, _pad, _lc_ctx},
106+
:video_compositor,
107+
_membrane_ctx,
108+
state
109+
)
110+
when atom in [:input_delivered, :input_playing, :input_eos] do
111+
{[], state}
112+
end
113+
114+
@impl true
115+
def handle_child_notification(:start_of_stream, :hls_sink_bin, _ctx, state) do
116+
{[], state}
117+
end
118+
119+
@impl true
120+
def handle_child_notification({:track_playable, _track}, :hls_sink_bin, _ctx, state) do
121+
{[], state}
122+
end
123+
111124
@impl true
112125
def handle_child_notification(:end_of_stream, :hls_sink_bin, _ctx, state) do
113126
{[terminate: :normal], state}
@@ -143,14 +156,12 @@ defmodule RecordingConverter.Pipeline do
143156
get_child(:video_compositor)
144157
|> via_out(Pad.ref(:video_output, Compositor.video_output_id()),
145158
options: [
146-
encoder_preset: :slow,
147159
width: @output_width,
148160
height: @output_height,
149-
initial:
150-
Compositor.scene([
151-
%{type: :input_stream, input_id: "video_input_0", id: "child_0"}
152-
]),
153-
send_eos_when: :all_inputs
161+
encoder: %LiveCompositor.Encoder.FFmpegH264{preset: :slow},
162+
initial: %{
163+
root: Compositor.scene([])
164+
}
154165
]
155166
)
156167
|> child(:output_video_parser, %Membrane.H264.Parser{
@@ -160,27 +171,27 @@ defmodule RecordingConverter.Pipeline do
160171
|> via_in(Pad.ref(:input, :video),
161172
options: [
162173
encoding: :H264,
163-
segment_duration: Time.seconds(@segment_duration)
174+
segment_duration: Membrane.Time.seconds(@segment_duration)
164175
]
165176
)
166177
|> get_child(:hls_sink_bin)
167178
end
168179

169-
defp generate_output_audio_branch(state) do
180+
defp generate_output_audio_branch(state, compositor_actions) do
170181
child(:video_compositor, %Membrane.LiveCompositor{
171182
framerate: {30, 1},
172183
composing_strategy: :offline_processing,
173-
server_setup: Compositor.server_setup(state.compositor_path)
184+
server_setup: Compositor.server_setup(state.compositor_path),
185+
init_requests: compositor_actions
174186
})
175187
|> via_out(Pad.ref(:audio_output, Compositor.audio_output_id()),
176188
options: [
177-
channels: :stereo,
178-
initial: %{
179-
inputs: [
180-
%{input_id: "audio_input_0", volume: 0.2}
181-
]
189+
encoder: %LiveCompositor.Encoder.Opus{
190+
channels: :stereo
182191
},
183-
send_eos_when: :all_inputs
192+
initial: %{
193+
inputs: []
194+
}
184195
]
185196
)
186197
|> child(:opus_output_parser, Membrane.Opus.Parser)
@@ -190,7 +201,7 @@ defmodule RecordingConverter.Pipeline do
190201
|> via_in(Pad.ref(:input, :audio),
191202
options: [
192203
encoding: :AAC,
193-
segment_duration: Time.seconds(@segment_duration)
204+
segment_duration: Membrane.Time.seconds(@segment_duration)
194205
]
195206
)
196207
|> get_child(:hls_sink_bin)

0 commit comments

Comments
 (0)