Skip to content
This repository was archived by the owner on Apr 25, 2023. It is now read-only.

Commit 59a5616

Browse files
committed
Added minor fixes and improvements related to PR
1 parent e03100a commit 59a5616

File tree

2 files changed

+49
-70
lines changed

2 files changed

+49
-70
lines changed

sendrecv/gst-rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ gstreamer-sdp-sys = { git = "https://github.com/sdroege/gstreamer-sys" }
1313
glib = { git = "https://github.com/gtk-rs/glib" }
1414
serde_json = "1.0.19"
1515
ws = "0.7.6"
16+
rand = "0.5"

sendrecv/gst-rust/src/main.rs

Lines changed: 48 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ use glib::translate::*;
1212
use gst::prelude::*;
1313
use gst::{BinExt, ElementExt};
1414
use rand::Rng;
15-
use std::sync::atomic::{AtomicUsize, Ordering};
16-
use std::sync::Arc;
15+
use std::sync::{Arc, Mutex};
1716

18-
#[derive(PartialEq, Eq, Debug)]
17+
#[derive(PartialEq, PartialOrd, Eq, Debug)]
1918
enum AppState {
2019
// AppStateUnknown = 0,
2120
AppStateErr = 1,
@@ -31,12 +30,13 @@ enum AppState {
3130
PeerConnected,
3231
PeerCallNegotiating = 4000,
3332
PeerCallStarted,
34-
PeerCallStopping,
35-
PeerCallStopped,
3633
PeerCallError,
37-
None,
3834
}
3935

36+
const STUN_SERVER: &'static str = "stun-server=stun://stun.l.google.com:19302 ";
37+
const RTP_CAPS_OPUS: &'static str = "application/x-rtp,media=audio,encoding-name=OPUS,payload=";
38+
const RTP_CAPS_VP8: &'static str = "application/x-rtp,media=video,encoding-name=VP8,payload=";
39+
4040
fn check_plugins() -> bool {
4141
let needed = vec![
4242
"opus",
@@ -61,15 +61,15 @@ fn check_plugins() -> bool {
6161
return ret;
6262
}
6363

64-
fn setup_call(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> AppState {
65-
app_state.store(AppState::PeerConnecting as usize, Ordering::Relaxed);
64+
fn setup_call(app_state: &Arc<Mutex<AppState>>, out: &ws::Sender) -> AppState {
65+
*app_state.lock().unwrap() = AppState::PeerConnecting;
6666
println!("Setting up signalling server call with 1");
6767
out.send("SESSION 1").unwrap();
6868
return AppState::PeerConnecting;
6969
}
7070

71-
fn register_with_server(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> AppState {
72-
app_state.store(AppState::ServerRegistering as usize, Ordering::Relaxed);
71+
fn register_with_server(app_state: &Arc<Mutex<AppState>>, out: &ws::Sender) -> AppState {
72+
*app_state.lock().unwrap() = AppState::ServerRegistering;
7373
let our_id = rand::thread_rng().gen_range(10, 10_000);
7474
println!("Registering id {} with server", our_id);
7575
out.send(format!("HELLO {}", our_id)).unwrap();
@@ -85,11 +85,11 @@ fn sdp_message_as_text(offer: gstreamer_webrtc::WebRTCSessionDescription) -> Opt
8585
}
8686

8787
fn send_sdp_offer(
88-
app_state: &Arc<AtomicUsize>,
88+
app_state: &Arc<Mutex<AppState>>,
8989
offer: gstreamer_webrtc::WebRTCSessionDescription,
9090
out: &ws::Sender,
9191
) {
92-
if app_state.load(Ordering::Relaxed) < (AppState::PeerCallNegotiating as usize) {
92+
if *app_state.lock().unwrap() < AppState::PeerCallNegotiating {
9393
// TODO signal and cleanup
9494
panic!("Can't send offer, not in call");
9595
};
@@ -103,39 +103,32 @@ fn send_sdp_offer(
103103
}
104104

105105
fn on_offer_created(
106-
app_state: &Arc<AtomicUsize>,
106+
app_state: &Arc<Mutex<AppState>>,
107107
webrtc: gst::Element,
108108
promise: &gst::Promise,
109109
out: &ws::Sender,
110110
) {
111-
assert_eq!(
112-
app_state.load(Ordering::Relaxed),
113-
AppState::PeerCallNegotiating as usize
114-
);
111+
assert_eq!(*app_state.lock().unwrap(), AppState::PeerCallNegotiating);
115112
let reply = promise.get_reply().unwrap();
116113

117114
let offer = reply
118115
.get_value("offer")
119116
.unwrap()
120117
.get::<gstreamer_webrtc::WebRTCSessionDescription>()
121118
.expect("Invalid argument");
122-
let promise = gst::Promise::new();
123119
webrtc
124-
.emit(
125-
"set-local-description",
126-
&[&offer.to_value(), &promise.to_value()],
127-
)
120+
.emit("set-local-description", &[&offer, &None::<gst::Promise>])
128121
.unwrap();
122+
129123
send_sdp_offer(app_state, offer, out)
130124
}
131125

132126
fn on_negotiation_needed(
133-
app_state: &Arc<AtomicUsize>,
127+
app_state: &Arc<Mutex<AppState>>,
134128
values: &[glib::Value],
135129
out: &ws::Sender,
136130
) -> Option<glib::Value> {
137-
app_state.store(AppState::PeerCallNegotiating as usize, Ordering::Relaxed);
138-
131+
*app_state.lock().unwrap() = AppState::PeerCallNegotiating;
139132
let webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
140133
let webrtc_clone = webrtc.clone();
141134
let out_clone = out.clone();
@@ -145,7 +138,7 @@ fn on_negotiation_needed(
145138
});
146139
let options = gst::Structure::new_empty("options");
147140
webrtc_clone
148-
.emit("create-offer", &[&options.to_value(), &promise.to_value()])
141+
.emit("create-offer", &[&options, &promise])
149142
.unwrap();
150143
None
151144
}
@@ -159,24 +152,18 @@ fn handle_media_stream(pad: &gst::Pad, pipe: &gst::Element, convert_name: &str,
159152
let conv = gst::ElementFactory::make(convert_name, None).unwrap();
160153
let sink = gst::ElementFactory::make(sink_name, None).unwrap();
161154

155+
let pipe_bin = pipe.clone().dynamic_cast::<gst::Bin>().unwrap();
156+
162157
if convert_name == "audioconvert" {
163158
let resample = gst::ElementFactory::make("audioresample", None).unwrap();
164-
pipe.clone()
165-
.dynamic_cast::<gst::Bin>()
166-
.unwrap()
167-
.add_many(&[&q, &conv, &resample, &sink])
168-
.unwrap();
159+
pipe_bin.add_many(&[&q, &conv, &resample, &sink]).unwrap();
169160
q.sync_state_with_parent().unwrap();
170161
conv.sync_state_with_parent().unwrap();
171162
resample.sync_state_with_parent().unwrap();
172163
sink.sync_state_with_parent().unwrap();
173164
gst::Element::link_many(&[&q, &conv, &resample, &sink]).unwrap();
174165
} else {
175-
pipe.clone()
176-
.dynamic_cast::<gst::Bin>()
177-
.unwrap()
178-
.add_many(&[&q, &conv, &sink])
179-
.unwrap();
166+
pipe_bin.add_many(&[&q, &conv, &sink]).unwrap();
180167
q.sync_state_with_parent().unwrap();
181168
conv.sync_state_with_parent().unwrap();
182169
sink.sync_state_with_parent().unwrap();
@@ -228,11 +215,11 @@ fn on_incoming_stream(values: &[glib::Value], pipe: &gst::Element) -> Option<gli
228215
}
229216

230217
fn send_ice_candidate_message(
231-
app_state: &Arc<AtomicUsize>,
218+
app_state: &Arc<Mutex<AppState>>,
232219
values: &[glib::Value],
233220
out: &ws::Sender,
234221
) -> Option<glib::Value> {
235-
if app_state.load(Ordering::Relaxed) < (AppState::PeerCallNegotiating as usize) {
222+
if *app_state.lock().unwrap() < AppState::PeerCallNegotiating {
236223
panic!("Can't send ICE, not in call");
237224
}
238225

@@ -249,18 +236,16 @@ fn send_ice_candidate_message(
249236
None
250237
}
251238

252-
fn start_pipeline(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> gst::Element {
253-
let pipe = gst::parse_launch(
254-
"webrtcbin name=sendrecv
255-
stun-server=stun://stun.l.google.com:19302
239+
fn start_pipeline(app_state: &Arc<Mutex<AppState>>, out: &ws::Sender) -> gst::Element {
240+
let pipe = gst::parse_launch(&format!(
241+
"webrtcbin name=sendrecv {}
256242
videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
257-
queue !
258-
application/x-rtp,media=video,encoding-name=VP8,payload=96 ! sendrecv.
243+
queue ! {}96 ! sendrecv.
259244
audiotestsrc wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
260-
queue !
261-
application/x-rtp,media=audio,encoding-name=OPUS,payload=97 ! sendrecv.
245+
queue ! {}97 ! sendrecv.
262246
",
263-
).unwrap();
247+
STUN_SERVER, RTP_CAPS_VP8, RTP_CAPS_OPUS
248+
)).unwrap();
264249

265250
let webrtc = pipe.clone()
266251
.dynamic_cast::<gst::Bin>()
@@ -292,21 +277,20 @@ fn start_pipeline(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> gst::Elemen
292277
})
293278
.unwrap();
294279

295-
let ret = pipe.set_state(gst::State::Playing);
296-
assert_ne!(ret, gst::StateChangeReturn::Failure);
280+
pipe.set_state(gst::State::Playing).into_result().unwrap();
297281

298282
return webrtc;
299283
}
300284

301285
struct WsClient {
302286
out: ws::Sender,
303287
webrtc: Option<gst::Element>,
304-
app_state: Arc<AtomicUsize>,
288+
app_state: Arc<Mutex<AppState>>,
305289
}
306290

307291
impl WsClient {
308292
fn update_state(&self, state: AppState) {
309-
self.app_state.store(state as usize, Ordering::Relaxed);
293+
*self.app_state.lock().unwrap() = state
310294
}
311295
}
312296

@@ -322,7 +306,7 @@ impl ws::Handler for WsClient {
322306
// Close the connection when we get a response from the server
323307
let msg_text = msg.into_text().unwrap();
324308
if msg_text == "HELLO" {
325-
if self.app_state.load(Ordering::Relaxed) != (AppState::ServerRegistering as usize) {
309+
if *self.app_state.lock().unwrap() != AppState::ServerRegistering {
326310
// TODO: signal and cleanup
327311
panic!("ERROR: Received HELLO when not registering");
328312
}
@@ -331,7 +315,7 @@ impl ws::Handler for WsClient {
331315
return Ok(());
332316
}
333317
if msg_text == "SESSION_OK" {
334-
if self.app_state.load(Ordering::Relaxed) != (AppState::PeerConnecting as usize) {
318+
if *self.app_state.lock().unwrap() != AppState::PeerConnecting {
335319
// TODO: signal and cleanup
336320
panic!("ERROR: Received SESSION_OK when not calling");
337321
}
@@ -342,12 +326,12 @@ impl ws::Handler for WsClient {
342326

343327
if msg_text.starts_with("ERROR") {
344328
println!("Got error message! {}", msg_text);
345-
let error = match self.app_state.load(Ordering::Relaxed) {
346-
x if x == AppState::ServerConnecting as usize => AppState::ServerConnectionError,
347-
x if x == AppState::ServerRegistering as usize => AppState::ServerRegisteringError,
348-
x if x == AppState::PeerConnecting as usize => AppState::PeerConnectionError,
349-
x if x == AppState::PeerConnected as usize => AppState::PeerCallError,
350-
x if x == AppState::PeerCallNegotiating as usize => AppState::PeerCallError,
329+
let error = match *self.app_state.lock().unwrap() {
330+
AppState::ServerConnecting => AppState::ServerConnectionError,
331+
AppState::ServerRegistering => AppState::ServerRegisteringError,
332+
AppState::PeerConnecting => AppState::PeerConnectionError,
333+
AppState::PeerConnected => AppState::PeerCallError,
334+
AppState::PeerCallNegotiating => AppState::PeerCallError,
351335
_ => AppState::AppStateErr,
352336
};
353337
panic!("Got websocket error {:?}", error);
@@ -357,8 +341,8 @@ impl ws::Handler for WsClient {
357341
let json_msg: serde_json::Value = serde_json::from_str(&msg_text).unwrap();
358342
if json_msg.get("sdp").is_some() {
359343
assert_eq!(
360-
self.app_state.load(Ordering::Relaxed),
361-
(AppState::PeerCallNegotiating as usize)
344+
*self.app_state.lock().unwrap(),
345+
AppState::PeerCallNegotiating
362346
);
363347

364348
if !json_msg["sdp"].get("type").is_some() {
@@ -380,10 +364,7 @@ impl ws::Handler for WsClient {
380364
self.webrtc
381365
.as_ref()
382366
.unwrap()
383-
.emit(
384-
"set-remote-description",
385-
&[&answer.to_value(), &promise.to_value()],
386-
)
367+
.emit("set-remote-description", &[&answer, &promise])
387368
.unwrap();
388369
self.update_state(AppState::PeerCallStarted);
389370
}
@@ -393,10 +374,7 @@ impl ws::Handler for WsClient {
393374
self.webrtc
394375
.as_ref()
395376
.unwrap()
396-
.emit(
397-
"add-ice-candidate",
398-
&[&sdpmlineindex.to_value(), &candidate.to_value()],
399-
)
377+
.emit("add-ice-candidate", &[&sdpmlineindex, &candidate])
400378
.unwrap();
401379
}
402380

@@ -408,7 +386,7 @@ impl ws::Handler for WsClient {
408386
fn connect_to_websocket_server_async() {
409387
ws::connect("ws://signalling:8443", |out| WsClient {
410388
out: out,
411-
app_state: Arc::new(AtomicUsize::new(AppState::ServerConnecting as usize)),
389+
app_state: Arc::new(Mutex::new(AppState::ServerConnecting)),
412390
webrtc: None,
413391
}).unwrap();
414392
}

0 commit comments

Comments
 (0)