Skip to content

Commit

Permalink
pipeline: monor refactoring and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscorn committed Jan 6, 2024
1 parent 6f9e5b8 commit 5dc0bb2
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 101 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ members = [
"nusamai",
]
resolver = "2"

[profile.release]
lto = true
1 change: 1 addition & 0 deletions nusamai-gpkg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
sqlx = { version = "0.7.3", features = ["sqlite", "runtime-tokio"] }
nusamai-geometry = { path = "../nusamai-geometry" }
thiserror = "1.0.51"
url = "2.5.0"

[dev-dependencies]
tokio = { version = "1.35.1", features = ["full"] }
54 changes: 23 additions & 31 deletions nusamai-gpkg/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use sqlx::sqlite::*;
use sqlx::{sqlite::*, ConnectOptions};
use sqlx::{Acquire, Row};
use sqlx::{Pool, Sqlite, SqlitePool};
use sqlx::{Pool, Sqlite};
use std::path::Path;
use std::str::FromStr;
use thiserror::Error;
use url::Url;

pub struct GpkgHandler {
pool: Pool<Sqlite>,
Expand All @@ -18,20 +18,22 @@ pub enum GpkgError {
}

impl GpkgHandler {
/// Create and initialize new GeoPackage database
pub async fn init(path: &str) -> Result<Self, GpkgError> {
if Path::new(path).exists() {
return Err(GpkgError::DatabaseExists(path.to_string()));
/// Create and initialize new GeoPackage database at the specified path
pub async fn from_path(path: &Path) -> Result<Self, GpkgError> {
if path.exists() {
return Err(GpkgError::DatabaseExists(format!("{:?}", path)));
}
let url = Url::parse(&format!("sqlite://{}", path.to_str().unwrap())).unwrap();
Self::from_url(&url).await
}

let db_url = format!("sqlite://{}", path);

let conn_opts = SqliteConnectOptions::from_str(&db_url)?
/// Create and initialize new GeoPackage database at the specified URL
pub async fn from_url(url: &Url) -> Result<Self, GpkgError> {
let conn_opts = SqliteConnectOptions::from_url(url)?
.create_if_missing(true)
.synchronous(SqliteSynchronous::Normal)
.journal_mode(SqliteJournalMode::Wal);
SqlitePoolOptions::new().connect_with(conn_opts).await?;
let pool = SqlitePool::connect(&db_url).await?;
let pool = SqlitePoolOptions::new().connect_with(conn_opts).await?;

// Initialize the database with minimum GeoPackage schema
let create_query = include_str!("sql/init.sql");
Expand All @@ -44,12 +46,12 @@ impl GpkgHandler {
Ok(Self { pool })
}

/// Connect to an existing GeoPackage database
pub async fn connect(path: &str) -> Result<Self, GpkgError> {
let db_url = format!("sqlite://{}", path);
let pool = SqlitePool::connect(&db_url).await?;
Ok(Self { pool })
}
///// Connect to an existing GeoPackage database
//pub async fn connect(path: &str) -> Result<Self, GpkgError> {
// let db_url = format!("sqlite://{}", path);
// let pool = SqlitePool::connect(&db_url).await?;
// Ok(Self { pool })
//}

pub async fn application_id(&self) -> u32 {
let result = sqlx::query("PRAGMA application_id;")
Expand Down Expand Up @@ -94,17 +96,6 @@ impl GpkgHandler {
}
}

pub async fn insert_feature(pool: &Pool<Sqlite>, bytes: &[u8]) {
sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)")
.bind(bytes)
.execute(pool)
.await
.unwrap();

// TODO: MultiLineString
// TODO: MultiPoint
}

pub struct GpkgTransaction<'c> {
tx: sqlx::Transaction<'c, Sqlite>,
}
Expand Down Expand Up @@ -141,8 +132,9 @@ mod tests {

#[tokio::test]
async fn test_init_connect() {
let handler = GpkgHandler::init("sqlite::memory:").await.unwrap();
let _handler2 = GpkgHandler::connect("sqlite::memory:").await.unwrap();
let handler = GpkgHandler::from_url(&Url::parse("sqlite::memory:").unwrap())
.await
.unwrap();

let application_id = handler.application_id().await;
assert_eq!(application_id, 1196444487);
Expand Down
6 changes: 6 additions & 0 deletions nusamai/src/pipeline/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ impl Feedback {
self.cancelled.load(Ordering::Relaxed)
}

/// Request the pipeline to be cancelled
#[inline]
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed)
}

/// Send a message to the feedback channel
#[inline]
pub fn feedback(&self, msg: FeedbackMessage) {
Expand Down
4 changes: 3 additions & 1 deletion nusamai/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ impl PipelineHandle {
// Wait for the pipeline to terminate
pub fn join(self) {
self.thread_handles.into_iter().for_each(|handle| {
handle.join().unwrap();
if let Err(err) = handle.join() {
eprintln!("Error: {:#?}", err);
}
});
}
}
Expand Down
18 changes: 14 additions & 4 deletions nusamai/src/sink/gpkg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
use std::path::PathBuf;

use url::Url;

use rayon::prelude::*;

use crate::parameters::Parameters;
Expand Down Expand Up @@ -39,10 +41,10 @@ impl DataSinkProvider for GpkgSinkProvider {
}

fn create(&self, params: &Parameters) -> Box<dyn DataSink> {
let output_path = get_parameter_value!(params, "@output", FileSystemPath);
let output_path = get_parameter_value!(params, "@output", FileSystemPath).unwrap();

Box::<GpkgSink>::new(GpkgSink {
output_path: output_path.unwrap().into(),
output_path: output_path.clone(),
})
}
}
Expand All @@ -54,9 +56,17 @@ pub struct GpkgSink {

impl GpkgSink {
pub async fn run_async(&mut self, upstream: Receiver, feedback: &mut Feedback) {
let mut handler = GpkgHandler::init(self.output_path.to_str().unwrap())
let mut handler = if self.output_path.to_string_lossy().starts_with("sqlite:") {
GpkgHandler::from_url(&Url::parse(self.output_path.to_str().unwrap()).unwrap())
.await
.unwrap()
} else {
GpkgHandler::from_url(
&Url::parse(&format!("sqlite://{}", self.output_path.to_str().unwrap())).unwrap(),
)
.await
.unwrap();
.unwrap()
};

let (sender, mut receiver) = tokio::sync::mpsc::channel(100);

Expand Down
File renamed without changes.
File renamed without changes.
12 changes: 8 additions & 4 deletions nusamai/src/source/citygml.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! CityGML (.gml) Source Provider
use std::fs;
use std::io::BufRead;
use std::path::Path;
Expand Down Expand Up @@ -68,10 +70,10 @@ impl DataSource for CityGMLSource {

fn toplevel_dispatcher<R: BufRead>(
st: &mut SubTreeReader<R>,
sink: &Sender,
downstream: &Sender,
feedback: &Feedback,
) -> Result<(), ParseError> {
match st.parse_children(|st| {
let result = st.parse_children(|st| {
if feedback.is_cancelled() {
return Err(ParseError::Cancelled);
}
Expand All @@ -88,7 +90,8 @@ fn toplevel_dispatcher<R: BufRead>(

if let Some(root) = cityobj.into_object() {
let cityobj = CityObject { root, geometries };
if sink.send(Parcel { cityobj }).is_err() {
if downstream.send(Parcel { cityobj }).is_err() {
feedback.cancel();
return Ok(());
}
}
Expand All @@ -103,7 +106,8 @@ fn toplevel_dispatcher<R: BufRead>(
String::from_utf8_lossy(other)
))),
}
}) {
});
match result {
Ok(_) => Ok(()),
Err(e) => {
println!("Err: {:?}", e);
Expand Down
8 changes: 5 additions & 3 deletions nusamai/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ impl Transformer for NoopTransformer {
fn transform(
&self,
parcel: Parcel,
sender: &Sender,
_feedback: &Feedback,
downstream: &Sender,
feedback: &Feedback,
) -> Result<(), TransformError> {
// no-op
sender.send(parcel)?;
if downstream.send(parcel).is_err() {
feedback.cancel();
};
Ok(())
}
}
1 change: 1 addition & 0 deletions nusamai/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod sink;
26 changes: 0 additions & 26 deletions nusamai/tests/noop_sink.rs

This file was deleted.

4 changes: 2 additions & 2 deletions nusamai/tests/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ impl Transformer for NoopTransformer {
fn transform(
&self,
parcel: Parcel,
sender: &Sender,
downstream: &Sender,
_feedback: &feedback::Feedback,
) -> Result<(), TransformError> {
// no-op
sender.send(parcel)?;
downstream.send(parcel)?;
Ok(())
}
}
Expand Down
30 changes: 0 additions & 30 deletions nusamai/tests/serde_sink.rs

This file was deleted.

61 changes: 61 additions & 0 deletions nusamai/tests/sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use nusamai::sink::DataSinkProvider;
use nusamai::source::citygml::CityGMLSourceProvider;
use nusamai::source::DataSourceProvider;
use nusamai::transform::NoopTransformer;

use nusamai::sink;

pub(crate) fn simple_run_sink<S: DataSinkProvider>(sink_provider: S, output: Option<&str>) {
let source_provider: Box<dyn DataSourceProvider> = Box::new(CityGMLSourceProvider {
filenames: vec![
"../nusamai-plateau/tests/data/kawasaki-shi/udx/frn/53391597_frn_6697_op.gml"
.to_string(),
],
});
assert_eq!(source_provider.info().name, "CityGML");

let source = source_provider.create(&source_provider.parameters());

let transformer = Box::new(NoopTransformer {});

assert!(!sink_provider.info().name.is_empty());
let mut sink_params = sink_provider.parameters();
if let Some(output) = output {
sink_params
.update_values_with_str(std::iter::once(&("@output".into(), output.into())))
.unwrap();
}
sink_params.validate().unwrap();
let sink = sink_provider.create(&sink_params);

let (handle, _watcher, canceller) = nusamai::pipeline::run(source, transformer, sink);
handle.join();

// should not be cancelled
assert!(!canceller.is_cancelled());
}

#[test]
fn run_serde_sink() {
simple_run_sink(sink::serde::SerdeSinkProvider {}, "/dev/null".into());
}

#[test]
fn run_noop_sink() {
simple_run_sink(sink::noop::NoopSinkProvider {}, None);
}

#[test]
fn run_geojson_sink() {
simple_run_sink(sink::geojson::GeoJsonSinkProvider {}, "/dev/null".into());
}

#[test]
fn run_gpkg_sink() {
simple_run_sink(sink::gpkg::GpkgSinkProvider {}, "sqlite::memory:".into());
}

#[test]
fn run_tiling2d_sink() {
simple_run_sink(sink::tiling2d::Tiling2DSinkProvider {}, "/dev/null".into());
}

0 comments on commit 5dc0bb2

Please sign in to comment.