Skip to content
This repository was archived by the owner on Apr 25, 2023. It is now read-only.

Commit 656ec1b

Browse files
committed
Implement on_incoming_stream, on_incoming_decodebin_stream, handle_media_stream
1 parent ab61ee6 commit 656ec1b

File tree

1 file changed

+94
-17
lines changed

1 file changed

+94
-17
lines changed

sendrecv/gst-rust/src/main.rs

Lines changed: 94 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@ extern crate serde_json;
1010
use glib::translate::*;
1111
use gst::prelude::*;
1212
use gst::{BinExt, ElementExt};
13-
use std::mem;
1413
use std::sync::atomic::{AtomicUsize, Ordering};
1514
use std::sync::Arc;
16-
// use std::io;
1715

1816
#[derive(PartialEq, Eq, Debug)]
1917
enum AppState {
20-
AppStateUnknown = 0,
18+
// AppStateUnknown = 0,
2119
AppStateErr = 1,
2220
ServerConnecting = 1000,
2321
ServerConnectionError,
@@ -37,12 +35,6 @@ enum AppState {
3735
None,
3836
}
3937

40-
struct WsClient {
41-
out: ws::Sender,
42-
webrtc: Option<gst::Element>,
43-
app_state: Arc<AtomicUsize>,
44-
}
45-
4638
fn check_plugins() -> bool {
4739
let needed = vec![
4840
"opus",
@@ -159,6 +151,85 @@ fn on_negotiation_needed(
159151
None
160152
}
161153

154+
fn handle_media_stream(pad: &gst::Pad, pipe: &gst::Element, convert_name: &str, sink_name: &str) {
155+
println!(
156+
"Trying to handle stream with {} ! {}",
157+
convert_name, sink_name,
158+
);
159+
let q = gst::ElementFactory::make("queue", None).unwrap();
160+
let conv = gst::ElementFactory::make(convert_name, None).unwrap();
161+
let sink = gst::ElementFactory::make(sink_name, None).unwrap();
162+
163+
if convert_name == "audioconvert" {
164+
let resample = gst::ElementFactory::make("audioresample", None).unwrap();
165+
pipe.clone()
166+
.dynamic_cast::<gst::Bin>()
167+
.unwrap()
168+
.add_many(&[&q, &conv, &resample, &sink])
169+
.unwrap();
170+
q.sync_state_with_parent().unwrap();
171+
conv.sync_state_with_parent().unwrap();
172+
resample.sync_state_with_parent().unwrap();
173+
sink.sync_state_with_parent().unwrap();
174+
gst::Element::link_many(&[&q, &conv, &resample, &sink]).unwrap();
175+
} else {
176+
pipe.clone()
177+
.dynamic_cast::<gst::Bin>()
178+
.unwrap()
179+
.add_many(&[&q, &conv, &sink])
180+
.unwrap();
181+
q.sync_state_with_parent().unwrap();
182+
conv.sync_state_with_parent().unwrap();
183+
sink.sync_state_with_parent().unwrap();
184+
gst::Element::link_many(&[&q, &conv, &sink]).unwrap();
185+
}
186+
let qpad = q.get_static_pad("sink").unwrap();
187+
let ret = pad.link(&qpad);
188+
assert_eq!(ret, gst::PadLinkReturn::Ok);
189+
}
190+
191+
fn on_incoming_decodebin_stream(
192+
values: &[glib::Value],
193+
pipe: &gst::Element,
194+
) -> Option<glib::Value> {
195+
let pad = values[1].get::<gst::Pad>().expect("Invalid argument");
196+
if !pad.has_current_caps() {
197+
println!("Pad {:?} has no caps, can't do anything, ignoring", pad);
198+
}
199+
200+
let caps = pad.get_current_caps().unwrap();
201+
let name = caps.get_structure(0).unwrap().get_name();
202+
println!("CAPS NAME {:?}", name);
203+
if name.starts_with("video") {
204+
handle_media_stream(&pad, &pipe, "videoconvert", "autovideosink");
205+
} else if name.starts_with("audio") {
206+
handle_media_stream(&pad, &pipe, "audioconvert", "autoaudiosink");
207+
} else {
208+
println!("Unknown pad {:?}, ignoring", pad);
209+
}
210+
None
211+
}
212+
213+
fn on_incoming_stream(values: &[glib::Value], pipeline: &gst::Element) -> Option<glib::Value> {
214+
let webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
215+
let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();
216+
let pipeline_clone = pipeline.clone();
217+
decodebin
218+
.connect("pad-added", false, move |values| {
219+
on_incoming_decodebin_stream(values, &pipeline_clone)
220+
})
221+
.unwrap();
222+
pipeline
223+
.clone()
224+
.dynamic_cast::<gst::Bin>()
225+
.unwrap()
226+
.add(&decodebin)
227+
.unwrap();
228+
decodebin.sync_state_with_parent().unwrap();
229+
webrtc.link(&decodebin).unwrap();
230+
None
231+
}
232+
162233
fn send_ice_candidate_message(
163234
app_state: &Arc<AtomicUsize>,
164235
values: &[glib::Value],
@@ -221,13 +292,25 @@ fn start_pipeline(app_state: &Arc<AtomicUsize>, out: &ws::Sender) -> gst::Elemen
221292
})
222293
.unwrap();
223294

295+
let pipeline_clone = pipeline.clone();
296+
webrtc
297+
.connect("pad-added", false, move |values| {
298+
on_incoming_stream(values, &pipeline_clone)
299+
})
300+
.unwrap();
224301
// TODO pad-added
225302
let ret = pipeline.set_state(gst::State::Playing);
226303
assert_ne!(ret, gst::StateChangeReturn::Failure);
227304

228305
return webrtc;
229306
}
230307

308+
struct WsClient {
309+
out: ws::Sender,
310+
webrtc: Option<gst::Element>,
311+
app_state: Arc<AtomicUsize>,
312+
}
313+
231314
impl WsClient {
232315
fn update_state(&self, state: AppState) {
233316
self.app_state.store(state as usize, Ordering::Relaxed);
@@ -338,17 +421,11 @@ impl ws::Handler for WsClient {
338421
}
339422

340423
fn connect_to_websocket_server_async() {
341-
let result = ws::connect("ws://signalling:8443", |out| WsClient {
424+
ws::connect("ws://signalling:8443", |out| WsClient {
342425
out: out,
343426
app_state: Arc::new(AtomicUsize::new(AppState::ServerConnecting as usize)),
344427
webrtc: None,
345-
});
346-
match result {
347-
Ok(_) => {
348-
println!("Connected to");
349-
}
350-
Err(error) => panic!("There was a problem opening the file: {:?}", error),
351-
};
428+
}).unwrap();
352429
}
353430

354431
fn main() {

0 commit comments

Comments
 (0)