From 19fc0bd50bd8d7ea94a20dfb11d2b543158910fd Mon Sep 17 00:00:00 2001 From: Youyuan Wu Date: Thu, 13 Feb 2025 22:18:43 -0800 Subject: [PATCH] use stream event in test --- src/lib.rs | 221 ++++++++++++++++---------------------------------- src/types.rs | 127 +++++++++++++++++++++++++++++ src/types2.rs | 125 ---------------------------- 3 files changed, 199 insertions(+), 274 deletions(-) delete mode 100644 src/types2.rs diff --git a/src/lib.rs b/src/lib.rs index 462bafa7eb..c5357d0376 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,9 +27,7 @@ mod error; pub mod ffi; pub use error::{Status, StatusCode}; mod types; -pub use types::ConnectionEvent; -mod types2; -pub use types2::StreamEvent2; +pub use types::{ConnectionEvent, StreamEvent}; // // The following starts the C interop layer of MsQuic API. @@ -806,126 +804,6 @@ pub struct ListenerEvent { pub payload: ListenerEventPayload, } -pub type StreamEventType = u32; -pub const STREAM_EVENT_START_COMPLETE: StreamEventType = 0; -pub const STREAM_EVENT_RECEIVE: StreamEventType = 1; -pub const STREAM_EVENT_SEND_COMPLETE: StreamEventType = 2; -pub const STREAM_EVENT_PEER_SEND_SHUTDOWN: StreamEventType = 3; -pub const STREAM_EVENT_PEER_SEND_ABORTED: StreamEventType = 4; -pub const STREAM_EVENT_PEER_RECEIVE_ABORTED: StreamEventType = 5; -pub const STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: StreamEventType = 6; -pub const STREAM_EVENT_SHUTDOWN_COMPLETE: StreamEventType = 7; -pub const STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE: StreamEventType = 8; -pub const STREAM_EVENT_PEER_ACCEPTED: StreamEventType = 9; -pub const STREAM_EVENT_CANCEL_ON_LOSS: StreamEventType = 10; - -bitfield! { - #[repr(C)] - #[derive(Debug, Clone, Copy)] - pub struct StreamEventStartCompleteBitfields(u8); - // The fields default to u8 - pub peer_accepted, _: 0, 0; - _reserved, _: 7, 1; -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventStartComplete { - pub status: u32, - pub id: u62, - pub bit_flags: StreamEventStartCompleteBitfields, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventReceive { - pub absolute_offset: u64, - pub total_buffer_length: u64, - pub buffer: *const Buffer, - pub buffer_count: u32, - pub flags: ReceiveFlags, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventSendComplete { - pub canceled: bool, - pub client_context: *const c_void, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventPeerSendAborted { - pub error_code: u62, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventPeerReceiveAborted { - pub error_code: u62, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventSendShutdownComplete { - pub graceful: bool, -} - -bitfield! { - #[repr(C)] - #[derive(Clone, Copy)] - pub struct StreamEventShutdownCompleteBitfields(u8); - // The fields default to u8 - pub app_close_in_progress, _: 0, 0; - pub conn_shutdown_by_app, _: 1, 1; - pub conn_closed_remotely, _: 2, 2; - _reserved, _: 7, 3; -} - -#[repr(C)] -#[derive(Copy, Clone)] -pub struct StreamEventShutdownComplete { - pub connection_shutdown: bool, - pub bit_flags: StreamEventShutdownCompleteBitfields, - pub connection_error_code: u62, - pub connection_close_status: u32, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventIdealSendBufferSize { - pub byte_count: u64, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct StreamEventCancelOnLoss { - pub error_code: u62, -} - -#[repr(C)] -#[derive(Copy, Clone)] -pub union StreamEventPayload { - pub start_complete: StreamEventStartComplete, - pub receive: StreamEventReceive, - pub send_complete: StreamEventSendComplete, - pub peer_send_aborted: StreamEventPeerSendAborted, - pub peer_receive_aborted: StreamEventPeerReceiveAborted, - pub send_shutdown_complete: StreamEventSendShutdownComplete, - pub shutdown_complete: StreamEventShutdownComplete, - pub ideal_send_buffer_size: StreamEventIdealSendBufferSize, - pub cancel_on_loss: StreamEventCancelOnLoss, -} - -#[repr(C)] -pub struct StreamEvent { - pub event_type: StreamEventType, - pub payload: StreamEventPayload, -} - -pub type StreamEventHandler = - extern "C" fn(stream: HQUIC, context: *mut c_void, event: &StreamEvent) -> u32; - #[link(name = "msquic")] unsafe extern "C" { unsafe fn MsQuicOpenVersion(version: u32, api: *mut *const QUIC_API_TABLE) -> u32; @@ -1594,7 +1472,7 @@ impl Stream { &mut self, connection: &Connection, flags: StreamOpenFlags, - handler: StreamEventHandler, + handler: ffi::QUIC_STREAM_CALLBACK_HANDLER, context: *const c_void, ) -> Result<(), Status> { // TODO: remove transmute. @@ -1603,7 +1481,7 @@ impl Stream { Api::ffi_ref().StreamOpen.unwrap()( connection.handle, flags as crate::ffi::QuicFlag, - Some(std::mem::transmute(handler)), + handler, context as *mut c_void, std::ptr::addr_of_mut!(self.handle), ) @@ -1658,8 +1536,18 @@ impl Stream { /// # Safety /// handler and context must be valid. - pub unsafe fn set_callback_handler(&self, handler: StreamEventHandler, context: *const c_void) { - unsafe { Api::set_callback_handler(self.handle, handler as *const c_void, context) }; + pub unsafe fn set_callback_handler( + &self, + handler: ffi::QUIC_STREAM_CALLBACK_HANDLER, + context: *const c_void, + ) { + unsafe { + Api::set_callback_handler( + self.handle, + std::mem::transmute::(handler), + context, + ) + }; } pub fn receive_complete(&self, buffer_length: u64) { @@ -1747,7 +1635,7 @@ mod tests { } ConnectionEvent::PeerStreamStarted { stream, flags } => { println!("Peer stream started: flags: {flags}"); - unsafe { stream.set_callback_handler(test_stream_callback, context) }; + unsafe { stream.set_callback_handler(Some(test_stream_callback), context) }; } ConnectionEvent::StreamsAvailable { bidirectional_count, @@ -1768,32 +1656,67 @@ mod tests { extern "C" fn test_stream_callback( stream: HQUIC, _context: *mut c_void, - event: &StreamEvent, - ) -> u32 { - match event.event_type { - crate::STREAM_EVENT_START_COMPLETE => { - println!("Stream start complete 0x{:x}", unsafe { - event.payload.start_complete.status - }) + event: *mut ffi::QUIC_STREAM_EVENT, + ) -> QUIC_STATUS { + let event_ref = unsafe { event.as_mut().unwrap() }; + let event = StreamEvent::from(event_ref); + match event { + StreamEvent::StartComplete { + status, + id, + peer_accepted, + } => { + println!("Stream start complete: {status}, {id}, {peer_accepted}"); + } + StreamEvent::Receive { + absolute_offset, + total_buffer_length, + buffers: _, + flags: _, + } => { + println!("Stream receive: {absolute_offset}, {total_buffer_length}"); + } + StreamEvent::SendComplete { + cancelled, + client_context: _, + } => { + println!("Stream send complete: {cancelled}"); } - crate::STREAM_EVENT_RECEIVE => println!("Receive {} bytes", unsafe { - event.payload.receive.total_buffer_length - }), - crate::STREAM_EVENT_SEND_COMPLETE => println!("Send complete"), - crate::STREAM_EVENT_PEER_SEND_SHUTDOWN => println!("Peer send shutdown"), - crate::STREAM_EVENT_PEER_SEND_ABORTED => println!("Peer send aborted"), - crate::STREAM_EVENT_PEER_RECEIVE_ABORTED => println!("Peer receive aborted"), - crate::STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => println!("Peer receive aborted"), - crate::STREAM_EVENT_SHUTDOWN_COMPLETE => { - println!("Stream shutdown complete"); + StreamEvent::PeerSendShutdown => { + println!("Stream peer send shutdown"); + } + StreamEvent::PeerSendAborted { error_code } => { + println!("Stream peer send abort: {error_code}"); + } + StreamEvent::PeerReceiveAborted { error_code } => { + println!("Stream peer receive aborted: {error_code}"); + } + StreamEvent::SendShutdownComplete { graceful } => { + println!("Stream send shutdown complete: {graceful}"); + } + StreamEvent::ShutdownComplete { + connection_shutdown, + app_close_in_progress, + connection_shutdown_by_app, + connection_closed_remotely, + connection_error_code, + connection_close_status, + } => { + println!("Stream shutdown complete: {connection_shutdown} {app_close_in_progress} {connection_shutdown_by_app} {connection_closed_remotely} {connection_error_code} {connection_close_status}"); // Attach to stream for auto close handle. unsafe { Stream::from_raw(stream) }; } - crate::STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => println!("Ideal send buffer size"), - crate::STREAM_EVENT_PEER_ACCEPTED => println!("Peer accepted"), - _ => println!("Other callback {}", event.event_type), + StreamEvent::IdealSendBufferSize { byte_count } => { + println!("Stream ideal send buffer size: {byte_count}"); + } + StreamEvent::PeerAccepted => { + println!("Stream peer accepted."); + } + StreamEvent::CancelOnLoss { error_code } => { + println!("Stream cancel on loss: {error_code}"); + } } - 0 + StatusCode::QUIC_STATUS_SUCCESS.into() } #[test] diff --git a/src/types.rs b/src/types.rs index 59a3514e5b..1ae5fa7382 100644 --- a/src/types.rs +++ b/src/types.rs @@ -177,3 +177,130 @@ impl<'a> From<&'a QUIC_CONNECTION_EVENT> for ConnectionEvent<'a> { } } } + +/// Stream callback events +pub enum StreamEvent<'a> { + StartComplete { + status: crate::Status, + id: crate::u62, + peer_accepted: bool, + }, + Receive { + absolute_offset: u64, + total_buffer_length: &'a mut u64, // out parameter + buffers: &'a [crate::ffi::QUIC_BUFFER], // TODO: impl buffer wrapper types + flags: crate::ffi::QUIC_RECEIVE_FLAGS, + }, + SendComplete { + cancelled: bool, + client_context: *const std::ffi::c_void, + }, + PeerSendShutdown, + PeerSendAborted { + error_code: crate::u62, + }, + PeerReceiveAborted { + error_code: crate::u62, + }, + SendShutdownComplete { + graceful: bool, + }, + ShutdownComplete { + connection_shutdown: bool, + app_close_in_progress: bool, + connection_shutdown_by_app: bool, + connection_closed_remotely: bool, + connection_error_code: crate::u62, + connection_close_status: crate::Status, + }, + IdealSendBufferSize { + byte_count: u64, + }, + PeerAccepted, + CancelOnLoss { + error_code: &'a mut crate::u62, // out param + }, +} + +impl<'b> From<&'b mut crate::ffi::QUIC_STREAM_EVENT> for StreamEvent<'b> { + fn from(value: &'b mut crate::ffi::QUIC_STREAM_EVENT) -> Self { + match value.Type { + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_START_COMPLETE => { + let ev = unsafe { value.__bindgen_anon_1.START_COMPLETE }; + Self::StartComplete { + status: crate::Status(ev.Status), + id: ev.ID, + peer_accepted: ev.PeerAccepted() != 0, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_RECEIVE => { + let ev = unsafe { &mut value.__bindgen_anon_1.RECEIVE }; + Self::Receive { + absolute_offset: ev.AbsoluteOffset, + total_buffer_length: &mut ev.TotalBufferLength, + buffers: unsafe { + std::slice::from_raw_parts(ev.Buffers, ev.BufferCount as usize) + }, + flags: ev.Flags, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SEND_COMPLETE => { + let ev = unsafe { value.__bindgen_anon_1.SEND_COMPLETE }; + Self::SendComplete { + cancelled: ev.Canceled != 0, + client_context: ev.ClientContext, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN => { + Self::PeerSendShutdown + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_SEND_ABORTED => { + let ev = unsafe { value.__bindgen_anon_1.PEER_SEND_ABORTED }; + Self::PeerSendAborted { + error_code: ev.ErrorCode, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED => { + let ev = unsafe { value.__bindgen_anon_1.PEER_RECEIVE_ABORTED }; + Self::PeerReceiveAborted { + error_code: ev.ErrorCode, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => { + let ev = unsafe { value.__bindgen_anon_1.SEND_SHUTDOWN_COMPLETE }; + Self::SendShutdownComplete { + graceful: ev.Graceful != 0, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE => { + let ev = unsafe { value.__bindgen_anon_1.SHUTDOWN_COMPLETE }; + Self::ShutdownComplete { + connection_shutdown: ev.ConnectionShutdown != 0, + app_close_in_progress: ev.AppCloseInProgress() != 0, + connection_shutdown_by_app: ev.ConnectionShutdownByApp() != 0, + connection_closed_remotely: ev.ConnectionClosedRemotely() != 0, + connection_error_code: ev.ConnectionErrorCode, + connection_close_status: crate::Status(ev.ConnectionCloseStatus), + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => { + let ev = unsafe { value.__bindgen_anon_1.IDEAL_SEND_BUFFER_SIZE }; + Self::IdealSendBufferSize { + byte_count: ev.ByteCount, + } + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_ACCEPTED => { + Self::PeerAccepted + } + crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_CANCEL_ON_LOSS => { + let ev = unsafe { &mut value.__bindgen_anon_1.CANCEL_ON_LOSS }; + Self::CancelOnLoss { + error_code: &mut ev.ErrorCode, + } + } + _ => { + panic!("unknown stream event: {}", value.Type) + } + } + } +} diff --git a/src/types2.rs b/src/types2.rs deleted file mode 100644 index b246acb646..0000000000 --- a/src/types2.rs +++ /dev/null @@ -1,125 +0,0 @@ -pub enum StreamEvent2<'a> { - StartComplete { - status: crate::Status, - id: crate::u62, - peer_accepted: bool, - }, - Receive { - absolute_offset: u64, - total_buffer_length: &'a mut u64, // TODO: needs modify - buffers: &'a [crate::ffi::QUIC_BUFFER], // TODO: better buffer types - flags: crate::ffi::QUIC_RECEIVE_FLAGS, - }, - SendComplete { - cancelled: bool, - client_context: *const std::ffi::c_void, - }, - PeerSendShutdown, - PeerSendAborted { - error_code: crate::u62, - }, - PeerReceiveAborted { - error_code: crate::u62, - }, - SendShutdownComplete { - graceful: bool, - }, - ShutdownComplete { - connection_shutdown: bool, - app_close_in_progress: bool, - connection_shutdown_by_app: bool, - connection_closed_remotely: bool, - connection_error_code: crate::u62, - connection_close_status: crate::Status, - }, - IdealSendBufferSize { - byte_count: u64, - }, - PeerAccepted, - CancelOnLoss { - error_code: &'a mut crate::u62, // out param - }, -} - -impl<'b> From<&'b mut crate::ffi::QUIC_STREAM_EVENT> for StreamEvent2<'b> { - fn from(value: &'b mut crate::ffi::QUIC_STREAM_EVENT) -> Self { - match value.Type { - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_START_COMPLETE => { - let ev = unsafe { value.__bindgen_anon_1.START_COMPLETE }; - Self::StartComplete { - status: crate::Status(ev.Status), - id: ev.ID, - peer_accepted: ev.PeerAccepted() != 0, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_RECEIVE => { - let ev = unsafe { &mut value.__bindgen_anon_1.RECEIVE }; - Self::Receive { - absolute_offset: ev.AbsoluteOffset, - total_buffer_length: &mut ev.TotalBufferLength, - buffers: unsafe { - std::slice::from_raw_parts(ev.Buffers, ev.BufferCount as usize) - }, - flags: ev.Flags, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SEND_COMPLETE => { - let ev = unsafe { value.__bindgen_anon_1.SEND_COMPLETE }; - Self::SendComplete { - cancelled: ev.Canceled != 0, - client_context: ev.ClientContext, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN => { - Self::PeerSendShutdown - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_SEND_ABORTED => { - let ev = unsafe { value.__bindgen_anon_1.PEER_SEND_ABORTED }; - Self::PeerSendAborted { - error_code: ev.ErrorCode, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED => { - let ev = unsafe { value.__bindgen_anon_1.PEER_RECEIVE_ABORTED }; - Self::PeerReceiveAborted { - error_code: ev.ErrorCode, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => { - let ev = unsafe { value.__bindgen_anon_1.SEND_SHUTDOWN_COMPLETE }; - Self::SendShutdownComplete { - graceful: ev.Graceful != 0, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE => { - let ev = unsafe { value.__bindgen_anon_1.SHUTDOWN_COMPLETE }; - Self::ShutdownComplete { - connection_shutdown: ev.ConnectionShutdown != 0, - app_close_in_progress: ev.AppCloseInProgress() != 0, - connection_shutdown_by_app: ev.ConnectionShutdownByApp() != 0, - connection_closed_remotely: ev.ConnectionClosedRemotely() != 0, - connection_error_code: ev.ConnectionErrorCode, - connection_close_status: crate::Status(ev.ConnectionCloseStatus), - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => { - let ev = unsafe { value.__bindgen_anon_1.IDEAL_SEND_BUFFER_SIZE }; - Self::IdealSendBufferSize { - byte_count: ev.ByteCount, - } - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_PEER_ACCEPTED => { - Self::PeerAccepted - } - crate::ffi::QUIC_STREAM_EVENT_TYPE_QUIC_STREAM_EVENT_CANCEL_ON_LOSS => { - let ev = unsafe { &mut value.__bindgen_anon_1.CANCEL_ON_LOSS }; - Self::CancelOnLoss { - error_code: &mut ev.ErrorCode, - } - } - _ => { - panic!("unknown stream event: {}", value.Type) - } - } - } -}