Skip to content

Commit

Permalink
feat(plc4rs): Add Transport trait with TCP impl.
Browse files Browse the repository at this point in the history
Add initial SPI layer for PLC4X Rust with:
- Transport trait with retry and logging support
- TCP transport implementation with configuration
- Basic error handling using thiserror
- Example showing TCP transport usage

This provides the foundation for protocol-specific implementations.
  • Loading branch information
jsxs0 committed Feb 20, 2025
1 parent 629e024 commit c6bf64f
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 0 deletions.
34 changes: 34 additions & 0 deletions plc4rs/examples/transport_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use plc4rs::spi::{
TcpTransport,
config::{TransportConfig, TcpConfig},
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TCP Example
let tcp_config = TcpConfig {
base: TransportConfig {
connect_timeout: Duration::from_secs(10),
read_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
buffer_size: 1024,
},
no_delay: true,
keep_alive: true,
};

let mut tcp = TcpTransport::new_with_config("192.168.1.1".into(), 102, tcp_config);
tcp.connect().await?;

let data = b"Hello PLC";
tcp.write(data).await?;

let mut buffer = vec![0u8; 1024];
let len = tcp.read(&mut buffer).await?;
println!("Received: {:?}", &buffer[..len]);

tcp.close().await?;

Ok(())
}
27 changes: 27 additions & 0 deletions plc4rs/src/spi/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct TransportConfig {
pub connect_timeout: Duration,
pub read_timeout: Duration,
pub write_timeout: Duration,
pub buffer_size: usize,
}

impl Default for TransportConfig {
fn default() -> Self {
TransportConfig {
connect_timeout: Duration::from_secs(5),
read_timeout: Duration::from_secs(1),
write_timeout: Duration::from_secs(1),
buffer_size: 8192,
}
}
}

#[derive(Debug, Clone)]
pub struct TcpConfig {
pub base: TransportConfig,
pub no_delay: bool,
pub keep_alive: bool,
}
17 changes: 17 additions & 0 deletions plc4rs/src/spi/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use thiserror::Error;
use std::io;

#[derive(Error, Debug)]
pub enum TransportError {
#[error("IO error: {0}")]
Io(#[from] io::Error),

#[error("Connection error: {0}")]
Connection(String),

#[error("Not connected")]
NotConnected,

#[error("Already connected")]
AlreadyConnected,
}
141 changes: 141 additions & 0 deletions plc4rs/src/spi/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//! Service Provider Interface (SPI) for PLC4X
//!
//! This module provides the core abstractions for implementing different transport
//! mechanisms in PLC4X. The main trait is `Transport` which defines the basic
//! operations that any transport implementation must provide.
//!
//! # Transport Types
//!
//! Currently implemented:
//! - TCP: For TCP/IP based protocols
//!
//! # Example
//! ```rust
//! use plc4rs::spi::{Transport, TcpTransport};
//!
//! async fn example() {
//! let mut transport = TcpTransport::new("192.168.1.1".to_string(), 102);
//! transport.connect().await.unwrap();
//! // ... use transport ...
//! transport.close().await.unwrap();
//! }
//! ```
use std::fmt::Debug;
use tracing::{debug, error, info, warn};

/// Retry configuration for transport operations
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// Maximum number of retry attempts
pub max_attempts: u32,
/// Delay between retry attempts
pub retry_delay: std::time::Duration,
/// Whether to use exponential backoff
pub use_backoff: bool,
}

impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
retry_delay: std::time::Duration::from_millis(100),
use_backoff: true,
}
}
}

