From 3ca0d33b9c0c2c9b1cd994b73b9ce15b4ac3a824 Mon Sep 17 00:00:00 2001 From: Shubham Pawar Date: Fri, 6 Jan 2023 23:51:35 +0530 Subject: [PATCH] Refactor net::PortForwarder to reduce `handle` fn This change reduces the code size of `handle` fn inside `PortForwarder` by moving the code inside it to new functions `handle_plaintext`, `handle_encrypt` and `handle_decrypt`. This makes the code more readable and easier to understand. Also, common code between `handle_encrypt` and `handle_decrypt` is moved to two new functions `initiator_handshake` and `responder_handshake`. All the changes are made without changing the functionality of the code and as such should not break anything and should be compatible with the previous commit. --- src/cli.rs | 2 +- src/lib.rs | 1 - src/net.rs | 373 +++++++++++++++++++++++++++-------------------------- 3 files changed, 193 insertions(+), 183 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 96e1ec5..9aa53f4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -274,7 +274,7 @@ pub enum ConsumerCommands { }, /// Stop device screen mirroring for a device. /// Note: This doesn't work if the Consumer and Supplier are on the same machine. - /// see: https://github.com/mobi-nex/adborc/issues/16 for more information. + /// see: https://github.com/mobi-nex/adborc/issues/16 for more information. StopScrcpy { /// `device_id` of the device to stop scrcpy for. #[clap(value_parser)] diff --git a/src/lib.rs b/src/lib.rs index 7b31dd7..7c79455 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,6 @@ //! net::TCPClient, //! util::SysStateDefaultConfig //! }; -//! # use serde_json; //! //! // start the system in a separate thread. //! thread::spawn(|| SysState::start_system().unwrap()); diff --git a/src/net.rs b/src/net.rs index 84905a1..b99b511 100644 --- a/src/net.rs +++ b/src/net.rs @@ -510,7 +510,10 @@ impl PortForwarder { debug!("Sent stop signal to portforwarder"); // Portforwader checks for stop signal only when a new connection is received. // Force a new connection to be established to stop the server. - if TcpStream::connect(format!("127.0.0.1:{}", self.src_port)).await.is_err() { + if TcpStream::connect(format!("127.0.0.1:{}", self.src_port)) + .await + .is_err() + { warn!( "Error establishing connection for stopping the Portforwader. Maybe the server is already stopped."); }; @@ -556,6 +559,7 @@ impl PortForwarder { })?; let handle = tokio::spawn(async move { + let peer_key = peer_key.clone().unwrap_or_default(); loop { let result = listener.accept().await; match result { @@ -585,197 +589,204 @@ impl PortForwarder { // This method handles incoming connection from client and forwards it to dst_host:dst_port. async fn handle( - mut client_stream: TcpStream, + client_stream: TcpStream, dst_addr: SocketAddr, - peer_key: &Option, + peer_key: &Key, mode: &PortForwardMode, ) { debug!("Entering _handle"); debug!("Connecting to destination address: {}", dst_addr); - if let Ok(mut server_stream) = TcpStream::connect(dst_addr).await { - debug!("Connected to destination address: {}", dst_addr); - match mode { - PortForwardMode::PlainText | PortForwardMode::PlainTextAll => { - let (mut client_reader, mut client_writer) = client_stream.into_split(); - let (mut server_reader, mut server_writer) = server_stream.into_split(); - task::spawn(async move { - trace!("Starting _forward_stream thread 1"); - Self::forward_stream(&mut client_reader, &mut server_writer, 1).await; - trace!("Returned from _forward_stream thread 1"); - }); - task::spawn(async move { - trace!("Starting _forward_stream thread 2"); - Self::forward_stream(&mut server_reader, &mut client_writer, 2).await; - trace!("Returned from _forward_stream thread 2"); - }); - } + let connection = TcpStream::connect(dst_addr).await; + if connection.is_err() { + error!("Error connecting to destination address: {}", dst_addr); + return; + } + debug!("Connected to destination address: {}", dst_addr); + let server_stream = connection.unwrap(); + match mode { + PortForwardMode::PlainText | PortForwardMode::PlainTextAll => { + Self::handle_plaintext(client_stream, server_stream).await; + } - PortForwardMode::Encrypt => { - // Unwrapping the following should be safe because we have already checked - // during initialization that the peer_key is not None and private_key is not none. - let peer_key = peer_key.clone().unwrap(); - let private_key = SystemKeypair::get_private_key().unwrap(); - let initiator = Noise::build_portforwarder_initiator(&private_key, &peer_key); - if initiator.is_err() { - error!( - "Error building portforwarder initiator in encrypt mode: {}", - initiator.err().unwrap() - ); - return; - } - let initiator = initiator.unwrap(); - let mut buf = vec![0_u8; 65535]; - debug!("Starting portforwarder initiator handshake in encrypt mode"); - let enc_transport = Noise::portforwarder_initiator_handshake( - initiator, - &mut server_stream, - &mut buf[..], - ) - .await; - if enc_transport.is_err() { - error!( - "Error in portforwarder initiator handshake in encrypt mode: {}", - enc_transport.err().unwrap() - ); - return; - } - let mut enc_transport = enc_transport.unwrap(); - debug!("Portforwarder initiator handshake completed in encrypt mode"); - debug!("Starting portforwarder responder in encrypt mode"); - let responder = Noise::build_portforwarder_responder(&private_key, &peer_key); - if responder.is_err() { - error!( - "Error building portforwarder responder in encrypt mode: {}", - responder.err().unwrap() - ); - return; - } - let responder = responder.unwrap(); - let dec_transport = Noise::portforwarder_responder_handshake( - responder, - &mut server_stream, - &mut buf[..], - ) - .await; - if dec_transport.is_err() { - error!( - "Error in portforwarder responder handshake in encrypt mode: {}", - dec_transport.err().unwrap() - ); - return; - } - let mut dec_transport = dec_transport.unwrap(); - debug!("Portforwarder responder handshake completed in encrypt mode"); - let (mut client_reader, mut client_writer) = client_stream.into_split(); - let (mut server_reader, mut server_writer) = server_stream.into_split(); - task::spawn(async move { - trace!("Starting _forward_stream thread 1"); - Self::forward_stream_encrypt( - &mut client_reader, - &mut server_writer, - &mut enc_transport, - ) - .await; - trace!("Returned from _forward_stream thread 1"); - }); - task::spawn(async move { - trace!("Starting _forward_stream thread 2"); - Self::forward_stream_decrypt( - &mut server_reader, - &mut client_writer, - &mut dec_transport, - ) - .await; - trace!("Returned from _forward_stream thread 2"); - }); - } + PortForwardMode::Encrypt => { + // Unwrapping the following should be safe because we have already checked + // during initialization that the peer_key is not None and private_key is not none. + let private_key = + SystemKeypair::get_private_key().expect("Unable to get private key"); + Self::handle_encrypt(client_stream, server_stream, peer_key, &private_key).await; + } - PortForwardMode::Decrypt => { - // Unwrapping the following should be safe because we have already checked - // during initialization that the peer_key is not None and private_key is not none. - let peer_key = peer_key.clone().unwrap(); - let private_key = SystemKeypair::get_private_key().unwrap(); - let responder = Noise::build_portforwarder_responder(&private_key, &peer_key); - if responder.is_err() { - error!( - "Error building portforwarder responder in decrypt mode: {}", - responder.err().unwrap() - ); - return; - } - let responder = responder.unwrap(); - let mut buf = vec![0_u8; 65535]; - debug!("Starting portforwarder responder handshake in decrypt mode"); - let dec_transport = Noise::portforwarder_responder_handshake( - responder, - &mut client_stream, - &mut buf[..], - ) - .await; - if dec_transport.is_err() { - error!( - "Error in portforwarder responder handshake in decrypt mode: {}", - dec_transport.err().unwrap() - ); - return; - } - let mut dec_transport = dec_transport.unwrap(); - debug!("Portforwarder responder handshake completed in decrypt mode"); - debug!("Starting portforwarder initiator in decrypt mode"); - let initiator = Noise::build_portforwarder_initiator(&private_key, &peer_key); - if initiator.is_err() { - error!( - "Error building portforwarder initiator in decrypt mode: {}", - initiator.err().unwrap() - ); - return; - } - let initiator = initiator.unwrap(); - debug!("Starting portforwarder initiator handshake in decrypt mode"); - let enc_transport = Noise::portforwarder_initiator_handshake( - initiator, - &mut client_stream, - &mut buf[..], - ) - .await; - if enc_transport.is_err() { - error!( - "Error in portforwarder initiator handshake in decrypt mode: {}", - enc_transport.err().unwrap() - ); - return; - } - let mut enc_transport = enc_transport.unwrap(); - debug!("Portforwarder initiator handshake completed in decrypt mode"); - let (mut client_reader, mut client_writer) = client_stream.into_split(); - let (mut server_reader, mut server_writer) = server_stream.into_split(); - task::spawn(async move { - trace!("Starting _forward_stream thread 1"); - Self::forward_stream_decrypt( - &mut client_reader, - &mut server_writer, - &mut dec_transport, - ) - .await; - trace!("Returned from _forward_stream thread 1"); - }); - task::spawn(async move { - trace!("Starting _forward_stream thread 2"); - Self::forward_stream_encrypt( - &mut server_reader, - &mut client_writer, - &mut enc_transport, - ) - .await; - trace!("Returned from _forward_stream thread 2"); - }); - } + PortForwardMode::Decrypt => { + // Unwrapping the following should be safe because we have already checked + // during initialization that the peer_key is not None and private_key is not none. + let private_key = + SystemKeypair::get_private_key().expect("Unable to get private key"); + Self::handle_decrypt(client_stream, server_stream, peer_key, &private_key).await; } - } else { - error!("Error connecting to destination address: {}", dst_addr); } } + async fn handle_plaintext(client_stream: TcpStream, server_stream: TcpStream) { + let (mut client_reader, mut client_writer) = client_stream.into_split(); + let (mut server_reader, mut server_writer) = server_stream.into_split(); + task::spawn(async move { + trace!("Starting _forward_stream thread 1"); + Self::forward_stream(&mut client_reader, &mut server_writer, 1).await; + trace!("Returned from _forward_stream thread 1"); + }); + task::spawn(async move { + trace!("Starting _forward_stream thread 2"); + Self::forward_stream(&mut server_reader, &mut client_writer, 2).await; + trace!("Returned from _forward_stream thread 2"); + }); + } + + async fn handle_encrypt( + client_stream: TcpStream, + mut server_stream: TcpStream, + peer_key: &Key, + private_key: &Key, + ) { + let mut buf = vec![0_u8; 65535]; + // For each connection, we do two authenticated handshakes. + // One for reading from the client and writing to the server, and + // one for reading from the server and writing to the client. + + debug!("Starting portforwarder initiator handshake in encrypt mode"); + let enc_transport = + Self::initiator_handshake(peer_key, private_key, &mut server_stream, &mut buf[..]) + .await; + if enc_transport.is_err() { + error!( + "Error in portforwarder initiator handshake in encrypt mode: {}", + enc_transport.err().unwrap() + ); + return; + } + let mut enc_transport = enc_transport.unwrap(); + debug!("Portforwarder initiator handshake completed in encrypt mode"); + + debug!("Starting portforwarder responder in encrypt mode"); + let dec_transport = + Self::responder_handshake(peer_key, private_key, &mut server_stream, &mut buf[..]) + .await; + if dec_transport.is_err() { + error!( + "Error in portforwarder responder handshake in encrypt mode: {}", + dec_transport.err().unwrap() + ); + return; + } + let mut dec_transport = dec_transport.unwrap(); + debug!("Portforwarder responder handshake completed in encrypt mode"); + + let (mut client_reader, mut client_writer) = client_stream.into_split(); + let (mut server_reader, mut server_writer) = server_stream.into_split(); + task::spawn(async move { + trace!("Starting _forward_stream thread 1"); + Self::forward_stream_encrypt( + &mut client_reader, + &mut server_writer, + &mut enc_transport, + ) + .await; + trace!("Returned from _forward_stream thread 1"); + }); + task::spawn(async move { + trace!("Starting _forward_stream thread 2"); + Self::forward_stream_decrypt( + &mut server_reader, + &mut client_writer, + &mut dec_transport, + ) + .await; + trace!("Returned from _forward_stream thread 2"); + }); + } + + async fn handle_decrypt( + mut client_stream: TcpStream, + server_stream: TcpStream, + peer_key: &Key, + private_key: &Key, + ) { + let mut buf = vec![0_u8; 65535]; + // For each connection, we do two authenticated handshakes. + // One for reading from the server and writing to the client, and + // one for reading from the client and writing to the server. + + debug!("Starting portforwarder responder handshake in decrypt mode"); + let dec_transport = + Self::responder_handshake(peer_key, private_key, &mut client_stream, &mut buf[..]) + .await; + if dec_transport.is_err() { + error!( + "Error in portforwarder responder handshake in decrypt mode: {}", + dec_transport.err().unwrap() + ); + return; + } + let mut dec_transport = dec_transport.unwrap(); + debug!("Portforwarder responder handshake completed in decrypt mode"); + + debug!("Starting portforwarder initiator in decrypt mode"); + let enc_transport = + Self::initiator_handshake(peer_key, private_key, &mut client_stream, &mut buf[..]) + .await; + if enc_transport.is_err() { + error!( + "Error in portforwarder initiator handshake in decrypt mode: {}", + enc_transport.err().unwrap() + ); + return; + } + let mut enc_transport = enc_transport.unwrap(); + debug!("Portforwarder initiator handshake completed in decrypt mode"); + + let (mut client_reader, mut client_writer) = client_stream.into_split(); + let (mut server_reader, mut server_writer) = server_stream.into_split(); + task::spawn(async move { + trace!("Starting _forward_stream thread 1"); + Self::forward_stream_decrypt( + &mut client_reader, + &mut server_writer, + &mut dec_transport, + ) + .await; + trace!("Returned from _forward_stream thread 1"); + }); + task::spawn(async move { + trace!("Starting _forward_stream thread 2"); + Self::forward_stream_encrypt( + &mut server_reader, + &mut client_writer, + &mut enc_transport, + ) + .await; + trace!("Returned from _forward_stream thread 2"); + }); + } + + async fn initiator_handshake( + peer_key: &Key, + private_key: &Key, + stream: &mut TcpStream, + buf: &mut [u8], + ) -> io::Result { + let initiator = Noise::build_portforwarder_initiator(private_key, peer_key)?; + Noise::portforwarder_initiator_handshake(initiator, stream, buf).await + } + + async fn responder_handshake( + peer_key: &Key, + private_key: &Key, + stream: &mut TcpStream, + buf: &mut [u8], + ) -> io::Result { + let responder = Noise::build_portforwarder_responder(private_key, peer_key)?; + Noise::portforwarder_responder_handshake(responder, stream, buf).await + } + // This function reads the packets received from one TcpStream and // forwards them to another TcpStream. async fn forward_stream(