diff --git a/lib/membrane/hls/cmaf_sink.ex b/lib/membrane/hls/cmaf_sink.ex index 2ffd2d4..a169865 100644 --- a/lib/membrane/hls/cmaf_sink.ex +++ b/lib/membrane/hls/cmaf_sink.ex @@ -8,7 +8,7 @@ defmodule Membrane.HLS.CMAFSink do ) def_options( - packager_pid: [ + packager: [ spec: pid(), description: "PID of the packager." ], @@ -17,8 +17,7 @@ defmodule Membrane.HLS.CMAFSink do description: "ID of the track." ], build_stream: [ - spec: - (URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), + spec: (Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), description: "Build the stream with the given stream format" ], target_segment_duration: [ @@ -28,7 +27,7 @@ defmodule Membrane.HLS.CMAFSink do @impl true def handle_init(_context, opts) do - {[], %{opts: opts, upload_tasks: %{}}} + {[], %{opts: opts}} end def handle_stream_format(:input, format, _ctx, state) do @@ -38,94 +37,31 @@ defmodule Membrane.HLS.CMAFSink do Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact) |> Ratio.ceil() - Agent.update( - state.opts.packager_pid, - fn packager -> - packager = - if Packager.has_track?(packager, track_id) do - # Packager.discontinue_track(packager, track_id) - packager - else - uri = Packager.new_variant_uri(packager, track_id) - - Packager.add_track( - packager, - track_id, - codecs: Membrane.HLS.serialize_codecs(format.codecs), - stream: state.opts.build_stream.(uri, format), - segment_extension: ".m4s", - target_segment_duration: target_segment_duration - ) - end - - Packager.put_init_section(packager, track_id, format.header) - end, - :infinity - ) + if Packager.has_track?(state.opts.packager, track_id) do + # TODO: Render this configurable + # Packager.discontinue_track(packager, track_id) + else + Packager.add_track( + state.opts.packager, + track_id, + codecs: Membrane.HLS.serialize_codecs(format.codecs), + stream: state.opts.build_stream.(format), + segment_extension: ".m4s", + target_segment_duration: target_segment_duration + ) + end {[], state} end def handle_buffer(:input, buffer, _ctx, state) do - {job_ref, upload_fun} = - Agent.get_and_update( - state.opts.packager_pid, - fn packager -> - {packager, {ref, upload_fun}} = - Packager.put_segment_async( - packager, - state.opts.track_id, - buffer.payload, - Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() - ) - - {{ref, upload_fun}, packager} - end, - :infinity - ) - - task = Task.async(upload_fun) - - {[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})} - end - - def handle_info({task_ref, :ok}, _ctx, state) do - Process.demonitor(task_ref, [:flush]) - - {data, state} = pop_in(state, [:upload_tasks, task_ref]) - - Agent.update( - state.opts.packager_pid, - fn packager -> - Packager.ack_segment(packager, state.opts.track_id, data.job_ref) - end, - :infinity + Packager.put_segment( + state.opts.packager, + state.opts.track_id, + buffer.payload, + Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() ) {[], state} end - - def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do - raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}." - {[], state} - end - - def handle_end_of_stream(:input, _ctx, state) do - state.upload_tasks - |> Map.values() - |> Enum.map(& &1.task) - |> Task.await_many(:infinity) - - Agent.update( - state.opts.packager_pid, - fn packager -> - Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager -> - Packager.ack_segment(packager, state.opts.track_id, data.job_ref) - end) - end, - :infinity - ) - - {[], %{state | upload_tasks: %{}}} - end end diff --git a/lib/membrane/hls/sink_bin.ex b/lib/membrane/hls/sink_bin.ex index 61de99c..cae0b5f 100644 --- a/lib/membrane/hls/sink_bin.ex +++ b/lib/membrane/hls/sink_bin.ex @@ -9,10 +9,10 @@ defmodule Membrane.HLS.SinkBin do require Membrane.Logger def_options( - packager_pid: [ + packager: [ spec: pid(), description: """ - PID of a `HLS.Packager` which must be wrapped in an Agent (for now). + PID of a `HLS.Packager`. """ ], target_segment_duration: [ @@ -52,7 +52,7 @@ defmodule Membrane.HLS.SinkBin do ], build_stream: [ spec: - (URI.t(), Membrane.CMAF.Track.t() -> + (Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), description: "Build either a `HLS.VariantStream` or a `HLS.AlternativeRendition`." ], @@ -112,7 +112,7 @@ defmodule Membrane.HLS.SinkBin do }) |> via_out(Pad.ref(:output), options: [tracks: [track_id]]) |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ - packager_pid: state.opts.packager_pid, + packager: state.opts.packager, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -133,7 +133,7 @@ defmodule Membrane.HLS.SinkBin do segment_min_duration: pad_opts.segment_duration }) |> child({:sink, track_id}, %Membrane.HLS.CMAFSink{ - packager_pid: state.opts.packager_pid, + packager: state.opts.packager, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -159,7 +159,7 @@ defmodule Membrane.HLS.SinkBin do ] }) |> child({:sink, track_id}, %Membrane.HLS.WebVTTSink{ - packager_pid: state.opts.packager_pid, + packager: state.opts.packager, track_id: track_id, target_segment_duration: state.opts.target_segment_duration, build_stream: pad_opts.build_stream @@ -178,7 +178,7 @@ defmodule Membrane.HLS.SinkBin do |> put_in([:live_state], %{stop: true}) |> put_in([:ended_sinks], ended_sinks) - if state.flush, do: Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity) + if state.flush, do: Packager.flush(state.opts.packager) {[notify_parent: :end_of_stream], state} else @@ -193,7 +193,7 @@ defmodule Membrane.HLS.SinkBin do @impl true def handle_parent_notification(:flush, ctx, state) do if not state.flush and all_streams_ended?(ctx, state.ended_sinks) do - Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity) + Packager.flush(state.opts.packager) {[notify_parent: :end_of_stream], %{state | flush: true}} else {[], %{state | flush: true}} @@ -210,13 +210,7 @@ defmodule Membrane.HLS.SinkBin do "Packager: syncing playlists up to #{state.live_state.next_sync_point}s" ) - Agent.update( - state.opts.packager_pid, - fn p -> - Packager.sync(p, state.live_state.next_sync_point) - end, - :infinity - ) + Packager.sync(state.opts.packager, state.live_state.next_sync_point) {[], live_schedule_next_sync(state)} end @@ -246,13 +240,9 @@ defmodule Membrane.HLS.SinkBin do defp live_init_state(state) do # Tells where in the playlist we should start issuing segments. next_sync_point = - Agent.get( - state.opts.packager_pid, - &Packager.next_sync_point( - &1, - Membrane.Time.as_seconds(state.opts.target_segment_duration, :round) - ), - :infinity + Packager.next_sync_point( + state.opts.packager, + Membrane.Time.as_seconds(state.opts.target_segment_duration, :round) ) {:live, safety_delay} = state.opts.mode diff --git a/lib/membrane/hls/webvtt_sink.ex b/lib/membrane/hls/webvtt_sink.ex index 932cfa4..4fdec4e 100644 --- a/lib/membrane/hls/webvtt_sink.ex +++ b/lib/membrane/hls/webvtt_sink.ex @@ -8,7 +8,7 @@ defmodule Membrane.HLS.WebVTTSink do ) def_options( - packager_pid: [ + packager: [ spec: pid(), description: "PID of the packager." ], @@ -17,8 +17,7 @@ defmodule Membrane.HLS.WebVTTSink do description: "ID of the track." ], build_stream: [ - spec: - (URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), + spec: (Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()), description: "Build the stream with the given stream format" ], target_segment_duration: [ @@ -38,90 +37,30 @@ defmodule Membrane.HLS.WebVTTSink do Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact) |> Ratio.ceil() - Agent.update( - state.opts.packager_pid, - fn packager -> - if Packager.has_track?(packager, track_id) do - # Packager.discontinue_track(packager, track_id) - packager - else - uri = Packager.new_variant_uri(packager, track_id) - - Packager.add_track( - packager, - track_id, - stream: state.opts.build_stream.(uri, format), - segment_extension: ".vtt", - target_segment_duration: target_segment_duration - ) - end - end, - :infinity - ) + if Packager.has_track?(state.opts.packager, track_id) do + # TODO: Render this configurable + # Packager.discontinue_track(packager, track_id) + else + Packager.add_track( + state.opts.packager, + track_id, + stream: state.opts.build_stream.(format), + segment_extension: ".vtt", + target_segment_duration: target_segment_duration + ) + end {[], state} end def handle_buffer(:input, buffer, _ctx, state) do - {job_ref, upload_fun} = - Agent.get_and_update( - state.opts.packager_pid, - fn packager -> - {packager, {ref, upload_fun}} = - Packager.put_segment_async( - packager, - state.opts.track_id, - buffer.payload, - Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() - ) - - {{ref, upload_fun}, packager} - end, - :infinity - ) - - task = Task.async(upload_fun) - - {[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})} - end - - def handle_info({task_ref, :ok}, _ctx, state) do - Process.demonitor(task_ref, [:flush]) - - {data, state} = pop_in(state, [:upload_tasks, task_ref]) - - Agent.update( - state.opts.packager_pid, - fn packager -> - Packager.ack_segment(packager, state.opts.track_id, data.job_ref) - end, - :infinity + Packager.put_segment( + state.opts.packager, + state.opts.track_id, + buffer.payload, + Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float() ) {[], state} end - - def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do - raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}." - {[], state} - end - - def handle_end_of_stream(:input, _ctx, state) do - state.upload_tasks - |> Map.values() - |> Enum.map(& &1.task) - |> Task.await_many(:infinity) - - Agent.update( - state.opts.packager_pid, - fn packager -> - Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager -> - Packager.ack_segment(packager, state.opts.track_id, data.job_ref) - end) - end, - :infinity - ) - - {[], %{state | upload_tasks: %{}}} - end end diff --git a/mix.lock b/mix.lock index d639e3e..f4be10b 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,7 @@ "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, - "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "3afc81306caaebeda30294bd14456abfffcd954c", []}, + "kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "c06f4de542da94fc2079dbefb7e13ae584ebe13c", []}, "kim_q": {:hex, :kim_q, "1.0.0", "17cfc45e9f7e65485f0f31bbf09893d6ff35cc2fbefc39aed146a3c29740584e", [:mix], [{:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7a8ee76a2c2e774c34345df3c7a234a8effeedc3f3aea845feb7c09030097278"}, "kim_subtitle": {:git, "https://github.com/kim-company/kim_subtitle.git", "8239e1bcea938167829a6b8bd2a9678c63c7bdd4", []}, "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, diff --git a/test/membrane/hls/sink_bin_test.exs b/test/membrane/hls/sink_bin_test.exs index 93e05b8..f98ae9f 100644 --- a/test/membrane/hls/sink_bin_test.exs +++ b/test/membrane/hls/sink_bin_test.exs @@ -6,19 +6,17 @@ defmodule Membrane.HLS.SinkBinTest do @tag :tmp_dir test "on a new stream", %{tmp_dir: tmp_dir} do - {:ok, packager_pid} = - Agent.start_link(fn -> - HLS.Packager.new( - manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), - storage: HLS.Storage.File.new(), - resume_finished_tracks: true, - restore_pending_segments: false - ) - end) + {:ok, packager} = + HLS.Packager.start_link( + manifest_uri: URI.new!("file://#{tmp_dir}/stream.m3u8"), + storage: HLS.Storage.File.new(), + resume_finished_tracks: true, + restore_pending_segments: false + ) spec = [ child(:sink, %Membrane.HLS.SinkBin{ - packager_pid: packager_pid, + packager: packager, target_segment_duration: Membrane.Time.seconds(7) }), @@ -39,9 +37,9 @@ defmodule Membrane.HLS.SinkBinTest do options: [ encoding: :AAC, segment_duration: Membrane.Time.seconds(6), - build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + build_stream: fn %Membrane.CMAF.Track{} = format -> %HLS.AlternativeRendition{ - uri: uri, + uri: nil, name: "Audio (EN)", type: :audio, group_id: "audio", @@ -90,9 +88,9 @@ defmodule Membrane.HLS.SinkBinTest do options: [ encoding: :TEXT, segment_duration: Membrane.Time.seconds(6), - build_stream: fn uri, %Membrane.Text{} = format -> + build_stream: fn %Membrane.Text{} = format -> %HLS.AlternativeRendition{ - uri: uri, + uri: nil, name: "Subtitles (EN)", type: :subtitles, group_id: "subtitles", @@ -117,9 +115,9 @@ defmodule Membrane.HLS.SinkBinTest do options: [ encoding: :H264, segment_duration: Membrane.Time.seconds(6), - build_stream: fn uri, %Membrane.CMAF.Track{} = format -> + build_stream: fn %Membrane.CMAF.Track{} = format -> %HLS.VariantStream{ - uri: uri, + uri: nil, bandwidth: 850_000, resolution: format.resolution, frame_rate: 30.0,