Skip to content

Commit aa0fe6e

Browse files
docs: release sdf version sdf-beta5 [bot] (#316)
Updating sdf-beta5 Auto-generated by [create-pull-request](https://github.com/peter-evans/create-pull-request) GitHub action Co-authored-by: morenol <22335041+morenol@users.noreply.github.com>
1 parent f41f2cb commit aa0fe6e

File tree

95 files changed

+7572
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+7572
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sdf-beta5
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# dataflow.yaml
2+
apiVersion: 0.5.0
3+
meta:
4+
name: arrow-example
5+
version: 0.1.0
6+
namespace: examples
7+
8+
config:
9+
converter: json
10+
11+
types:
12+
order:
13+
type: object
14+
properties:
15+
name:
16+
type: string
17+
amount:
18+
type: u32
19+
price:
20+
type: f32
21+
22+
topics:
23+
buy:
24+
schema:
25+
value:
26+
type: order
27+
sell:
28+
schema:
29+
value:
30+
type: order
31+
command:
32+
schema:
33+
value:
34+
type: string
35+
converter: raw
36+
message:
37+
schema:
38+
value:
39+
type: string
40+
41+
42+
services:
43+
interface:
44+
sources:
45+
- type: topic
46+
id: command
47+
states:
48+
tracker:
49+
from: mergeservice.tracker
50+
sinks:
51+
- type: topic
52+
id: message
53+
transforms:
54+
- operator: map
55+
run: |
56+
fn new_input(_input: String) -> Result<String> {
57+
let track = tracker();
58+
let trackrow = track.sql(&format!("select * from `tracker`"))?;
59+
let rows = trackrow.rows()?;
60+
if !rows.next() {
61+
return Ok("empty".to_string())
62+
}
63+
let balancecol = trackrow.col("balance")?;
64+
let balance = rows.f32(&balancecol)?;
65+
Ok(format!("{:#?}",balance))
66+
}
67+
mergeservice:
68+
sources:
69+
- type: topic
70+
id: buy
71+
transforms:
72+
- operator: map
73+
run: |
74+
fn buy_order(order: Order) -> Result<f32> {
75+
Ok(order.amount as f32 * order.price * -1.0)
76+
}
77+
- type: topic
78+
id: sell
79+
transforms:
80+
- operator: map
81+
run: |
82+
fn sell_order(order: Order) -> Result<f32> {
83+
Ok(order.amount as f32 * order.price)
84+
}
85+
states:
86+
tracker:
87+
type: keyed-state
88+
properties:
89+
key:
90+
type: string
91+
value:
92+
type: arrow-row
93+
properties:
94+
balance:
95+
type: f32
96+
partition:
97+
assign-key:
98+
run: |
99+
fn map_cash(order: f32) -> Result<String> {
100+
Ok("cash".to_string())
101+
}
102+
update-state:
103+
run: |
104+
fn add_count(order: f32) -> Result<()> {
105+
let mut tracker = tracker();
106+
tracker.balance += order;
107+
tracker.update()?;
108+
Ok(())
109+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
apiVersion: 0.5.0
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
config:
2+
converter: json
3+
consumer:
4+
default_starting_offset:
5+
value: 0
6+
position: End
7+
producer:
8+
linger_ms: 0
9+
batch_size: 1000000
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
imports:
2+
- pkg: my-dataflow/my-pkg@0.1.0
3+
types:
4+
- name: sentence
5+
- name: word-count
6+
functions:
7+
- name: sentence-to-words
8+
- name: augment-count
9+
states:
10+
- name: word-count-table
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
meta:
2+
name: my-dataflow
3+
version: 0.1.0
4+
namespace: my-org
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
topics:
2+
cars:
3+
schema:
4+
value:
5+
type: Car
6+
converter: json
7+
consumer:
8+
default_starting_offset:
9+
value: 0
10+
position: End
11+
car-events:
12+
schema:
13+
key:
14+
type: CarLicense
15+
value:
16+
type: CarEvent
17+
producer:
18+
linger_ms: 0
19+
batch_size: 1000000
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# dataflow.yaml
2+
apiVersion: 0.5.0
3+
meta:
4+
name: filter-map-example
5+
version: 0.1.0
6+
namespace: examples
7+
8+
config:
9+
converter: raw
10+
11+
topics:
12+
sentences:
13+
schema:
14+
value:
15+
type: string
16+
domath:
17+
schema:
18+
value:
19+
type: string
20+
21+
services:
22+
filter-map-service:
23+
sources:
24+
- type: topic
25+
id: sentences
26+
27+
transforms:
28+
- operator: filter-map
29+
dependencies:
30+
- name: regex
31+
version: "1"
32+
run: |
33+
fn do_addition(input: String) -> Result<Option<String> > {
34+
let re = regex::Regex::new(r"^(\d+)\+(\d+)=$").unwrap();
35+
if let Some(num) = re.captures(&input) {
36+
let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
37+
let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
38+
return Ok(Some(format!("{}{}",input,(a+b))));
39+
} else{
40+
return Ok(None);
41+
}
42+
}
43+
44+
sinks:
45+
- type: topic
46+
id: domath
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# dataflow.yaml
2+
apiVersion: 0.5.0
3+
meta:
4+
name: filter-example
5+
version: 0.1.0
6+
namespace: examples
7+
8+
config:
9+
converter: raw
10+
11+
topics:
12+
sentences:
13+
schema:
14+
value:
15+
type: string
16+
17+
questions:
18+
schema:
19+
value:
20+
type: string
21+
22+
services:
23+
filter-service:
24+
sources:
25+
- type: topic
26+
id: sentences
27+
28+
transforms:
29+
- operator: filter
30+
run: |
31+
fn filter_questions(input: String) -> Result<bool> {
32+
Ok(input.contains("?"))
33+
}
34+
35+
sinks:
36+
- type: topic
37+
id: questions
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# dataflow.yaml
2+
apiVersion: 0.5.0
3+
meta:
4+
name: flat-map-example
5+
version: 0.1.0
6+
namespace: examples
7+
8+
config:
9+
converter: raw
10+
11+
topics:
12+
sentences:
13+
schema:
14+
value:
15+
type: string
16+
halfword:
17+
schema:
18+
value:
19+
type: string
20+
21+
services:
22+
flat-map-service:
23+
sources:
24+
- type: topic
25+
id: sentences
26+
27+
transforms:
28+
- operator: flat-map
29+
run: |
30+
fn halfword(input: String) -> Result<Vec<String>> {
31+
let mut ret: Vec<String> = Vec::new();
32+
let mid = input.len() / 2;
33+
ret.push(format!("{}",&input[..mid]));
34+
ret.push(format!("{}",&input[mid..]));
35+
Ok(ret)
36+
}
37+
38+
sinks:
39+
- type: topic
40+
id: halfword
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
apiVersion: 0.5.0
2+
3+
meta:
4+
name: chained-key
5+
version: 0.1.0
6+
namespace: example
7+
8+
config:
9+
converter: json
10+
consumer:
11+
default_starting_offset:
12+
value: 0
13+
position: End
14+
15+
types:
16+
car-info:
17+
type: object
18+
properties:
19+
year:
20+
type: u32
21+
brand:
22+
type: string
23+
model:
24+
type: string
25+
car-short:
26+
type: object
27+
properties:
28+
vin:
29+
type: string
30+
desc:
31+
type: string
32+
#Both topics are key-values
33+
topics:
34+
input:
35+
schema:
36+
key:
37+
type: string
38+
value:
39+
type: car-info
40+
output:
41+
schema:
42+
key:
43+
type: string
44+
value:
45+
type: car-short
46+
47+
services:
48+
kv-to-kv:
49+
sources:
50+
- type: topic
51+
id: input
52+
53+
transforms:
54+
- operator: map
55+
run: |
56+
fn vin_to_car(vin: Option<String>, car: CarInfo) -> Result<CarShort> {
57+
Ok(CarShort{
58+
vin: vin.unwrap(),
59+
desc: format!("{} {} {}",car.year,car.brand,car.model),
60+
})
61+
}
62+
63+
- operator: map
64+
run: |
65+
fn car_to_manu(vin: Option<String>, car_short: CarShort) -> Result<(Option<String>, CarShort)> {
66+
let country = if let Some(first_char) = vin.unwrap().chars().next() {
67+
if first_char == '1' || first_char == '4' || first_char == '5' { "United States" }
68+
else if first_char == '2' { "Canada" }
69+
else if first_char == '3' { "Mexico" }
70+
else if first_char == 'J' { "Japan" }
71+
else if first_char == 'K' { "South Korea" }
72+
else if first_char == 'L' { "China" }
73+
else if first_char == 'S' { "United Kingdom" }
74+
else if first_char == 'V' { "France" }
75+
else if first_char == 'W' { "Germany" }
76+
else if first_char == 'Z' { "Italy" }
77+
else { "Unknown" }
78+
}
79+
else { "Invalid VIN" };
80+
Ok((Some(country.to_string()), car_short))
81+
}
82+
83+
sinks:
84+
- type: topic
85+
id: output

0 commit comments

Comments
 (0)