Skip to content

Commit

Permalink
use stream event in test
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Feb 14, 2025
1 parent 50a2ea7 commit 19fc0bd
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 274 deletions.
221 changes: 72 additions & 149 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ mod error;
pub mod ffi;
pub use error::{Status, StatusCode};
mod types;
pub use types::ConnectionEvent;
mod types2;
pub use types2::StreamEvent2;
pub use types::{ConnectionEvent, StreamEvent};

//
// The following starts the C interop layer of MsQuic API.
Expand Down Expand Up @@ -806,126 +804,6 @@ pub struct ListenerEvent {
pub payload: ListenerEventPayload,
}

pub type StreamEventType = u32;
pub const STREAM_EVENT_START_COMPLETE: StreamEventType = 0;
pub const STREAM_EVENT_RECEIVE: StreamEventType = 1;
pub const STREAM_EVENT_SEND_COMPLETE: StreamEventType = 2;
pub const STREAM_EVENT_PEER_SEND_SHUTDOWN: StreamEventType = 3;
pub const STREAM_EVENT_PEER_SEND_ABORTED: StreamEventType = 4;
pub const STREAM_EVENT_PEER_RECEIVE_ABORTED: StreamEventType = 5;
pub const STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: StreamEventType = 6;
pub const STREAM_EVENT_SHUTDOWN_COMPLETE: StreamEventType = 7;
pub const STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE: StreamEventType = 8;
pub const STREAM_EVENT_PEER_ACCEPTED: StreamEventType = 9;
pub const STREAM_EVENT_CANCEL_ON_LOSS: StreamEventType = 10;

