Skip to content

Commit

Permalink
Feedback機構を少し改善 (#439)
Browse files Browse the repository at this point in the history
rel: #433 

変換パイプラインのFeedback機構を少し改善:

- 子スレッド内で起きたエラーを(システムログだけでなく)エラーとして報告するようにする(GUIでダイアログを表示できるようにする準備)。
- メッセージがパイプラインのどの要素 (Source, Transformer, SInk, etc.)
から送信されたかを把握できるようにする。
- 子スレッド内での panic がパイプラインのどの要素 (Source, Transformer, SInk, etc.)
で起きたか分かるようにする。
- など

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **新機能**
- より詳細なログ記録メカニズムを追加しました。これは、ウォッチャーからのメッセージに関するエラーの処理とメッセージソースの表示を含みます。
-
新しい`FeedbackSourceComponent`列挙型を追加しました。フィードバックメッセージにソースコンポーネントを含むようにしました。

- **リファクタリング**
    - フィードバック構造体の管理方法を更新し、コンポーネントに基づいて新しいフィードバックスパンを作成するメソッドを導入しました。
-
スレッド処理を個別に結合し、パニック時にエラーをログに記録するように`PipelineHandle`のスレッド処理をリファクタリングしました。

- **テスト**
-
テストファイルに`FeedbackSourceComponent`を追加し、フィードバックメッセージのためのソースコンポーネントの指定を更新しました。

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
ciscorn authored Mar 8, 2024
1 parent 4e006df commit b2e121a
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 61 deletions.
2 changes: 1 addition & 1 deletion nusamai-citygml/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ impl Value {
.iter()
.map(|(k, v)| (k.into(), v.to_attribute_json())),
);
m.insert("type".into(), cls.typename.clone().into());
if let Some(id) = cls.stereotype.id() {
m.insert("id".into(), serde_json::Value::String(id.into()));
}
m.insert("type".into(), cls.typename.clone().into());
serde_json::Value::Object(m)
}
}
Expand Down
10 changes: 9 additions & 1 deletion nusamai/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,15 @@ fn run(
// log watcher
scope.spawn(move || {
for msg in watcher {
log::info!("Feedback message from the pipeline {:?}", msg);
let msg_source = format!("{:?}", msg.source_component);
match msg.error {
Some(error) => {
log::log!(msg.level, "[{msg_source}]: {}: {error:?}", msg.message);
}
None => {
log::log!(msg.level, "[{msg_source}]: {}", msg.message);
}
}
}
});
});
Expand Down
80 changes: 69 additions & 11 deletions nusamai/src/pipeline/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,32 @@ use super::PipelineError;

const FEEDBACK_CHANNEL_BOUND: usize = 10000;

