Skip to content

Commit

Permalink
Add support for additionalMedia message (#63)
Browse files Browse the repository at this point in the history
* Add support for additional media message

* Adjust to CR
  • Loading branch information
Qizot authored Feb 15, 2024
1 parent 4fcab69 commit e1fef39
Show file tree
Hide file tree
Showing 20 changed files with 432 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
```elixir
def deps do
[
{:membrane_rtmp_plugin, "~> 0.20.2"}
{:membrane_rtmp_plugin, "~> 0.22.0"}
]
end
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.RTMP.AMF.Encoder do
defmodule Membrane.RTMP.AMF0.Encoder do
@moduledoc false

@type basic_object_t :: float() | boolean() | String.t() | map() | :null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Membrane.RTMP.AMF.Parser do
defmodule Membrane.RTMP.AMF0.Parser do
@moduledoc false

alias Membrane.RTMP.AMF3

@doc """
Parses message from [AMF0](https://en.wikipedia.org/wiki/Action_Message_Format#AMF0) format to Erlang terms.
"""
Expand Down Expand Up @@ -48,8 +50,24 @@ defmodule Membrane.RTMP.AMF.Parser do
parse_object_pairs(rest, [])
end

defp parse_value(_data) do
raise "Unknown data type"
defp parse_value(<<0x0A, size::32, rest::binary>>) do
{acc, rest} =
Enum.reduce(1..size, {[], rest}, fn _i, {acc, rest} ->
{value, rest} = parse_value(rest)

{[value | acc], rest}
end)

{Enum.reverse(acc), rest}
end

# switch to AMF3
defp parse_value(<<0x11, rest::binary>>) do
AMF3.Parser.parse_one(rest)
end

defp parse_value(data) do
raise "Unknown data type #{inspect(data)}"
end

# we reached object end
Expand Down
215 changes: 215 additions & 0 deletions lib/membrane_rtmp_plugin/rtmp/source/amf3/parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
defmodule Membrane.RTMP.AMF3.Parser do
@moduledoc false

import Bitwise

@doc """
Parses message from [AMF3](https://en.wikipedia.org/wiki/Action_Message_Format#AMF3) format to Erlang terms.
"""
@spec parse(binary()) :: list()
def parse(binary) do
do_parse(binary, [])
end

@doc """
Parses a single message from [AMF3](https://en.wikipedia.org/wiki/Action_Message_Format#AMF3) format to Erlang terms.
"""
@spec parse_one(binary()) :: {value :: term(), rest :: binary()}
def parse_one(binary) do
parse_value(binary)
end

defp do_parse(<<>>, acc), do: Enum.reverse(acc)

defp do_parse(payload, acc) do
{value, rest} = parse_value(payload)

do_parse(rest, [value | acc])
end

# undefined
defp parse_value(<<0x00, rest::binary>>) do
{:undefined, rest}
end

# null
defp parse_value(<<0x01, rest::binary>>) do
{nil, rest}
end

# false
defp parse_value(<<0x02, rest::binary>>) do
{false, rest}
end

# true
defp parse_value(<<0x03, rest::binary>>) do
{true, rest}
end

# integer
defp parse_value(<<0x04, rest::binary>>) do
parse_integer(rest)
end

# double
defp parse_value(<<0x05, double::float-size(64), rest::binary>>) do
{double, rest}
end

# string
defp parse_value(<<0x06, rest::binary>>) do
parse_string(rest)
end

# xml document
defp parse_value(<<0x07, rest::binary>>) do
case check_value_type(rest) do
{:value, size, rest} ->
<<string::binary-size(size), rest::binary>> = rest

{{:xml, string}, rest}

{:ref, ref, rest} ->
{{:ref, {:xml, ref}}, rest}
end
end

# date
defp parse_value(<<0x08, rest::binary>>) do
case check_value_type(rest) do
{:value, _value, rest} ->
<<date::float-size(64), rest::binary>> = rest

{DateTime.from_unix!(trunc(date), :millisecond), rest}

{:ref, ref, rest} ->
{{:ref, {:date, ref}}, rest}
end
end

# array
defp parse_value(<<0x09, rest::binary>>) do
case check_value_type(rest) do
{:value, dense_array_size, rest} ->
{assoc_array, rest} = parse_assoc_array(rest, [])
{dense_array, rest} = parse_dense_array(rest, dense_array_size, [])

{assoc_array ++ dense_array, rest}

{:ref, ref, rest} ->
{{:array_ref, ref}, rest}
end
end

# object
defp parse_value(<<0x0A, _rest::binary>>) do
raise "Unsupported AMF3 type: object"
end

# xml
defp parse_value(<<0x0B, rest::binary>>) do
case check_value_type(rest) do
{:value, size, rest} ->
<<string::binary-size(size), rest::binary>> = rest

{{:xml_script, string}, rest}

{:ref, ref, rest} ->
{{:ref, {:xml_script, ref}}, rest}
end
end

# byte array
defp parse_value(<<0x0C, rest::binary>>) do
case check_value_type(rest) do
{:value, size, rest} ->
<<bytes::binary-size(size), rest::binary>> = rest

{bytes, rest}

{:ref, ref, rest} ->
{{:ref, {:byte_array, ref}}, rest}
end
end

# vector int
defp parse_value(<<0x0D, _rest::binary>>) do
raise "Unsupported AMF3 type: vector int"
end

# vector uint
defp parse_value(<<0x0E, _rest::binary>>) do
raise "Unsupported AMF3 type: vector uint"
end

# vector double
defp parse_value(<<0x0F, _rest::binary>>) do
raise "Unsupported AMF3 type: vector double"
end

# vector object
defp parse_value(<<0x10, _rest::binary>>) do
raise "Unsupported AMF3 type: vector object"
end

# dictionary
defp parse_value(<<0x11, _rest::binary>>) do
raise "Unsupported AMF3 type: dictionary"
end

defp parse_integer(<<0::1, value::7, rest::binary>>), do: {value, rest}

defp parse_integer(<<1::1, first::7, 0::1, second::7, rest::binary>>) do
{bsl(first, 7) + second, rest}
end

defp parse_integer(<<1::1, first::7, 1::1, second::7, 0::1, third::7, rest::binary>>) do
{bsl(first, 14) + bsl(second, 7) + third, rest}
end

defp parse_integer(<<1::1, first::7, 1::1, second::7, 0::1, third::7, fourth::8, rest::binary>>) do
{bsl(first, 22) + bsl(second, 15) + bsl(third, 7) + fourth, rest}
end

defp parse_string(payload) do
case check_value_type(payload) do
{:value, size, rest} ->
<<string::binary-size(size), rest::binary>> = rest

{string, rest}

{:ref, ref, rest} ->
{{:ref, {:string, ref}}, rest}
end
end

defp parse_assoc_array(<<0x01, rest::binary>>, acc), do: {Enum.reverse(acc), rest}

defp parse_assoc_array(payload, acc) do
{key, rest} = parse_string(payload)
{value, rest} = parse_value(rest)

parse_assoc_array(rest, [{key, value} | acc])
end

defp parse_dense_array(rest, 0, acc), do: {Enum.reverse(acc), rest}

defp parse_dense_array(rest, size, acc) do
{value, rest} = parse_value(rest)

parse_dense_array(rest, size - 1, [value | acc])
end

defp check_value_type(rest) do
{number, rest} = parse_integer(rest)

value = number >>> 1

if (number &&& 0x01) == 1 do
{:value, value, rest}
else
{:ref, value, rest}
end
end
end
5 changes: 3 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Membrane.RTMP.Message do

@amf_data_to_module %{
"@setDataFrame" => Messages.SetDataFrame,
"onMetaData" => Messages.OnMetaData
"onMetaData" => Messages.OnMetaData,
"additionalMedia" => Messages.AdditionalMedia
}

@spec deserialize_message(type_id :: integer(), binary()) :: struct()
Expand Down Expand Up @@ -88,7 +89,7 @@ defmodule Membrane.RTMP.Message do

defp message_from_modules(payload, mapping, required? \\ false) do
payload
|> Membrane.RTMP.AMF.Parser.parse()
|> Membrane.RTMP.AMF0.Parser.parse()
|> then(fn [command | _rest] = arguments ->
if required? do
Map.fetch!(mapping, command)
Expand Down
63 changes: 59 additions & 4 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ defmodule Membrane.RTMP.MessageHandler do
{:cont, state}
end

defp do_handle_client_message(%Messages.AdditionalMedia{} = media, header, state) do
state = get_additional_media_actions(header, media, state)
{:cont, state}
end

defp do_handle_client_message(%Handshake.Step{type: :s0_s1_s2} = step, _header, state) do
state.socket_module.send(state.socket, Handshake.Step.serialize(step))

Expand Down Expand Up @@ -154,6 +159,21 @@ defmodule Membrane.RTMP.MessageHandler do
end
end

@validation_stage :on_expect_additional_media
defp do_handle_client_message(
%Messages.OnExpectAdditionalMedia{} = additional_media,
_header,
state
) do
case MessageValidator.validate_on_expect_additional_media(state.validator, additional_media) do
{:ok, _msg} = result ->
{:cont, validation_action(state, @validation_stage, result)}

{:error, _reason} = error ->
{:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}}
end
end

@validation_stage :on_meta_data
defp do_handle_client_message(%Messages.OnMetaData{} = on_meta_data, _header, state) do
case MessageValidator.validate_on_meta_data(state.validator, on_meta_data) do
Expand Down Expand Up @@ -258,31 +278,66 @@ defmodule Membrane.RTMP.MessageHandler do
}
end

defp get_additional_media_actions(rtmp_header, additional_media, %{header_sent?: true} = state) do
# NOTE: we are replacing the type_id from 18 to 8 (script data to audio data) as it carries the
# additional audio track
data = additional_media.media

header = %Membrane.RTMP.Header{
rtmp_header
| type_id: 8,
body_size: byte_size(data)
}

# NOTE: for additional media we are also setting the stream_id to 1.
# It is against the spec but it simplifies things for us since we don't have to
# handle dynamic pads in the RTMP source + our FLV demuxer handles that well.
payload = get_flv_tag(header, 1, data)

action = {:buffer, {:output, %Buffer{payload: payload}}}

Map.update!(state, :actions, &[action | &1])
end

defp get_additional_media_actions(rtmp_header, additional_media, state) do
data = additional_media.media

header = %Membrane.RTMP.Header{rtmp_header | type_id: 8, body_size: byte_size(data)}
payload = get_flv_header() <> get_flv_tag(header, 1, data)

action = {:buffer, {:output, %Buffer{payload: payload}}}

%{state | header_sent?: true, actions: [action | state.actions]}
end

defp get_flv_header() do
alias Membrane.FLV

{header, 0} =
FLV.Serializer.serialize(%FLV.Header{audio_present?: true, video_present?: true}, 0)
FLV.Serializer.serialize(
%FLV.Header{audio_present?: true, video_present?: true},
0
)

# Add PreviousTagSize, which is 0 for the first tag
header <> <<0::32>>
end

# according to the FLV spec, the stream ID should always be 0
# but we can use 1 for hacking around Twitch's addtional audio stream
defp get_flv_tag(
%Membrane.RTMP.Header{
timestamp: timestamp,
body_size: data_size,
type_id: type_id
},
stream_id \\ 0,
payload
) do
tag_size = data_size + 11

<<upper_timestamp::8, lower_timestamp::24>> = <<timestamp::32>>

# according to the FLV spec, the stream ID should always be 0
stream_id = 0

<<type_id::8, data_size::24, lower_timestamp::24, upper_timestamp::8, stream_id::24,
payload::binary-size(data_size), tag_size::32>>
end
Expand Down
Loading

0 comments on commit e1fef39

Please sign in to comment.