diff --git a/Cargo.toml b/Cargo.toml index 35c51b80c..08a44c98e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,6 @@ members = [ "nusamai", ] resolver = "2" + +[profile.release] +lto = true \ No newline at end of file diff --git a/nusamai-gpkg/Cargo.toml b/nusamai-gpkg/Cargo.toml index 98d256a55..4f0f2c104 100644 --- a/nusamai-gpkg/Cargo.toml +++ b/nusamai-gpkg/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" sqlx = { version = "0.7.3", features = ["sqlite", "runtime-tokio"] } nusamai-geometry = { path = "../nusamai-geometry" } thiserror = "1.0.51" +url = "2.5.0" [dev-dependencies] tokio = { version = "1.35.1", features = ["full"] } diff --git a/nusamai-gpkg/src/handler.rs b/nusamai-gpkg/src/handler.rs index 29c1cab99..d636f3c13 100644 --- a/nusamai-gpkg/src/handler.rs +++ b/nusamai-gpkg/src/handler.rs @@ -1,9 +1,9 @@ -use sqlx::sqlite::*; +use sqlx::{sqlite::*, ConnectOptions}; use sqlx::{Acquire, Row}; -use sqlx::{Pool, Sqlite, SqlitePool}; +use sqlx::{Pool, Sqlite}; use std::path::Path; -use std::str::FromStr; use thiserror::Error; +use url::Url; pub struct GpkgHandler { pool: Pool, @@ -18,20 +18,22 @@ pub enum GpkgError { } impl GpkgHandler { - /// Create and initialize new GeoPackage database - pub async fn init(path: &str) -> Result { - if Path::new(path).exists() { - return Err(GpkgError::DatabaseExists(path.to_string())); + /// Create and initialize new GeoPackage database at the specified path + pub async fn from_path(path: &Path) -> Result { + if path.exists() { + return Err(GpkgError::DatabaseExists(format!("{:?}", path))); } + let url = Url::parse(&format!("sqlite://{}", path.to_str().unwrap())).unwrap(); + Self::from_url(&url).await + } - let db_url = format!("sqlite://{}", path); - - let conn_opts = SqliteConnectOptions::from_str(&db_url)? + /// Create and initialize new GeoPackage database at the specified URL + pub async fn from_url(url: &Url) -> Result { + let conn_opts = SqliteConnectOptions::from_url(url)? .create_if_missing(true) .synchronous(SqliteSynchronous::Normal) .journal_mode(SqliteJournalMode::Wal); - SqlitePoolOptions::new().connect_with(conn_opts).await?; - let pool = SqlitePool::connect(&db_url).await?; + let pool = SqlitePoolOptions::new().connect_with(conn_opts).await?; // Initialize the database with minimum GeoPackage schema let create_query = include_str!("sql/init.sql"); @@ -44,12 +46,12 @@ impl GpkgHandler { Ok(Self { pool }) } - /// Connect to an existing GeoPackage database - pub async fn connect(path: &str) -> Result { - let db_url = format!("sqlite://{}", path); - let pool = SqlitePool::connect(&db_url).await?; - Ok(Self { pool }) - } + ///// Connect to an existing GeoPackage database + //pub async fn connect(path: &str) -> Result { + // let db_url = format!("sqlite://{}", path); + // let pool = SqlitePool::connect(&db_url).await?; + // Ok(Self { pool }) + //} pub async fn application_id(&self) -> u32 { let result = sqlx::query("PRAGMA application_id;") @@ -94,17 +96,6 @@ impl GpkgHandler { } } -pub async fn insert_feature(pool: &Pool, bytes: &[u8]) { - sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)") - .bind(bytes) - .execute(pool) - .await - .unwrap(); - - // TODO: MultiLineString - // TODO: MultiPoint -} - pub struct GpkgTransaction<'c> { tx: sqlx::Transaction<'c, Sqlite>, } @@ -141,8 +132,9 @@ mod tests { #[tokio::test] async fn test_init_connect() { - let handler = GpkgHandler::init("sqlite::memory:").await.unwrap(); - let _handler2 = GpkgHandler::connect("sqlite::memory:").await.unwrap(); + let handler = GpkgHandler::from_url(&Url::parse("sqlite::memory:").unwrap()) + .await + .unwrap(); let application_id = handler.application_id().await; assert_eq!(application_id, 1196444487); diff --git a/nusamai/src/pipeline/feedback.rs b/nusamai/src/pipeline/feedback.rs index 8e18cf01d..0d24059b2 100644 --- a/nusamai/src/pipeline/feedback.rs +++ b/nusamai/src/pipeline/feedback.rs @@ -27,6 +27,12 @@ impl Feedback { self.cancelled.load(Ordering::Relaxed) } + /// Request the pipeline to be cancelled + #[inline] + pub fn cancel(&self) { + self.cancelled.store(true, Ordering::Relaxed) + } + /// Send a message to the feedback channel #[inline] pub fn feedback(&self, msg: FeedbackMessage) { diff --git a/nusamai/src/pipeline/runner.rs b/nusamai/src/pipeline/runner.rs index 366dc08f8..e3621aaf1 100644 --- a/nusamai/src/pipeline/runner.rs +++ b/nusamai/src/pipeline/runner.rs @@ -79,7 +79,9 @@ impl PipelineHandle { // Wait for the pipeline to terminate pub fn join(self) { self.thread_handles.into_iter().for_each(|handle| { - handle.join().unwrap(); + if let Err(err) = handle.join() { + eprintln!("Error: {:#?}", err); + } }); } } diff --git a/nusamai/src/sink/gpkg/mod.rs b/nusamai/src/sink/gpkg/mod.rs index 70f43b442..e03fb4826 100644 --- a/nusamai/src/sink/gpkg/mod.rs +++ b/nusamai/src/sink/gpkg/mod.rs @@ -2,6 +2,8 @@ use std::path::PathBuf; +use url::Url; + use rayon::prelude::*; use crate::parameters::Parameters; @@ -39,10 +41,10 @@ impl DataSinkProvider for GpkgSinkProvider { } fn create(&self, params: &Parameters) -> Box { - let output_path = get_parameter_value!(params, "@output", FileSystemPath); + let output_path = get_parameter_value!(params, "@output", FileSystemPath).unwrap(); Box::::new(GpkgSink { - output_path: output_path.unwrap().into(), + output_path: output_path.clone(), }) } } @@ -54,9 +56,17 @@ pub struct GpkgSink { impl GpkgSink { pub async fn run_async(&mut self, upstream: Receiver, feedback: &mut Feedback) { - let mut handler = GpkgHandler::init(self.output_path.to_str().unwrap()) + let mut handler = if self.output_path.to_string_lossy().starts_with("sqlite:") { + GpkgHandler::from_url(&Url::parse(self.output_path.to_str().unwrap()).unwrap()) + .await + .unwrap() + } else { + GpkgHandler::from_url( + &Url::parse(&format!("sqlite://{}", self.output_path.to_str().unwrap())).unwrap(), + ) .await - .unwrap(); + .unwrap() + }; let (sender, mut receiver) = tokio::sync::mpsc::channel(100); diff --git a/nusamai/src/sink/noop.rs b/nusamai/src/sink/noop/mod.rs similarity index 100% rename from nusamai/src/sink/noop.rs rename to nusamai/src/sink/noop/mod.rs diff --git a/nusamai/src/sink/serde.rs b/nusamai/src/sink/serde/mod.rs similarity index 100% rename from nusamai/src/sink/serde.rs rename to nusamai/src/sink/serde/mod.rs diff --git a/nusamai/src/source/citygml.rs b/nusamai/src/source/citygml.rs index a6e7aa3a6..2a69ba30a 100644 --- a/nusamai/src/source/citygml.rs +++ b/nusamai/src/source/citygml.rs @@ -1,3 +1,5 @@ +//! CityGML (.gml) Source Provider + use std::fs; use std::io::BufRead; use std::path::Path; @@ -68,10 +70,10 @@ impl DataSource for CityGMLSource { fn toplevel_dispatcher( st: &mut SubTreeReader, - sink: &Sender, + downstream: &Sender, feedback: &Feedback, ) -> Result<(), ParseError> { - match st.parse_children(|st| { + let result = st.parse_children(|st| { if feedback.is_cancelled() { return Err(ParseError::Cancelled); } @@ -88,7 +90,8 @@ fn toplevel_dispatcher( if let Some(root) = cityobj.into_object() { let cityobj = CityObject { root, geometries }; - if sink.send(Parcel { cityobj }).is_err() { + if downstream.send(Parcel { cityobj }).is_err() { + feedback.cancel(); return Ok(()); } } @@ -103,7 +106,8 @@ fn toplevel_dispatcher( String::from_utf8_lossy(other) ))), } - }) { + }); + match result { Ok(_) => Ok(()), Err(e) => { println!("Err: {:?}", e); diff --git a/nusamai/src/transform/mod.rs b/nusamai/src/transform/mod.rs index 44df3c239..2e4a32ed0 100644 --- a/nusamai/src/transform/mod.rs +++ b/nusamai/src/transform/mod.rs @@ -6,11 +6,13 @@ impl Transformer for NoopTransformer { fn transform( &self, parcel: Parcel, - sender: &Sender, - _feedback: &Feedback, + downstream: &Sender, + feedback: &Feedback, ) -> Result<(), TransformError> { // no-op - sender.send(parcel)?; + if downstream.send(parcel).is_err() { + feedback.cancel(); + }; Ok(()) } } diff --git a/nusamai/tests/mod.rs b/nusamai/tests/mod.rs new file mode 100644 index 000000000..4f919315d --- /dev/null +++ b/nusamai/tests/mod.rs @@ -0,0 +1 @@ +mod sink; diff --git a/nusamai/tests/noop_sink.rs b/nusamai/tests/noop_sink.rs deleted file mode 100644 index 69825d793..000000000 --- a/nusamai/tests/noop_sink.rs +++ /dev/null @@ -1,26 +0,0 @@ -use nusamai::sink::noop::NoopSinkProvider; -use nusamai::sink::DataSinkProvider; -use nusamai::source::citygml::CityGMLSourceProvider; -use nusamai::source::DataSourceProvider; -use nusamai::transform::NoopTransformer; - -#[test] -fn run_noop_sink() { - let source_provider: Box = Box::new(CityGMLSourceProvider { - filenames: vec![ - "../nusamai-plateau/tests/data/kawasaki-shi/udx/frn/53391597_frn_6697_op.gml" - .to_string(), - ], - }); - let sink_provider: Box = Box::new(NoopSinkProvider {}); - - let source = source_provider.create(&source_provider.parameters()); - let transformer = Box::new(NoopTransformer {}); - let sink = sink_provider.create(&sink_provider.parameters()); - - // start the pipeline - let (handle, _watcher, _canceller) = nusamai::pipeline::run(source, transformer, sink); - - // wait for the pipeline to finish - handle.join(); -} diff --git a/nusamai/tests/pipeline.rs b/nusamai/tests/pipeline.rs index 57f8beaa3..a804f4ea9 100644 --- a/nusamai/tests/pipeline.rs +++ b/nusamai/tests/pipeline.rs @@ -57,11 +57,11 @@ impl Transformer for NoopTransformer { fn transform( &self, parcel: Parcel, - sender: &Sender, + downstream: &Sender, _feedback: &feedback::Feedback, ) -> Result<(), TransformError> { // no-op - sender.send(parcel)?; + downstream.send(parcel)?; Ok(()) } } diff --git a/nusamai/tests/serde_sink.rs b/nusamai/tests/serde_sink.rs deleted file mode 100644 index b58da97ee..000000000 --- a/nusamai/tests/serde_sink.rs +++ /dev/null @@ -1,30 +0,0 @@ -use nusamai::sink::serde::SerdeSinkProvider; -use nusamai::sink::DataSinkProvider; -use nusamai::source::citygml::CityGMLSourceProvider; -use nusamai::source::DataSourceProvider; -use nusamai::transform::NoopTransformer; - -#[test] -fn run_serde_sink() { - let source_provider: Box = Box::new(CityGMLSourceProvider { - filenames: vec![ - "../nusamai-plateau/tests/data/kawasaki-shi/udx/frn/53391597_frn_6697_op.gml" - .to_string(), - ], - }); - let sink_provider: Box = Box::new(SerdeSinkProvider {}); - - let source = source_provider.create(&source_provider.parameters()); - let transformer = Box::new(NoopTransformer {}); - let mut sink_params = sink_provider.parameters(); - sink_params - .update_values_with_str(std::iter::once(&("@output".into(), "/dev/null".into()))) - .unwrap(); - let sink = sink_provider.create(&sink_params); - - // start the pipeline - let (handle, _watcher, _canceller) = nusamai::pipeline::run(source, transformer, sink); - - // wait for the pipeline to finish - handle.join(); -} diff --git a/nusamai/tests/sink.rs b/nusamai/tests/sink.rs new file mode 100644 index 000000000..b4190c3d9 --- /dev/null +++ b/nusamai/tests/sink.rs @@ -0,0 +1,61 @@ +use nusamai::sink::DataSinkProvider; +use nusamai::source::citygml::CityGMLSourceProvider; +use nusamai::source::DataSourceProvider; +use nusamai::transform::NoopTransformer; + +use nusamai::sink; + +pub(crate) fn simple_run_sink(sink_provider: S, output: Option<&str>) { + let source_provider: Box = Box::new(CityGMLSourceProvider { + filenames: vec![ + "../nusamai-plateau/tests/data/kawasaki-shi/udx/frn/53391597_frn_6697_op.gml" + .to_string(), + ], + }); + assert_eq!(source_provider.info().name, "CityGML"); + + let source = source_provider.create(&source_provider.parameters()); + + let transformer = Box::new(NoopTransformer {}); + + assert!(!sink_provider.info().name.is_empty()); + let mut sink_params = sink_provider.parameters(); + if let Some(output) = output { + sink_params + .update_values_with_str(std::iter::once(&("@output".into(), output.into()))) + .unwrap(); + } + sink_params.validate().unwrap(); + let sink = sink_provider.create(&sink_params); + + let (handle, _watcher, canceller) = nusamai::pipeline::run(source, transformer, sink); + handle.join(); + + // should not be cancelled + assert!(!canceller.is_cancelled()); +} + +#[test] +fn run_serde_sink() { + simple_run_sink(sink::serde::SerdeSinkProvider {}, "/dev/null".into()); +} + +#[test] +fn run_noop_sink() { + simple_run_sink(sink::noop::NoopSinkProvider {}, None); +} + +#[test] +fn run_geojson_sink() { + simple_run_sink(sink::geojson::GeoJsonSinkProvider {}, "/dev/null".into()); +} + +#[test] +fn run_gpkg_sink() { + simple_run_sink(sink::gpkg::GpkgSinkProvider {}, "sqlite::memory:".into()); +} + +#[test] +fn run_tiling2d_sink() { + simple_run_sink(sink::tiling2d::Tiling2DSinkProvider {}, "/dev/null".into()); +}