2
2
3
3
use anyhow:: Result ;
4
4
use dropshot:: {
5
- endpoint, ApiDescription , HttpError , HttpResponseCreated , HttpResponseOk ,
6
- HttpResponseUpdatedNoContent , Path , RequestContext , TypedBody ,
5
+ channel, endpoint, ApiDescription , HttpError , HttpResponseCreated ,
6
+ HttpResponseOk , HttpResponseUpdatedNoContent , Path , RequestContext ,
7
+ TypedBody , WebsocketConnection ,
7
8
} ;
8
9
use futures:: future:: Fuse ;
9
10
use futures:: stream:: { SplitSink , SplitStream } ;
10
11
use futures:: { FutureExt , SinkExt , StreamExt } ;
11
- use hyper:: upgrade:: { self , Upgraded } ;
12
- use hyper:: { header , Body , Response , StatusCode } ;
12
+ use hyper:: upgrade:: Upgraded ;
13
+ use hyper:: { Body , Response } ;
13
14
use propolis:: hw:: qemu:: ramfb:: RamFb ;
14
15
use rfb:: server:: VncServer ;
15
16
use slog:: { error, info, o, Logger } ;
@@ -21,7 +22,7 @@ use thiserror::Error;
21
22
use tokio:: sync:: { mpsc, oneshot, watch, MappedMutexGuard , Mutex , MutexGuard } ;
22
23
use tokio:: task:: JoinHandle ;
23
24
use tokio_tungstenite:: tungstenite:: protocol:: { Role , WebSocketConfig } ;
24
- use tokio_tungstenite:: tungstenite:: { self , handshake , Message } ;
25
+ use tokio_tungstenite:: tungstenite:: { self , Message } ;
25
26
use tokio_tungstenite:: WebSocketStream ;
26
27
27
28
use propolis:: dispatch:: AsyncCtx ;
@@ -757,74 +758,20 @@ async fn instance_serial_task(
757
758
Ok ( ( ) )
758
759
}
759
760
760
- #[ endpoint {
761
- method = GET ,
761
+ #[ channel {
762
+ protocol = WEBSOCKETS ,
762
763
path = "/instance/serial" ,
763
764
} ]
764
765
async fn instance_serial (
765
766
rqctx : Arc < RequestContext < DropshotEndpointContext > > ,
766
- ) -> Result < Response < Body > , HttpError > {
767
+ websock : WebsocketConnection ,
768
+ ) -> dropshot:: WebsocketChannelResult {
767
769
let mut inst = rqctx. context ( ) . instance ( ) . await ?;
768
770
769
771
let serial = inst. serial . clone ( ) ;
770
- let request = & mut * rqctx. request . lock ( ) . await ;
771
-
772
- if !request
773
- . headers ( )
774
- . get ( header:: CONNECTION )
775
- . and_then ( |hv| hv. to_str ( ) . ok ( ) )
776
- . map ( |hv| {
777
- hv. split ( |c| c == ',' || c == ' ' )
778
- . any ( |vs| vs. eq_ignore_ascii_case ( "upgrade" ) )
779
- } )
780
- . unwrap_or ( false )
781
- {
782
- return Err ( HttpError :: for_bad_request (
783
- None ,
784
- "expected connection upgrade" . to_string ( ) ,
785
- ) ) ;
786
- }
787
- if !request
788
- . headers ( )
789
- . get ( header:: UPGRADE )
790
- . and_then ( |v| v. to_str ( ) . ok ( ) )
791
- . map ( |v| {
792
- v. split ( |c| c == ',' || c == ' ' )
793
- . any ( |v| v. eq_ignore_ascii_case ( "websocket" ) )
794
- } )
795
- . unwrap_or ( false )
796
- {
797
- return Err ( HttpError :: for_bad_request (
798
- None ,
799
- "unexpected protocol for upgrade" . to_string ( ) ,
800
- ) ) ;
801
- }
802
- if request
803
- . headers ( )
804
- . get ( header:: SEC_WEBSOCKET_VERSION )
805
- . map ( |v| v. as_bytes ( ) )
806
- != Some ( b"13" )
807
- {
808
- return Err ( HttpError :: for_bad_request (
809
- None ,
810
- "missing or invalid websocket version" . to_string ( ) ,
811
- ) ) ;
812
- }
813
- let accept_key = request
814
- . headers ( )
815
- . get ( header:: SEC_WEBSOCKET_KEY )
816
- . map ( |hv| hv. as_bytes ( ) )
817
- . map ( |key| handshake:: derive_accept_key ( key) )
818
- . ok_or_else ( || {
819
- HttpError :: for_bad_request (
820
- None ,
821
- "missing websocket key" . to_string ( ) ,
822
- )
823
- } ) ?;
824
772
825
773
let actx = inst. instance . async_ctx ( ) ;
826
- let ws_log = rqctx. log . new ( o ! ( ) ) ;
827
- let err_log = ws_log. clone ( ) ;
774
+ let err_log = rqctx. log . new ( o ! ( ) ) ;
828
775
829
776
// Create or get active serial task handle and channels
830
777
let serial_task = inst. serial_task . get_or_insert_with ( move || {
@@ -836,49 +783,33 @@ async fn instance_serial(
836
783
websocks_recv,
837
784
close_recv,
838
785
serial,
839
- ws_log . clone ( ) ,
786
+ err_log . clone ( ) ,
840
787
& actx,
841
788
)
842
789
. await
843
790
{
844
- error ! ( ws_log , "Failed to spawn instance serial task: {}" , e) ;
791
+ error ! ( err_log , "Failed to spawn instance serial task: {}" , e) ;
845
792
}
846
793
} ) ;
847
794
848
795
SerialTask { task, close_ch : Some ( close_ch) , websocks_ch }
849
796
} ) ;
850
797
851
- let upgrade_fut = upgrade:: on ( request) ;
852
798
let config =
853
799
WebSocketConfig { max_send_queue : Some ( 4096 ) , ..Default :: default ( ) } ;
854
800
let websocks_send = serial_task. websocks_ch . clone ( ) ;
855
- tokio:: spawn ( async move {
856
- let upgraded = match upgrade_fut. await {
857
- Ok ( u) => u,
858
- Err ( e) => {
859
- error ! ( err_log, "Serial socket upgrade failed: {}" , e) ;
860
- return ;
861
- }
862
- } ;
863
-
864
- let ws_stream = WebSocketStream :: from_raw_socket (
865
- upgraded,
866
- Role :: Server ,
867
- Some ( config) ,
868
- )
869
- . await ;
870
801
871
- if let Err ( e) = websocks_send. send ( ws_stream) . await {
872
- error ! ( err_log, "Serial socket hand-off failed: {}" , e) ;
873
- }
874
- } ) ;
802
+ let ws_stream = WebSocketStream :: from_raw_socket (
803
+ websock. into_inner ( ) ,
804
+ Role :: Server ,
805
+ Some ( config) ,
806
+ )
807
+ . await ;
875
808
876
- Ok ( Response :: builder ( )
877
- . status ( StatusCode :: SWITCHING_PROTOCOLS )
878
- . header ( header:: CONNECTION , "Upgrade" )
879
- . header ( header:: UPGRADE , "websocket" )
880
- . header ( header:: SEC_WEBSOCKET_ACCEPT , accept_key)
881
- . body ( Body :: empty ( ) ) ?)
809
+ websocks_send
810
+ . send ( ws_stream. into ( ) )
811
+ . await
812
+ . map_err ( |e| format ! ( "Serial socket hand-off failed: {}" , e) . into ( ) )
882
813
}
883
814
884
815
// This endpoint is meant to only be called during a migration from the destination
0 commit comments