bitfield! {
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct StreamEventStartCompleteBitfields(u8);
// The fields default to u8
pub peer_accepted, _: 0, 0;
_reserved, _: 7, 1;
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventStartComplete {
pub status: u32,
pub id: u62,
pub bit_flags: StreamEventStartCompleteBitfields,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventReceive {
pub absolute_offset: u64,
pub total_buffer_length: u64,
pub buffer: *const Buffer,
pub buffer_count: u32,
pub flags: ReceiveFlags,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventSendComplete {
pub canceled: bool,
pub client_context: *const c_void,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventPeerSendAborted {
pub error_code: u62,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventPeerReceiveAborted {
pub error_code: u62,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventSendShutdownComplete {
pub graceful: bool,
}

bitfield! {
#[repr(C)]
#[derive(Clone, Copy)]
pub struct StreamEventShutdownCompleteBitfields(u8);
// The fields default to u8
pub app_close_in_progress, _: 0, 0;
pub conn_shutdown_by_app, _: 1, 1;
pub conn_closed_remotely, _: 2, 2;
_reserved, _: 7, 3;
}

#[repr(C)]
#[derive(Copy, Clone)]
pub struct StreamEventShutdownComplete {
pub connection_shutdown: bool,
pub bit_flags: StreamEventShutdownCompleteBitfields,
pub connection_error_code: u62,
pub connection_close_status: u32,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventIdealSendBufferSize {
pub byte_count: u64,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct StreamEventCancelOnLoss {
pub error_code: u62,
}

#[repr(C)]
#[derive(Copy, Clone)]
pub union StreamEventPayload {
pub start_complete: StreamEventStartComplete,
pub receive: StreamEventReceive,
pub send_complete: StreamEventSendComplete,
pub peer_send_aborted: StreamEventPeerSendAborted,
pub peer_receive_aborted: StreamEventPeerReceiveAborted,
pub send_shutdown_complete: StreamEventSendShutdownComplete,
pub shutdown_complete: StreamEventShutdownComplete,
pub ideal_send_buffer_size: StreamEventIdealSendBufferSize,
pub cancel_on_loss: StreamEventCancelOnLoss,
}

#[repr(C)]
pub struct StreamEvent {
pub event_type: StreamEventType,
pub payload: StreamEventPayload,
}

pub type StreamEventHandler =
extern "C" fn(stream: HQUIC, context: *mut c_void, event: &StreamEvent) -> u32;

#[link(name = "msquic")]
unsafe extern "C" {
unsafe fn MsQuicOpenVersion(version: u32, api: *mut *const QUIC_API_TABLE) -> u32;
Expand Down Expand Up @@ -1594,7 +1472,7 @@ impl Stream {
&mut self,
connection: &Connection,
flags: StreamOpenFlags,
handler: StreamEventHandler,
handler: ffi::QUIC_STREAM_CALLBACK_HANDLER,
context: *const c_void,
) -> Result<(), Status> {
// TODO: remove transmute.
Expand All @@ -1603,7 +1481,7 @@ impl Stream {
Api::ffi_ref().StreamOpen.unwrap()(
connection.handle,
flags as crate::ffi::QuicFlag,
Some(std::mem::transmute(handler)),
handler,
context as *mut c_void,
std::ptr::addr_of_mut!(self.handle),
)
Expand Down Expand Up @@ -1658,8 +1536,18 @@ impl Stream {

/// # Safety
/// handler and context must be valid.
pub unsafe fn set_callback_handler(&self, handler: StreamEventHandler, context: *const c_void) {
unsafe { Api::set_callback_handler(self.handle, handler as *const c_void, context) };
pub unsafe fn set_callback_handler(
&self,
handler: ffi::QUIC_STREAM_CALLBACK_HANDLER,
context: *const c_void,
) {
unsafe {
Api::set_callback_handler(
self.handle,
std::mem::transmute::<ffi::QUIC_STREAM_CALLBACK_HANDLER, *const c_void>(handler),
context,
)
};
}

pub fn receive_complete(&self, buffer_length: u64) {
Expand Down Expand Up @@ -1747,7 +1635,7 @@ mod tests {
}
ConnectionEvent::PeerStreamStarted { stream, flags } => {
println!("Peer stream started: flags: {flags}");
unsafe { stream.set_callback_handler(test_stream_callback, context) };
unsafe { stream.set_callback_handler(Some(test_stream_callback), context) };
}
ConnectionEvent::StreamsAvailable {
bidirectional_count,
Expand All @@ -1768,32 +1656,67 @@ mod tests {
extern "C" fn test_stream_callback(
stream: HQUIC,
_context: *mut c_void,
event: &StreamEvent,
) -> u32 {
match event.event_type {
crate::STREAM_EVENT_START_COMPLETE => {
println!("Stream start complete 0x{:x}", unsafe {
event.payload.start_complete.status
})
event: *mut ffi::QUIC_STREAM_EVENT,
) -> QUIC_STATUS {
let event_ref = unsafe { event.as_mut().unwrap() };
let event = StreamEvent::from(event_ref);
match event {
StreamEvent::StartComplete {
status,
id,
peer_accepted,
} => {
println!("Stream start complete: {status}, {id}, {peer_accepted}");
}
StreamEvent::Receive {
absolute_offset,
total_buffer_length,
buffers: _,
flags: _,
} => {
println!("Stream receive: {absolute_offset}, {total_buffer_length}");
}
StreamEvent::SendComplete {
cancelled,
client_context: _,
} => {
println!("Stream send complete: {cancelled}");
}
crate::STREAM_EVENT_RECEIVE => println!("Receive {} bytes", unsafe {
event.payload.receive.total_buffer_length
}),
crate::STREAM_EVENT_SEND_COMPLETE => println!("Send complete"),
crate::STREAM_EVENT_PEER_SEND_SHUTDOWN => println!("Peer send shutdown"),
crate::STREAM_EVENT_PEER_SEND_ABORTED => println!("Peer send aborted"),
crate::STREAM_EVENT_PEER_RECEIVE_ABORTED => println!("Peer receive aborted"),
crate::STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => println!("Peer receive aborted"),
crate::STREAM_EVENT_SHUTDOWN_COMPLETE => {
println!("Stream shutdown complete");
StreamEvent::PeerSendShutdown => {
println!("Stream peer send shutdown");
}
StreamEvent::PeerSendAborted { error_code } => {
println!("Stream peer send abort: {error_code}");
}
StreamEvent::PeerReceiveAborted { error_code } => {
println!("Stream peer receive aborted: {error_code}");
}
StreamEvent::SendShutdownComplete { graceful } => {
println!("Stream send shutdown complete: {graceful}");
}
StreamEvent::ShutdownComplete {
connection_shutdown,
app_close_in_progress,
connection_shutdown_by_app,
connection_closed_remotely,
connection_error_code,
connection_close_status,
} => {
println!("Stream shutdown complete: {connection_shutdown} {app_close_in_progress} {connection_shutdown_by_app} {connection_closed_remotely} {connection_error_code} {connection_close_status}");
// Attach to stream for auto close handle.
unsafe { Stream::from_raw(stream) };
}
crate::STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => println!("Ideal send buffer size"),
crate::STREAM_EVENT_PEER_ACCEPTED => println!("Peer accepted"),
_ => println!("Other callback {}", event.event_type),
StreamEvent::IdealSendBufferSize { byte_count } => {
println!("Stream ideal send buffer size: {byte_count}");
}
StreamEvent::PeerAccepted => {
println!("Stream peer accepted.");
}
StreamEvent::CancelOnLoss { error_code } => {
println!("Stream cancel on loss: {error_code}");
}
}
0
StatusCode::QUIC_STATUS_SUCCESS.into()
}

#[test]
Expand Down
Loading

0 comments on commit 19fc0bd

Please sign in to comment.