@@ -22,6 +22,7 @@ defmodule Membrane.TimestampQueue do
22
22
dts_offset: integer ( ) ,
23
23
qex: Qex . t ( ) ,
24
24
buffers_size: non_neg_integer ( ) ,
25
+ buffers_number: non_neg_integer ( ) ,
25
26
paused_demand?: boolean ( ) ,
26
27
end_of_stream?: boolean ( )
27
28
}
@@ -97,11 +98,14 @@ defmodule Membrane.TimestampQueue do
97
98
|> push_item ( pad_ref , { :buffer , buffer } )
98
99
|> get_and_update_in ( [ :pad_queues , pad_ref ] , fn pad_queue ->
99
100
pad_queue
100
- |> Map . update! ( :buffers_size , & ( & 1 + buffer_size ) )
101
101
|> Map . update! ( :dts_offset , fn
102
102
nil -> timestamp_queue . current_queue_time - buffer . dts
103
103
valid_offset -> valid_offset
104
104
end )
105
+ |> Map . merge ( % {
106
+ buffers_size: pad_queue . buffers_size + buffer_size ,
107
+ buffers_number: pad_queue . buffers_number + 1
108
+ } )
105
109
|> actions_after_pushing_buffer ( timestamp_queue . pause_demand_boundary )
106
110
end )
107
111
end
@@ -173,6 +177,7 @@ defmodule Membrane.TimestampQueue do
173
177
dts_offset: nil ,
174
178
qex: Qex . new ( ) ,
175
179
buffers_size: 0 ,
180
+ buffers_number: 0 ,
176
181
paused_demand?: false ,
177
182
end_of_stream?: false
178
183
}
@@ -231,6 +236,7 @@ defmodule Membrane.TimestampQueue do
231
236
end
232
237
end
233
238
239
+ # todo 1: consider updating heap just after popping a buffer
234
240
defp do_pop ( timestamp_queue , pad_ref , pad_priority ) do
235
241
pad_queue = Map . get ( timestamp_queue . pad_queues , pad_ref )
236
242
@@ -240,14 +246,12 @@ defmodule Membrane.TimestampQueue do
240
246
buffer_size = timestamp_queue . metric . buffers_size ( [ buffer ] )
241
247
242
248
cond do
243
- # todo 1: consider updating heap just after popping a buffer
244
- # todo 2: holding buffers counter in pad_queue would help with a problem with buffer with payload: <<>> and metric: Bytes
245
249
pad_priority != - buffer_time ->
246
250
timestamp_queue
247
251
|> Map . update! ( :pads_heap , & ( & 1 |> Heap . pop ( ) |> Heap . push ( { - buffer_time , pad_ref } ) ) )
248
252
|> do_pop ( )
249
253
250
- buffer_size == pad_queue . buffers_size and not pad_queue . end_of_stream? ->
254
+ pad_queue . buffers_number == 1 and not pad_queue . end_of_stream? ->
251
255
# last buffer on pad queue without end of stream
252
256
{ :none , timestamp_queue }
253
257
@@ -257,7 +261,8 @@ defmodule Membrane.TimestampQueue do
257
261
pad_queue = % {
258
262
pad_queue
259
263
| qex: qex ,
260
- buffers_size: pad_queue . buffers_size - buffer_size
264
+ buffers_size: pad_queue . buffers_size - buffer_size ,
265
+ buffers_number: pad_queue . buffers_number - 1
261
266
}
262
267
263
268
timestamp_queue =
0 commit comments