Skip to content

Commit f49ba4e

Browse files
committed
Buffer end_of_stream actions
1 parent 44f7e29 commit f49ba4e

File tree

1 file changed

+33
-13
lines changed

1 file changed

+33
-13
lines changed

lib/membrane/rtp/demuxer.ex

+33-13
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ defmodule Membrane.RTP.Demuxer do
66
"""
77

88
use Membrane.Filter
9-
alias Membrane.{RemoteStream, RTCP, RTP}
9+
require Membrane.Pad
10+
11+
alias Membrane.Element.Action
12+
alias Membrane.{Pad, RemoteStream, RTCP, RTP}
1013

1114
def_input_pad :input,
1215
accepted_format:
@@ -35,7 +38,7 @@ defmodule Membrane.RTP.Demuxer do
3538
@type t :: %__MODULE__{
3639
stream_states: %{
3740
RTP.ssrc_t() => %{
38-
waiting_packets: [ExRTP.Packet.t()],
41+
buffered_actions: [Action.buffer() | Action.end_of_stream()],
3942
phase: :waiting_for_link | :linked
4043
}
4144
}
@@ -65,19 +68,27 @@ defmodule Membrane.RTP.Demuxer do
6568

6669
@impl true
6770
def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do
68-
waiting_buffers =
69-
state.stream_states[ssrc].waiting_packets
70-
|> Enum.reverse()
71-
|> Enum.map(&put_packet_into_buffer/1)
71+
buffered_actions = state.stream_states[ssrc].buffered_actions
7272

7373
state =
7474
Bunch.Struct.update_in(
7575
state,
7676
[:stream_states, ssrc],
77-
&%{&1 | phase: :linked, waiting_packets: []}
77+
&%{&1 | phase: :linked, buffered_actions: []}
7878
)
7979

80-
{[stream_format: {pad, %RTP{}}, buffer: {pad, waiting_buffers}], state}
80+
{[stream_format: {pad, %RTP{}}] ++ Enum.reverse(buffered_actions), state}
81+
end
82+
83+
@impl true
84+
def handle_end_of_stream(:input, _ctx, state) do
85+
state =
86+
state.stream_states
87+
|> Enum.reduce(state, fn {ssrc, _stream_state}, state ->
88+
append_action_to_buffered_actions(ssrc, {:end_of_stream, Pad.ref(:output, ssrc)}, state)
89+
end)
90+
91+
{[forward: :end_of_stream], state}
8192
end
8293

8394
@spec classify_packet(binary()) :: :rtp | :rtcp
@@ -101,7 +112,12 @@ defmodule Membrane.RTP.Demuxer do
101112
{buffer_actions, state} =
102113
case state.stream_states[packet.ssrc].phase do
103114
:waiting_for_link ->
104-
{[], append_packet_to_waiting_packets(packet, state)}
115+
{[],
116+
append_action_to_buffered_actions(
117+
packet.ssrc,
118+
{:buffer, {Pad.ref(:output, packet.ssrc), put_packet_into_buffer(packet)}},
119+
state
120+
)}
105121

106122
:linked ->
107123
{[buffer: {Pad.ref(:output, packet.ssrc), put_packet_into_buffer(packet)}], state}
@@ -118,7 +134,7 @@ defmodule Membrane.RTP.Demuxer do
118134
@spec initialize_new_stream_state(ExRTP.Packet.t(), State.t()) ::
119135
{[Membrane.Element.Action.t()], State.t()}
120136
defp initialize_new_stream_state(packet, state) do
121-
stream_state = %{waiting_packets: [], phase: :waiting_for_link}
137+
stream_state = %{buffered_actions: [], phase: :waiting_for_link}
122138

123139
extensions =
124140
case packet.extensions do
@@ -136,9 +152,13 @@ defmodule Membrane.RTP.Demuxer do
136152
{[notify_parent: {:new_rtp_stream, packet.ssrc, packet.payload_type, extensions}], state}
137153
end
138154

139-
@spec append_packet_to_waiting_packets(ExRTP.Packet.t(), State.t()) :: State.t()
140-
defp append_packet_to_waiting_packets(packet, state) do
141-
Bunch.Struct.update_in(state, [:stream_states, packet.ssrc, :waiting_packets], &[packet | &1])
155+
@spec append_action_to_buffered_actions(
156+
ExRTP.Packet.uint32(),
157+
Action.buffer() | Action.end_of_stream(),
158+
State.t()
159+
) :: State.t()
160+
defp append_action_to_buffered_actions(ssrc, action, state) do
161+
Bunch.Struct.update_in(state, [:stream_states, ssrc, :buffered_actions], &[action | &1])
142162
end
143163

144164
@spec put_packet_into_buffer(ExRTP.Packet.t()) :: Membrane.Buffer.t()

0 commit comments

Comments
 (0)