Skip to content

Commit 8fc7273

Browse files
committed
setup tests for error streams
1 parent e328931 commit 8fc7273

File tree

4 files changed

+191
-3
lines changed

4 files changed

+191
-3
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
/uniffi-rs
33
/Cargo.lock
44
.aider*
5+
rust-toolchain.toml
6+
internal_todo.md

fixtures/streams_ext/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ lazy_static = "1.4.0"
1818
futures = "0.3"
1919
async-stream = "0.3"
2020
tokio = { version = "1.0", features = ["full"] }
21+
thiserror = "1.0.66"
2122

2223
[build-dependencies]
2324
uniffi-dart = { path = "../../", features = ["build"] }

fixtures/streams_ext/src/api copy.udl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[Error]
2+
enum StreamErrorInt {
3+
"IntegerError",
4+
};
5+
6+
[Error]
7+
enum StreamErrorString {
8+
"StringError",
9+
};
10+
11+
namespace streams_ext {
12+
[Throws=StreamErrorInt]
13+
i32? error_stream();
14+
15+
[Throws=StreamErrorString]
16+
string? combined_error_streams();
17+
};
18+

fixtures/streams_ext/src/lib.rs

Lines changed: 170 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
use async_stream::stream;
22
use futures::stream::{self, Stream, StreamExt};
3+
use futures::TryStreamExt;
34
use std::pin::Pin;
45
use tokio::time::{interval, Duration};
56

7+
// // Define custom error enums
8+
// #[derive(Debug, thiserror::Error)]
9+
// pub enum StreamErrorInt {
10+
// #[error("An integer error occurred: {0}")]
11+
// IntegerError(String),
12+
// }
13+
14+
// #[derive(Debug, thiserror::Error)]
15+
// pub enum StreamErrorString {
16+
// #[error("A string error occurred: {0}")]
17+
// StringError(String),
18+
// }
19+
620
#[uniffi_dart::export_stream(i32)]
721
pub fn simple_stream() -> impl Stream<Item = i32> {
822
stream::iter(0..5)
@@ -27,9 +41,6 @@ pub fn fibonacci_stream() -> Pin<Box<dyn Stream<Item = u64> + Send>> {
2741
})
2842
}
2943

30-
// pub fn alphabet_stream() -> Pin<Box<dyn Stream<Item = String> + Send>> {
31-
// Box::pin(stream::iter('A'..='Z'))
32-
// }
3344

