@@ -9,11 +9,11 @@ use std::sync::Arc;
9
9
use std:: { collections:: BTreeMap , net:: SocketAddr } ;
10
10
11
11
use dropshot:: {
12
- endpoint, ApiDescription , HttpError , HttpResponseCreated , HttpResponseOk ,
13
- HttpResponseUpdatedNoContent , Path , RequestContext , TypedBody ,
12
+ channel, endpoint, ApiDescription , HttpError , HttpResponseCreated ,
13
+ HttpResponseOk , HttpResponseUpdatedNoContent , Path , RequestContext ,
14
+ TypedBody , WebsocketConnection ,
14
15
} ;
15
- use hyper:: StatusCode ;
16
- use hyper:: { http:: header, upgrade, Body , Response } ;
16
+ use hyper:: { Body , Response } ;
17
17
use oximeter:: types:: ProducerRegistry ;
18
18
use propolis_client:: instance_spec;
19
19
use propolis_client:: { api, instance_spec:: InstanceSpec } ;
@@ -22,7 +22,6 @@ use rfb::server::VncServer;
22
22
use slog:: { error, o, Logger } ;
23
23
use thiserror:: Error ;
24
24
use tokio:: sync:: { mpsc, oneshot, MappedMutexGuard , Mutex , MutexGuard } ;
25
- use tokio_tungstenite:: tungstenite:: handshake;
26
25
use tokio_tungstenite:: tungstenite:: protocol:: { Role , WebSocketConfig } ;
27
26
use tokio_tungstenite:: WebSocketStream ;
28
27
@@ -604,73 +603,21 @@ async fn instance_state_put(
604
603
result
605
604
}
606
605
607
- #[ endpoint {
608
- method = GET ,
606
+ #[ channel {
607
+ protocol = WEBSOCKETS ,
609
608
path = "/instance/serial" ,
610
609
} ]
611
610
async fn instance_serial (
612
611
rqctx : Arc < RequestContext < DropshotEndpointContext > > ,
613
- ) -> Result < Response < Body > , HttpError > {
612
+ websock : WebsocketConnection ,
613
+ ) -> dropshot:: WebsocketChannelResult {
614
614
let ctx = rqctx. context ( ) ;
615
615
let vm = ctx. vm ( ) . await ?;
616
616
let serial = vm. com1 ( ) . clone ( ) ;
617
- let request = & mut * rqctx. request . lock ( ) . await ;
618
-
619
- if !request
620
- . headers ( )
621
- . get ( header:: CONNECTION )
622
- . and_then ( |hv| hv. to_str ( ) . ok ( ) )
623
- . map ( |hv| {
624
- hv. split ( |c| c == ',' || c == ' ' )
625
- . any ( |vs| vs. eq_ignore_ascii_case ( "upgrade" ) )
626
- } )
627
- . unwrap_or ( false )
628
- {
629
- return Err ( HttpError :: for_bad_request (
630
- None ,
631
- "expected connection upgrade" . to_string ( ) ,
632
- ) ) ;
633
- }
634
- if !request
635
- . headers ( )
636
- . get ( header:: UPGRADE )
637
- . and_then ( |v| v. to_str ( ) . ok ( ) )
638
- . map ( |v| {
639
- v. split ( |c| c == ',' || c == ' ' )
640
- . any ( |v| v. eq_ignore_ascii_case ( "websocket" ) )
641
- } )
642
- . unwrap_or ( false )
643
- {
644
- return Err ( HttpError :: for_bad_request (
645
- None ,
646
- "unexpected protocol for upgrade" . to_string ( ) ,
647
- ) ) ;
648
- }
649
- if request
650
- . headers ( )
651
- . get ( header:: SEC_WEBSOCKET_VERSION )
652
- . map ( |v| v. as_bytes ( ) )
653
- != Some ( b"13" )
654
- {
655
- return Err ( HttpError :: for_bad_request (
656
- None ,
657
- "missing or invalid websocket version" . to_string ( ) ,
658
- ) ) ;
659
- }
660
- let accept_key = request
661
- . headers ( )
662
- . get ( header:: SEC_WEBSOCKET_KEY )
663
- . map ( |hv| hv. as_bytes ( ) )
664
- . map ( handshake:: derive_accept_key)
665
- . ok_or_else ( || {
666
- HttpError :: for_bad_request (
667
- None ,
668
- "missing websocket key" . to_string ( ) ,
669
- )
670
- } ) ?;
671
617
672
- let ws_log = rqctx. log . new ( o ! ( ) ) ;
673
- let err_log = ws_log. clone ( ) ;
618
+ let err_log = rqctx. log . new ( o ! ( ) ) ;
619
+
620
+ // Create or get active serial task handle and channels
674
621
let mut serial_task = ctx. services . serial_task . lock ( ) . await ;
675
622
let serial_task = serial_task. get_or_insert_with ( move || {
676
623
let ( websocks_ch, websocks_recv) = mpsc:: channel ( 1 ) ;
@@ -681,11 +628,11 @@ async fn instance_serial(
681
628
websocks_recv,
682
629
close_recv,
683
630
serial,
684
- ws_log . clone ( ) ,
631
+ err_log . clone ( ) ,
685
632
)
686
633
. await
687
634
{
688
- error ! ( ws_log , "Failed to spawn instance serial task: {}" , e) ;
635
+ error ! ( err_log , "Failed to spawn instance serial task: {}" , e) ;
689
636
}
690
637
} ) ;
691
638
@@ -696,37 +643,21 @@ async fn instance_serial(
696
643
}
697
644
} ) ;
698
645
699
- let upgrade_fut = upgrade:: on ( request) ;
700
646
let config =
701
647
WebSocketConfig { max_send_queue : Some ( 4096 ) , ..Default :: default ( ) } ;
702
648
let websocks_send = serial_task. websocks_ch . clone ( ) ;
703
- tokio:: spawn ( async move {
704
- let upgraded = match upgrade_fut. await {
705
- Ok ( u) => u,
706
- Err ( e) => {
707
- error ! ( err_log, "Serial socket upgrade failed: {}" , e) ;
708
- return ;
709
- }
710
- } ;
711
649
712
- let ws_stream = WebSocketStream :: from_raw_socket (
713
- upgraded ,
714
- Role :: Server ,
715
- Some ( config) ,
716
- )
717
- . await ;
650
+ let ws_stream = WebSocketStream :: from_raw_socket (
651
+ websock . into_inner ( ) ,
652
+ Role :: Server ,
653
+ Some ( config) ,
654
+ )
655
+ . await ;
718
656
719
- if let Err ( e) = websocks_send. send ( ws_stream) . await {
720
- error ! ( err_log, "Serial socket hand-off failed: {}" , e) ;
721
- }
722
- } ) ;
723
-
724
- Ok ( Response :: builder ( )
725
- . status ( StatusCode :: SWITCHING_PROTOCOLS )
726
- . header ( header:: CONNECTION , "Upgrade" )
727
- . header ( header:: UPGRADE , "websocket" )
728
- . header ( header:: SEC_WEBSOCKET_ACCEPT , accept_key)
729
- . body ( Body :: empty ( ) ) ?)
657
+ websocks_send
658
+ . send ( ws_stream. into ( ) )
659
+ . await
660
+ . map_err ( |e| format ! ( "Serial socket hand-off failed: {}" , e) . into ( ) )
730
661
}
731
662
732
663
// This endpoint is meant to only be called during a migration from the destination
0 commit comments