diff --git a/plc4rs/examples/transport_usage.rs b/plc4rs/examples/transport_usage.rs new file mode 100644 index 0000000000..5937c72389 --- /dev/null +++ b/plc4rs/examples/transport_usage.rs @@ -0,0 +1,34 @@ +use plc4rs::spi::{ + TcpTransport, + config::{TransportConfig, TcpConfig}, +}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // TCP Example + let tcp_config = TcpConfig { + base: TransportConfig { + connect_timeout: Duration::from_secs(10), + read_timeout: Duration::from_secs(2), + write_timeout: Duration::from_secs(2), + buffer_size: 1024, + }, + no_delay: true, + keep_alive: true, + }; + + let mut tcp = TcpTransport::new_with_config("192.168.1.1".into(), 102, tcp_config); + tcp.connect().await?; + + let data = b"Hello PLC"; + tcp.write(data).await?; + + let mut buffer = vec![0u8; 1024]; + let len = tcp.read(&mut buffer).await?; + println!("Received: {:?}", &buffer[..len]); + + tcp.close().await?; + + Ok(()) +} diff --git a/plc4rs/src/spi/config.rs b/plc4rs/src/spi/config.rs new file mode 100644 index 0000000000..6cff36368e --- /dev/null +++ b/plc4rs/src/spi/config.rs @@ -0,0 +1,27 @@ +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct TransportConfig { + pub connect_timeout: Duration, + pub read_timeout: Duration, + pub write_timeout: Duration, + pub buffer_size: usize, +} + +impl Default for TransportConfig { + fn default() -> Self { + TransportConfig { + connect_timeout: Duration::from_secs(5), + read_timeout: Duration::from_secs(1), + write_timeout: Duration::from_secs(1), + buffer_size: 8192, + } + } +} + +#[derive(Debug, Clone)] +pub struct TcpConfig { + pub base: TransportConfig, + pub no_delay: bool, + pub keep_alive: bool, +} diff --git a/plc4rs/src/spi/error.rs b/plc4rs/src/spi/error.rs new file mode 100644 index 0000000000..83598e7bf6 --- /dev/null +++ b/plc4rs/src/spi/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; +use std::io; + +#[derive(Error, Debug)] +pub enum TransportError { + #[error("IO error: {0}")] + Io(#[from] io::Error), + + #[error("Connection error: {0}")] + Connection(String), + + #[error("Not connected")] + NotConnected, + + #[error("Already connected")] + AlreadyConnected, +} diff --git a/plc4rs/src/spi/mod.rs b/plc4rs/src/spi/mod.rs new file mode 100644 index 0000000000..90e8aeed7a --- /dev/null +++ b/plc4rs/src/spi/mod.rs @@ -0,0 +1,141 @@ +//! Service Provider Interface (SPI) for PLC4X +//! +//! This module provides the core abstractions for implementing different transport +//! mechanisms in PLC4X. The main trait is `Transport` which defines the basic +//! operations that any transport implementation must provide. +//! +//! # Transport Types +//! +//! Currently implemented: +//! - TCP: For TCP/IP based protocols +//! +//! # Example +//! ```rust +//! use plc4rs::spi::{Transport, TcpTransport}; +//! +//! async fn example() { +//! let mut transport = TcpTransport::new("192.168.1.1".to_string(), 102); +//! transport.connect().await.unwrap(); +//! // ... use transport ... +//! transport.close().await.unwrap(); +//! } +//! ``` + +use std::fmt::Debug; +use tracing::{debug, error, info, warn}; + +/// Retry configuration for transport operations +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// Maximum number of retry attempts + pub max_attempts: u32, + /// Delay between retry attempts + pub retry_delay: std::time::Duration, + /// Whether to use exponential backoff + pub use_backoff: bool, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_attempts: 3, + retry_delay: std::time::Duration::from_millis(100), + use_backoff: true, + } + } +} + +/// Core trait for implementing transport mechanisms +#[async_trait::async_trait] +pub trait Transport: Send + Sync { + /// Establishes a connection to the target device + async fn connect(&mut self) -> Result<(), TransportError> { + self.connect_with_retry(RetryConfig::default()).await + } + + /// Connects with retry logic + async fn connect_with_retry(&mut self, retry_config: RetryConfig) -> Result<(), TransportError> { + let mut attempt = 0; + let mut delay = retry_config.retry_delay; + + loop { + attempt += 1; + match self.connect_internal().await { + Ok(()) => { + info!("Connection established on attempt {}", attempt); + return Ok(()); + } + Err(e) => { + if attempt >= retry_config.max_attempts { + error!("Connection failed after {} attempts: {}", attempt, e); + return Err(e); + } + warn!("Connection attempt {} failed: {}", attempt, e); + tokio::time::sleep(delay).await; + if retry_config.use_backoff { + delay *= 2; + } + } + } + } + } + + /// Internal connect implementation + #[doc(hidden)] + async fn connect_internal(&mut self) -> Result<(), TransportError>; + + /// Reads data with logging + async fn read(&mut self, buffer: &mut [u8]) -> Result { + debug!("Attempting to read {} bytes", buffer.len()); + match self.read_internal(buffer).await { + Ok(n) => { + debug!("Successfully read {} bytes", n); + Ok(n) + } + Err(e) => { + error!("Read error: {}", e); + Err(e) + } + } + } + + /// Internal read implementation + #[doc(hidden)] + async fn read_internal(&mut self, buffer: &mut [u8]) -> Result; + + /// Writes data with logging + async fn write(&mut self, data: &[u8]) -> Result { + debug!("Attempting to write {} bytes", data.len()); + match self.write_internal(data).await { + Ok(n) => { + debug!("Successfully wrote {} bytes", n); + Ok(n) + } + Err(e) => { + error!("Write error: {}", e); + Err(e) + } + } + } + + /// Internal write implementation + #[doc(hidden)] + async fn write_internal(&mut self, data: &[u8]) -> Result; + + /// Closes the connection with logging + async fn close(&mut self) -> Result<(), TransportError> { + info!("Closing connection"); + self.close_internal().await + } + + /// Internal close implementation + #[doc(hidden)] + async fn close_internal(&mut self) -> Result<(), TransportError>; +} + +// Implement transport types +pub mod tcp; +pub mod error; + +pub use error::TransportError; +pub use tcp::TcpTransport; diff --git a/plc4rs/src/spi/tcp.rs b/plc4rs/src/spi/tcp.rs new file mode 100644 index 0000000000..4274b63ba2 --- /dev/null +++ b/plc4rs/src/spi/tcp.rs @@ -0,0 +1,89 @@ +use tokio::net::TcpStream; +use crate::spi::{Transport, TransportError}; +use crate::spi::config::TcpConfig; +use std::io; + +pub struct TcpTransport { + stream: Option, + address: String, + port: u16, + config: TcpConfig, +} + +impl TcpTransport { + pub fn new(address: String, port: u16) -> Self { + Self::new_with_config(address, port, TcpConfig { + base: Default::default(), + no_delay: true, + keep_alive: true, + }) + } + + pub fn new_with_config(address: String, port: u16, config: TcpConfig) -> Self { + TcpTransport { + stream: None, + address, + port, + config, + } + } +} + +impl Transport for TcpTransport { + async fn connect_internal(&mut self) -> Result<(), TransportError> { + if self.stream.is_some() { + return Err(TransportError::AlreadyConnected); + } + + let addr = format!("{}:{}", self.address, self.port); + let stream = TcpStream::connect(addr).await?; + + // Apply TCP-specific settings + stream.set_nodelay(self.config.no_delay)?; + stream.set_keepalive(self.config.keep_alive.then_some(self.config.base.connect_timeout))?; + + self.stream = Some(stream); + Ok(()) + } + + async fn read(&mut self, buffer: &mut [u8]) -> Result { + let stream = self.stream.as_mut() + .ok_or(TransportError::NotConnected)?; + + use tokio::io::AsyncReadExt; + Ok(stream.read(buffer).await?) + } + + async fn write(&mut self, data: &[u8]) -> Result { + let stream = self.stream.as_mut() + .ok_or(TransportError::NotConnected)?; + + use tokio::io::AsyncWriteExt; + Ok(stream.write(data).await?) + } + + async fn close(&mut self) -> Result<(), TransportError> { + if let Some(stream) = self.stream.take() { + use tokio::io::AsyncWriteExt; + stream.shutdown().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio_test::block_on; + + #[test] + fn test_tcp_transport() { + let mut transport = TcpTransport::new("127.0.0.1".to_string(), 102); + + // Test connection + block_on(async { + assert!(transport.connect().await.is_err()); // Should fail as no server is running + assert!(transport.stream.is_none()); + }); + } +}