/// Core trait for implementing transport mechanisms
#[async_trait::async_trait]
pub trait Transport: Send + Sync {
/// Establishes a connection to the target device
async fn connect(&mut self) -> Result<(), TransportError> {
self.connect_with_retry(RetryConfig::default()).await
}

/// Connects with retry logic
async fn connect_with_retry(&mut self, retry_config: RetryConfig) -> Result<(), TransportError> {
let mut attempt = 0;
let mut delay = retry_config.retry_delay;

loop {
attempt += 1;
match self.connect_internal().await {
Ok(()) => {
info!("Connection established on attempt {}", attempt);
return Ok(());
}
Err(e) => {
if attempt >= retry_config.max_attempts {
error!("Connection failed after {} attempts: {}", attempt, e);
return Err(e);
}
warn!("Connection attempt {} failed: {}", attempt, e);
tokio::time::sleep(delay).await;
if retry_config.use_backoff {
delay *= 2;
}
}
}
}
}

/// Internal connect implementation
#[doc(hidden)]
async fn connect_internal(&mut self) -> Result<(), TransportError>;

/// Reads data with logging
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, TransportError> {
debug!("Attempting to read {} bytes", buffer.len());
match self.read_internal(buffer).await {
Ok(n) => {
debug!("Successfully read {} bytes", n);
Ok(n)
}
Err(e) => {
error!("Read error: {}", e);
Err(e)
}
}
}

/// Internal read implementation
#[doc(hidden)]
async fn read_internal(&mut self, buffer: &mut [u8]) -> Result<usize, TransportError>;

/// Writes data with logging
async fn write(&mut self, data: &[u8]) -> Result<usize, TransportError> {
debug!("Attempting to write {} bytes", data.len());
match self.write_internal(data).await {
Ok(n) => {
debug!("Successfully wrote {} bytes", n);
Ok(n)
}
Err(e) => {
error!("Write error: {}", e);
Err(e)
}
}
}

/// Internal write implementation
#[doc(hidden)]
async fn write_internal(&mut self, data: &[u8]) -> Result<usize, TransportError>;

/// Closes the connection with logging
async fn close(&mut self) -> Result<(), TransportError> {
info!("Closing connection");
self.close_internal().await
}

/// Internal close implementation
#[doc(hidden)]
async fn close_internal(&mut self) -> Result<(), TransportError>;
}

// Implement transport types
pub mod tcp;
pub mod error;

pub use error::TransportError;
pub use tcp::TcpTransport;
89 changes: 89 additions & 0 deletions plc4rs/src/spi/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use tokio::net::TcpStream;
use crate::spi::{Transport, TransportError};
use crate::spi::config::TcpConfig;
use std::io;

pub struct TcpTransport {
stream: Option<TcpStream>,
address: String,
port: u16,
config: TcpConfig,
}

impl TcpTransport {
pub fn new(address: String, port: u16) -> Self {
Self::new_with_config(address, port, TcpConfig {
base: Default::default(),
no_delay: true,
keep_alive: true,
})
}

pub fn new_with_config(address: String, port: u16, config: TcpConfig) -> Self {
TcpTransport {
stream: None,
address,
port,
config,
}
}
}

impl Transport for TcpTransport {
async fn connect_internal(&mut self) -> Result<(), TransportError> {
if self.stream.is_some() {
return Err(TransportError::AlreadyConnected);
}

let addr = format!("{}:{}", self.address, self.port);
let stream = TcpStream::connect(addr).await?;

// Apply TCP-specific settings
stream.set_nodelay(self.config.no_delay)?;
stream.set_keepalive(self.config.keep_alive.then_some(self.config.base.connect_timeout))?;

self.stream = Some(stream);
Ok(())
}

async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, TransportError> {
let stream = self.stream.as_mut()
.ok_or(TransportError::NotConnected)?;

use tokio::io::AsyncReadExt;
Ok(stream.read(buffer).await?)
}

async fn write(&mut self, data: &[u8]) -> Result<usize, TransportError> {
let stream = self.stream.as_mut()
.ok_or(TransportError::NotConnected)?;

use tokio::io::AsyncWriteExt;
Ok(stream.write(data).await?)
}

async fn close(&mut self) -> Result<(), TransportError> {
if let Some(stream) = self.stream.take() {
use tokio::io::AsyncWriteExt;
stream.shutdown().await?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio_test::block_on;

#[test]
fn test_tcp_transport() {
let mut transport = TcpTransport::new("127.0.0.1".to_string(), 102);

// Test connection
block_on(async {
assert!(transport.connect().await.is_err()); // Should fail as no server is running
assert!(transport.stream.is_none());
});
}
}

0 comments on commit c6bf64f

Please sign in to comment.