3445
#[uniffi_dart::export_stream(u64)]
3546
pub fn async_timer_stream() -> Pin<Box<dyn Stream<Item = u64> + Send>> {
@@ -54,6 +65,64 @@ pub fn combined_streams() -> impl Stream<Item = String> + Send {
5465
stream::select(stream1, stream3)
5566
}
5667

68+
// pub fn error_stream() -> impl Stream<Item = Result<i32,StreamErrorInt> > +Send {
69+
// {
70+
// let(mut __yield_tx,__yield_rx) = unsafe {
71+
// async_stream::__private::yielder::pair()
72+
// };
73+
// async_stream::__private::AsyncStream::new(__yield_rx,async move {
74+
// __yield_tx.send(Ok(1)).await;
75+
// __yield_tx.send(Ok(2)).await;
76+
// __yield_tx.send(Err(StreamErrorInt::IntegerError("An error occurred".to_string()))).await;
77+
// __yield_tx.send(Ok(4)).await;
78+
// })
79+
// }
80+
// }
81+
// #[derive(uniffi::Object)]
82+
// pub struct ErrorStreamStreamExt {
83+
// stream:tokio::sync::Mutex<std::pin::Pin<Box<dyn futures::Stream<Item = Result<i32,StreamErrorInt> > +Send>> > ,
84+
// }
85+
// impl ErrorStreamStreamExt {
86+
// pub fn new() -> std::sync::Arc<Self>{
87+
// std::sync::Arc::new(Self {
88+
// stream:tokio::sync::Mutex::new(Box::pin(error_stream())),
89+
// })
90+
// }
91+
// pub async fn next(&self) -> Option<Result<i32,StreamErrorInt> >{
92+
// let mut stream = self.stream.lock().await;
93+
// stream.as_mut().next().await
94+
// }
95+
96+
// }
97+
98+
// #[uniffi_dart::export_stream(Result<i32, StreamErrorInt>)]
99+
// pub fn error_stream() -> impl Stream<Item = Result<i32, StreamErrorInt>> + Send {
100+
// stream! {
101+
// yield Ok(1);
102+
// yield Ok(2);
103+
// yield Err("An error occurred".to_string());
104+
// yield Ok(4);
105+
// }
106+
// }
107+
108+
// #[uniffi_dart::export_stream(Result<i32, StreamErrorString>)]
109+
// pub fn combined_error_streams() -> impl Stream<Item = Result<i32, StreamErrorString>> + Send {
110+
// let stream1 = count_stream()
111+
// .take(3)
112+
// .map(|n| Ok(format!("Count: {}", n)));
113+
// let stream3 = fibonacci_stream()
114+
// .take(3)
115+
// .map(|n| {
116+
// if n == 2 {
117+
// Err("Fibonacci error".to_string())
118+
// } else {
119+
// Ok(format!("Fibonacci: {}", n))
120+
// }
121+
// });
122+
123+
// stream::select(stream1, stream3)
124+
// }
125+
57126
#[cfg(test)]
58127
mod tests {
59128
use super::*;
@@ -169,6 +238,104 @@ mod tests {
169238
// The next call should return None
170239
assert_eq!(instance.next().await, None);
171240
}
241+
242+
// #[tokio::test]
243+
// async fn test_error_stream() {
244+
// let mut stream = error_stream();
245+
// let mut results = Vec::new();
246+
247+
// while let Some(item) = stream.next().await {
248+
// match item {
249+
// Ok(value) => results.push(value),
250+
// Err(e) => {
251+
// results.push(-1); // Using -1 to indicate an error occurred
252+
// println!("Stream error: {}", e);
253+
// }
254+
// }
255+
// }
256+
257+
// assert_eq!(results, vec![1, 2, -1, 4]);
258+
// }
259+
260+
// #[tokio::test]
261+
// async fn test_combined_error_streams() {
262+
// let mut stream = combined_error_streams();
263+
// let mut results = Vec::new();
264+
265+
// while let Some(item) = stream.next().await {
266+
// match item {
267+
// Ok(value) => results.push(value),
268+
// Err(e) => {
269+
// results.push("Error".to_string());
270+
// println!("Combined stream error: {}", e);
271+
// }
272+
// }
273+
// }
274+
275+
// assert_eq!(
276+
// results,
277+
// vec![
278+
// "Count: 0".to_string(),
279+
// "Count: 1".to_string(),
280+
// "Count: 2".to_string(),
281+
// "Fibonacci: 0".to_string(),
282+
// "Fibonacci: 1".to_string(),
283+
// "Error".to_string(),
284+
// ]
285+
// );
286+
// }
287+
288+
// #[tokio::test]
289+
// async fn test_error_stream_with_timeout() {
290+
// let mut stream = error_stream();
291+
// let result = timeout(Duration::from_secs(1), async {
292+
// let mut collected = Vec::new();
293+
// while let Some(item) = stream.next().await {
294+
// collected.push(item);
295+
// }
296+
// collected
297+
// })
298+
// .await;
299+
300+
// match result {
301+
// Ok(items) => {
302+
// assert_eq!(
303+
// items,
304+
// vec![
305+
// Ok(1),
306+
// Ok(2),
307+
// Err(StreamErrorInt::IntegerError("An error occurred".to_string())),
308+
// Ok(4)
309+
// ]
310+
// );
311+
// }
312+
// Err(_) => panic!("Timeout occurred while collecting error stream"),
313+
// }
314+
// }
315+
316+
317+
// #[tokio::test]
318+
// async fn test_combined_error_streams_handling() {
319+
// let mut stream = combined_error_streams();
320+
// let mut counts = 0;
321+
// let mut fibs = 0;
322+
// let mut errors = 0;
323+
324+
// while let Some(item) = stream.next().await {
325+
// let item: Result<String, StreamErrorString> = item; // Explicit type annotation
326+
327+
// match item {
328+
// Ok(ref s) if s.starts_with("Count:") => counts += 1,
329+
// Ok(ref s) if s.starts_with("Fibonacci:") => fibs += 1,
330+
// Err(_) => errors += 1,
331+
// _ => {}
332+
// }
333+
// }
334+
335+
// assert_eq!(counts, 3);
336+
// assert_eq!(fibs, 2); // One Fibonacci stream yields an error
337+
// assert_eq!(errors, 1);
338+
// }
172339
}
173340

174341
uniffi::include_scaffolding!("api");

0 commit comments

Comments
 (0)