@@ -3,13 +3,15 @@ extern crate gstreamer as gst;
3
3
extern crate gstreamer_sdp;
4
4
extern crate gstreamer_sdp_sys;
5
5
extern crate gstreamer_webrtc;
6
+ extern crate rand;
6
7
extern crate ws;
7
8
#[ macro_use]
8
9
extern crate serde_json;
9
10
10
11
use glib:: translate:: * ;
11
12
use gst:: prelude:: * ;
12
13
use gst:: { BinExt , ElementExt } ;
14
+ use rand:: Rng ;
13
15
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
14
16
use std:: sync:: Arc ;
15
17
@@ -61,13 +63,16 @@ fn check_plugins() -> bool {
61
63
62
64
fn setup_call ( app_state : & Arc < AtomicUsize > , out : & ws:: Sender ) -> AppState {
63
65
app_state. store ( AppState :: PeerConnecting as usize , Ordering :: Relaxed ) ;
66
+ println ! ( "Setting up signalling server call with 1" ) ;
64
67
out. send ( "SESSION 1" ) . unwrap ( ) ;
65
68
return AppState :: PeerConnecting ;
66
69
}
67
70
68
71
fn register_with_server ( app_state : & Arc < AtomicUsize > , out : & ws:: Sender ) -> AppState {
69
72
app_state. store ( AppState :: ServerRegistering as usize , Ordering :: Relaxed ) ;
70
- out. send ( "HELLO 2" ) . unwrap ( ) ;
73
+ let our_id = rand:: thread_rng ( ) . gen_range ( 10 , 10_000 ) ;
74
+ println ! ( "Registering id {} with server" , our_id) ;
75
+ out. send ( format ! ( "HELLO {}" , our_id) ) . unwrap ( ) ;
71
76
return AppState :: ServerRegistering ;
72
77
}
73
78
@@ -109,14 +114,11 @@ fn on_offer_created(
109
114
) ;
110
115
let reply = promise. get_reply ( ) . unwrap ( ) ;
111
116
112
- println ! ( "{:?}" , reply) ;
113
-
114
117
let offer = reply
115
118
. get_value ( "offer" )
116
119
. unwrap ( )
117
120
. get :: < gstreamer_webrtc:: WebRTCSessionDescription > ( )
118
121
. expect ( "Invalid argument" ) ;
119
- println ! ( "{:?}" , offer) ;
120
122
let promise = gst:: Promise :: new ( ) ;
121
123
webrtc
122
124
. emit (
@@ -132,22 +134,19 @@ fn on_negotiation_needed(
132
134
values : & [ glib:: Value ] ,
133
135
out : & ws:: Sender ,
134
136
) -> Option < glib:: Value > {
135
- println ! ( "on-negotiation-needed {:?}" , values) ;
136
137
app_state. store ( AppState :: PeerCallNegotiating as usize , Ordering :: Relaxed ) ;
137
138
138
139
let webrtc = values[ 0 ] . get :: < gst:: Element > ( ) . expect ( "Invalid argument" ) ;
139
140
let webrtc_clone = webrtc. clone ( ) ;
140
141
let out_clone = out. clone ( ) ;
141
142
let app_state_clone = app_state. clone ( ) ;
142
- println ! ( "{:?}" , webrtc) ;
143
143
let promise = gst:: Promise :: new_with_change_func ( move |promise : & gst:: Promise | {
144
144
on_offer_created ( & app_state_clone, webrtc, promise, & out_clone) ;
145
145
} ) ;
146
146
let options = gst:: Structure :: new_empty ( "options" ) ;
147
- let desc = webrtc_clone
147
+ webrtc_clone
148
148
. emit ( "create-offer" , & [ & options. to_value ( ) , & promise. to_value ( ) ] )
149
149
. unwrap ( ) ;
150
- println ! ( "{:?}" , desc) ;
151
150
None
152
151
}
153
152
@@ -199,7 +198,6 @@ fn on_incoming_decodebin_stream(
199
198
200
199
let caps = pad. get_current_caps ( ) . unwrap ( ) ;
201
200
let name = caps. get_structure ( 0 ) . unwrap ( ) . get_name ( ) ;
202
- println ! ( "CAPS NAME {:?}" , name) ;
203
201
if name. starts_with ( "video" ) {
204
202
handle_media_stream ( & pad, & pipe, "videoconvert" , "autovideosink" ) ;
205
203
} else if name. starts_with ( "audio" ) {
@@ -210,17 +208,16 @@ fn on_incoming_decodebin_stream(
210
208
None
211
209
}
212
210
213
- fn on_incoming_stream ( values : & [ glib:: Value ] , pipeline : & gst:: Element ) -> Option < glib:: Value > {
211
+ fn on_incoming_stream ( values : & [ glib:: Value ] , pipe : & gst:: Element ) -> Option < glib:: Value > {
214
212
let webrtc = values[ 0 ] . get :: < gst:: Element > ( ) . expect ( "Invalid argument" ) ;
215
213
let decodebin = gst:: ElementFactory :: make ( "decodebin" , None ) . unwrap ( ) ;
216
- let pipeline_clone = pipeline . clone ( ) ;
214
+ let pipe_clone = pipe . clone ( ) ;
217
215
decodebin
218
216
. connect ( "pad-added" , false , move |values| {
219
- on_incoming_decodebin_stream ( values, & pipeline_clone )
217
+ on_incoming_decodebin_stream ( values, & pipe_clone )
220
218
} )
221
219
. unwrap ( ) ;
222
- pipeline
223
- . clone ( )
220
+ pipe. clone ( )
224
221
. dynamic_cast :: < gst:: Bin > ( )
225
222
. unwrap ( )
226
223
. add ( & decodebin)
@@ -248,13 +245,12 @@ fn send_ice_candidate_message(
248
245
"sdpMLineIndex" : mlineindex,
249
246
}
250
247
} ) ;
251
- println ! ( "{}" , message. to_string( ) ) ;
252
248
out. send ( message. to_string ( ) ) . unwrap ( ) ;
253
249
None
254
250
}
255
251
256
252
fn start_pipeline ( app_state : & Arc < AtomicUsize > , out : & ws:: Sender ) -> gst:: Element {
257
- let pipeline = gst:: parse_launch (
253
+ let pipe = gst:: parse_launch (
258
254
"webrtcbin name=sendrecv
259
255
stun-server=stun://stun.l.google.com:19302
260
256
videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
@@ -266,16 +262,12 @@ fn start_pipeline(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> gst::Elemen
266
262
" ,
267
263
) . unwrap ( ) ;
268
264
269
- let webrtc = pipeline
270
- . clone ( )
265
+ let webrtc = pipe. clone ( )
271
266
. dynamic_cast :: < gst:: Bin > ( )
272
267
. unwrap ( )
273
268
. get_by_name ( "sendrecv" )
274
269
. unwrap ( ) ;
275
270
276
- webrtc. connect_pad_added ( move |_, _src_pad| {
277
- println ! ( "connnect pad added" ) ;
278
- } ) ;
279
271
let out_clone = out. clone ( ) ;
280
272
let app_state_clone = app_state. clone ( ) ;
281
273
webrtc
@@ -292,14 +284,15 @@ fn start_pipeline(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> gst::Elemen
292
284
} )
293
285
. unwrap ( ) ;
294
286
295
- let pipeline_clone = pipeline. clone ( ) ;
287
+ let pipe_clone = pipe. clone ( ) ;
288
+ // TODO: replace with webrtc.connect_pad_added
296
289
webrtc
297
290
. connect ( "pad-added" , false , move |values| {
298
- on_incoming_stream ( values, & pipeline_clone )
291
+ on_incoming_stream ( values, & pipe_clone )
299
292
} )
300
293
. unwrap ( ) ;
301
- // TODO pad-added
302
- let ret = pipeline . set_state ( gst:: State :: Playing ) ;
294
+
295
+ let ret = pipe . set_state ( gst:: State :: Playing ) ;
303
296
assert_ne ! ( ret, gst:: StateChangeReturn :: Failure ) ;
304
297
305
298
return webrtc;
@@ -327,33 +320,26 @@ impl ws::Handler for WsClient {
327
320
328
321
fn on_message ( & mut self , msg : ws:: Message ) -> ws:: Result < ( ) > {
329
322
// Close the connection when we get a response from the server
330
- println ! ( "Got message: {}" , msg) ;
331
323
let msg_text = msg. into_text ( ) . unwrap ( ) ;
332
- let matched = match msg_text. as_ref ( ) {
333
- "HELLO" => {
334
- if self . app_state . load ( Ordering :: Relaxed ) != ( AppState :: ServerRegistering as usize )
335
- {
336
- // TODO: signal and cleanup
337
- panic ! ( "ERROR: Received HELLO when not registering" ) ;
338
- }
339
- self . update_state ( AppState :: ServerRegistered ) ;
340
- setup_call ( & self . app_state , & self . out ) ;
341
- true
324
+ if msg_text == "HELLO" {
325
+ if self . app_state . load ( Ordering :: Relaxed ) != ( AppState :: ServerRegistering as usize ) {
326
+ // TODO: signal and cleanup
327
+ panic ! ( "ERROR: Received HELLO when not registering" ) ;
342
328
}
343
- "SESSION_OK" => {
344
- if self . app_state . load ( Ordering :: Relaxed ) != ( AppState :: PeerConnecting as usize ) {
345
- // TODO: signal and cleanup
346
- panic ! ( "ERROR: Received SESSION_OK when not calling" ) ;
347
- }
348
- self . update_state ( AppState :: PeerConnected ) ;
349
- self . webrtc = Some ( start_pipeline ( & self . app_state , & self . out ) ) ;
350
- true
329
+ self . update_state ( AppState :: ServerRegistered ) ;
330
+ setup_call ( & self . app_state , & self . out ) ;
331
+ return Ok ( ( ) ) ;
332
+ }
333
+ if msg_text == "SESSION_OK" {
334
+ if self . app_state . load ( Ordering :: Relaxed ) != ( AppState :: PeerConnecting as usize ) {
335
+ // TODO: signal and cleanup
336
+ panic ! ( "ERROR: Received SESSION_OK when not calling" ) ;
351
337
}
352
- _ => false ,
353
- } ;
354
- if matched {
338
+ self . update_state ( AppState :: PeerConnected ) ;
339
+ self . webrtc = Some ( start_pipeline ( & self . app_state , & self . out ) ) ;
355
340
return Ok ( ( ) ) ;
356
341
}
342
+
357
343
if msg_text. starts_with ( "ERROR" ) {
358
344
println ! ( "Got error message! {}" , msg_text) ;
359
345
let error = match self . app_state . load ( Ordering :: Relaxed ) {
@@ -402,7 +388,6 @@ impl ws::Handler for WsClient {
402
388
self . update_state ( AppState :: PeerCallStarted ) ;
403
389
}
404
390
if json_msg. get ( "ice" ) . is_some ( ) {
405
- println ! ( "ice {:?}" , json_msg) ;
406
391
let candidate = json_msg[ "ice" ] [ "candidate" ] . as_str ( ) . unwrap ( ) ;
407
392
let sdpmlineindex = json_msg[ "ice" ] [ "sdpMLineIndex" ] . as_u64 ( ) . unwrap ( ) as u32 ;
408
393
self . webrtc
0 commit comments