#[derive(Debug, Default)]
pub struct FeedbackMessage {
#[derive(Debug)]
pub struct Message {
/// Log message body
pub message: String,
/// Log level
pub level: log::Level,
/// Message source (source, transformer, sink, pipeline, etc.)
pub source_component: SourceComponent,
pub error: Option<PipelineError>,
// severity:
// progress:
// source:
// etc.
}

#[derive(Clone)]
pub struct Feedback {
canceled: Arc<AtomicBool>,
sender: std::sync::mpsc::SyncSender<FeedbackMessage>,
source_component: SourceComponent,
sender: std::sync::mpsc::SyncSender<Message>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SourceComponent {
Source,
Transformer,
Sink,
Pipeline,
}

impl Feedback {
Expand All @@ -46,31 +58,76 @@ impl Feedback {
self.canceled.store(true, Ordering::Relaxed)
}

/// Create a new feedback span for the pipeline component
#[inline]
pub fn component_span(&self, source: SourceComponent) -> Self {
Self {
source_component: source,
..self.clone()
}
}

/// Send a message to the feedback channel
#[inline]
pub fn feedback_raw(&self, msg: FeedbackMessage) {
pub fn send_raw_message(&self, msg: Message) {
// don't care if the receiver is dropped.
let _ = self.sender.send(msg);
}

#[inline]
pub fn send_message(&self, message: String, level: log::Level) {
self.send_raw_message(Message {
message,
level,
source_component: self.source_component,
error: None,
})
}

/// Send a debug log
#[inline]
pub fn debug(&self, message: String) {
self.send_message(message, log::Level::Debug)
}

/// Send a info log
#[inline]
pub fn info(&self, message: String) {
self.send_message(message, log::Level::Info)
}

/// Send a warning log
#[inline]
pub fn warning(&self, message: String) {
self.send_message(message, log::Level::Warn)
}

/// Send an error log
#[inline]
pub fn error(&self, message: String) {
self.send_message(message, log::Level::Error)
}

/// Report a fatal error and cancel the pipeline
#[inline]
pub fn report_fatal_error(&self, error: PipelineError) {
pub fn fatal_error(&self, error: PipelineError) {
self.cancel();
let _ = self.sender.send(FeedbackMessage {
let _ = self.sender.send(Message {
message: "Fatal error".to_string(),
level: log::Level::Error,
source_component: self.source_component,
error: Some(error),
});
}
}

pub struct Watcher {
receiver: std::sync::mpsc::Receiver<FeedbackMessage>,
receiver: std::sync::mpsc::Receiver<Message>,
}

impl IntoIterator for Watcher {
type Item = FeedbackMessage;
type IntoIter = std::sync::mpsc::IntoIter<FeedbackMessage>;
type Item = Message;
type IntoIter = std::sync::mpsc::IntoIter<Message>;

fn into_iter(self) -> Self::IntoIter {
self.receiver.into_iter()
Expand Down Expand Up @@ -103,6 +160,7 @@ pub(crate) fn watcher() -> (Watcher, Feedback, Canceller) {
};
let feedback = Feedback {
canceled: canceled.clone(),
source_component: SourceComponent::Pipeline,
sender,
};
(watcher, feedback, canceller)
Expand Down
70 changes: 52 additions & 18 deletions nusamai/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::{mpsc::sync_channel, Arc};
use std::thread;

use nusamai_citygml::schema::Schema;
use rayon::ThreadPoolBuilder;
Expand All @@ -14,12 +15,23 @@ use crate::{pipeline::Receiver, transformer::Transformer};
const SOURCE_OUTPUT_CHANNEL_BOUND: usize = 10000;
const TRANSFORMER_OUTPUT_CHANNEL_BOUND: usize = 10000;

fn run_source_thread(
fn spawn_thread<F, T>(name: String, f: F) -> std::thread::JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
thread::Builder::new()
.name(name)
.spawn(f)
.expect("Failed to spawn thread")
}

fn spawn_source_thread(
mut source: Box<dyn DataSource>,
feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
let (sender, receiver) = sync_channel(SOURCE_OUTPUT_CHANNEL_BOUND);
let handle = std::thread::spawn(move || {
let handle = spawn_thread("pipeline-source".to_string(), move || {
log::info!("Source thread started.");
let num_threads = std::thread::available_parallelism()
.map(|v| v.get() * 3)
Expand All @@ -31,43 +43,44 @@ fn run_source_thread(
.unwrap();
pool.install(move || {
if let Err(error) = source.run(sender, &feedback) {
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
});
log::info!("Source thread finished.");
});
(handle, receiver)
}

fn run_transformer_thread(
fn spawn_transformer_thread(
transformer: Box<dyn Transformer>,
upstream: Receiver,
feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
let (sender, receiver) = sync_channel(TRANSFORMER_OUTPUT_CHANNEL_BOUND);
let handle = std::thread::spawn(move || {

let handle = spawn_thread("pipeline-transformer".to_string(), move || {
log::info!("Transformer thread started.");
let pool = ThreadPoolBuilder::new()
.use_current_thread()
.build()
.unwrap();
pool.install(move || {
if let Err(error) = transformer.run(upstream, sender, &feedback) {
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
});
log::info!("Transformer thread finished.");
});
(handle, receiver)
}

fn run_sink_thread(
fn spawn_sink_thread(
mut sink: Box<dyn DataSink>,
schema: Arc<Schema>,
upstream: Receiver,
feedback: Feedback,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
spawn_thread("pipeline-sink".to_string(), move || {
log::info!("Sink thread started.");
let num_threads = std::thread::available_parallelism()
.map(|v| v.get() * 3)
Expand All @@ -79,23 +92,31 @@ fn run_sink_thread(
.unwrap();
pool.install(move || {
if let Err(error) = sink.run(upstream, &feedback, &schema) {
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
});
log::info!("Sink thread finished.");
})
}

pub struct PipelineHandle {
thread_handles: Vec<std::thread::JoinHandle<()>>,
source_thread_handle: std::thread::JoinHandle<()>,
transformer_thread_handle: std::thread::JoinHandle<()>,
sink_thread_handle: std::thread::JoinHandle<()>,
}

impl PipelineHandle {
// Wait for the pipeline to terminate
pub fn join(self) {
self.thread_handles.into_iter().for_each(|handle| {
handle.join().unwrap();
});
if self.source_thread_handle.join().is_err() {
log::error!("Source thread panicked");
}
if self.transformer_thread_handle.join().is_err() {
log::error!("Transformer thread panicked");
}
if self.sink_thread_handle.join().is_err() {
log::error!("Sink thread panicked");
}
}
}

Expand All @@ -111,13 +132,26 @@ pub fn run(
let (watcher, feedback, canceller) = watcher();

// Start the pipeline
let (source_thread, source_receiver) = run_source_thread(source, feedback.clone());
let (transformer_thread, transformer_receiver) =
run_transformer_thread(transformer, source_receiver, feedback.clone());
let sink_thread = run_sink_thread(sink, schema, transformer_receiver, feedback.clone());
let (source_thread_handle, source_receiver) = spawn_source_thread(
source,
feedback.component_span(super::SourceComponent::Source),
);
let (transformer_thread_handle, transformer_receiver) = spawn_transformer_thread(
transformer,
source_receiver,
feedback.component_span(super::SourceComponent::Transformer),
);
let sink_thread_handle = spawn_sink_thread(
sink,
schema,
transformer_receiver,
feedback.component_span(super::SourceComponent::Sink),
);

let handle = PipelineHandle {
thread_handles: vec![source_thread, transformer_thread, sink_thread],
source_thread_handle,
transformer_thread_handle,
sink_thread_handle,
};
(handle, watcher, canceller)
}
6 changes: 3 additions & 3 deletions nusamai/src/sink/cesiumtiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl DataSink for CesiumTilesSink {
min_zoom,
max_zoom,
) {
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
});
}
Expand All @@ -132,7 +132,7 @@ impl DataSink for CesiumTilesSink {
if let Err(error) =
feature_sorting_stage(feedback, receiver_sliced, sender_sorted)
{
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
});
}
Expand All @@ -154,7 +154,7 @@ impl DataSink for CesiumTilesSink {
tile_id_conv,
schema,
) {
feedback.report_fatal_error(error);
feedback.fatal_error(error);
}
})
});
Expand Down
5 changes: 3 additions & 2 deletions nusamai/src/sink/cesiumtiles/tiling/scheme.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::ops::Range;
//! Our tiling scheme for the 3D Tiles
/// Our tiling scheme for the 3D Tiles
use std::ops::Range;

// Get the position of the most significant bit
fn msb(d: u32) -> u32 {
u32::BITS - d.leading_zeros()
}
Expand Down
4 changes: 2 additions & 2 deletions nusamai/src/sink/czml/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ impl DataSink for CzmlSink {

match ra {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}
match rb {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions nusamai/src/sink/geojson/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ impl DataSink for GeoJsonSink {

match ra {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}
match rb {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions nusamai/src/sink/kml/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ impl DataSink for KmlSink {

match ra {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}
match rb {
Ok(_) | Err(PipelineError::Canceled) => {}
Err(error) => feedback.report_fatal_error(error),
Err(error) => feedback.fatal_error(error),
}

Ok(())
Expand Down
Loading

0 comments on commit b2e121a

Please sign in to comment.