Skip to content

Commit 78116a3

Browse files
authored
chore: how to SQL cli (#320)
Co-authored-by: Luis Moreno <morenol@users.noreply.github.com>
1 parent 93eb074 commit 78116a3

File tree

6 files changed

+536
-2
lines changed

6 files changed

+536
-2
lines changed

sdf/_embeds/dataflows/sql_cli.yaml

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
apiVersion: 0.5.0
2+
meta:
3+
name: sql-example
4+
version: 0.1.0
5+
namespace: examples
6+
7+
config:
8+
converter: json
9+
10+
types:
11+
vehicle-data-type:
12+
type: object
13+
properties:
14+
vehicle_id:
15+
type: string
16+
latitude:
17+
type: f64
18+
longitude:
19+
type: f64
20+
sensor_status:
21+
type: string
22+
fuel_consumption:
23+
type: u32
24+
engine_rpm:
25+
type: u32
26+
engine_temperature:
27+
type: i32
28+
speed:
29+
type: float32
30+
31+
topics:
32+
vehicle-sensor:
33+
schema:
34+
value:
35+
type: vehicle-data-type
36+
services:
37+
collect-sensor-data:
38+
sources:
39+
- type: topic
40+
id: vehicle-sensor
41+
states:
42+
vehicle-data:
43+
type: keyed-state
44+
properties:
45+
key:
46+
type: string
47+
value:
48+
type: arrow-row
49+
properties:
50+
latitude:
51+
type: f64
52+
longitude:
53+
type: f64
54+
fuel_consumption:
55+
type: u32
56+
sensor_status:
57+
type: string
58+
engine_temperature:
59+
type: i32
60+
61+
partition:
62+
assign-key:
63+
run: |
64+
fn key_by_id(data: VehicleDataType) -> Result<String> {
65+
Ok(data.vehicle_id)
66+
}
67+
68+
update-state:
69+
run: |
70+
fn update_temperature(data: VehicleDataType) -> Result<()> {
71+
let mut vd = vehicle_data();
72+
73+
vd.latitude = data.latitude;
74+
vd.longitude = data.longitude;
75+
vd.fuel_consumption = data.fuel_consumption;
76+
vd.engine_temperature = data.engine_temperature;
77+
vd.sensor_status = data.sensor_status;
78+
vd.update()?;
79+
Ok(())
80+
}

sdf/how-to/state_sql_cli.mdx

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
---
2+
title: "State CLI SQL access"
3+
description: "State Example using CLI SQL"
4+
sidebar_position: 1050
5+
---
6+
7+
import CodeBlock from '@theme/CodeBlock';
8+
import SqlCLi from '!!raw-loader!../_embeds/dataflows/sql_cli.yaml';
9+
10+
This tutorial is a continuation from the previous state example. This tutorial shows how to use SQL interface through CLI to access state data.
11+
12+
## Prerequisites
13+
14+
This guide uses `local` Fluvio cluster. If you need to install it, please follow the instructions at [here][installation].
15+
16+
## Dataflow
17+
18+
### Overview
19+
From previous examples we learned how to create and manipulate a state. Here we will do the same but also will use CLI interface to access it.
20+
21+
### Collect-sensor-data
22+
23+
#### 1. Define the state
24+
25+
For this state, we will track the `latitude`, `longitude`, `sensor_status` and more parameters from the sensors.
26+
27+
```YAML
28+
states:
29+
vehicle-data:
30+
type: keyed-state
31+
properties:
32+
key:
33+
type: string
34+
value:
35+
type: arrow-row
36+
properties:
37+
latitude:
38+
type: f64
39+
longitude:
40+
type: f64
41+
fuel_consumption:
42+
type: u32
43+
sensor_status:
44+
type: string
45+
engine_temperature:
46+
type: i32
47+
```
48+
49+
Here, the `key` is a string but the value is stored as an `arrow-row` which can contain multiple `properties`(acts like columns).
50+
51+
#### 2. Assign key
52+
We will access the id from the sensor data to partition the state.
53+
```YAML
54+
partition:
55+
assign-key:
56+
run: |
57+
fn key_by_id(data: VehicleDataType) -> Result<String> {
58+
Ok(data.vehicle_id)
59+
}
60+
update-state:
61+
(...)
62+
```
63+
#### 3. Updating State
64+
To update the state in an `arrow-row`, we need to update the individual row's columns manual and call an `update()`.
65+
```YAML
66+
partition:
67+
assign-key:
68+
(...)
69+
update-state:
70+
run: |
71+
fn update_temperature(data: VehicleDataType) -> Result<()> {
72+
let mut vd = vehicle_data();
73+
74+
vd.latitude = data.latitude;
75+
vd.longitude = data.longitude;
76+
vd.fuel_consumption = data.fuel_consumption;
77+
vd.engine_temperature = data.engine_temperature;
78+
vd.sensor_status = data.sensor_status;
79+
vd.update()?;
80+
Ok(())
81+
}
82+
```
83+
84+
States are terminal so no other action will be run.
85+
86+
In this example there is not other service consuming the state, we will use the SQL interface to access it from CLI.
87+
88+
89+
## Running the Example
90+
### Full Code
91+
Copy and paste following config and save it as `dataflow.yaml`.
92+
<CodeBlock language="yaml">{SqlCLi}</CodeBlock>
93+
94+
### Running SDF
95+
To run example:
96+
```bash copy="cmd"
97+
$ sdf run
98+
```
99+
100+
### Produce data
101+
We will produce some data to mimic sensors behavior.
102+
```bash copy="cmd"
103+
$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" }
104+
' | fluvio produce vehicle-sensor
105+
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"}
106+
' | fluvio produce vehicle-sensor
107+
```
108+
109+
### Enter SQL Mode
110+
111+
In the SDF interactive shell use the `sql` command to enter the SQL Mode:
112+
113+
```bash
114+
>> sql
115+
SDF SQL version sdf-beta5
116+
Type .help for help.
117+
sql >>
118+
```
119+
120+
### Run queries on the SQL mode
121+
122+
In the SQL mode we will be able to access the dataframe states of the dataflow.
123+
124+
We can list the tables available with:
125+
126+
```bash
127+
sql >> show tables
128+
shape: (1, 1)
129+
┌──────────────┐
130+
│ name │
131+
│ --- │
132+
│ str │
133+
╞══════════════╡
134+
│ vehicle_data │
135+
└──────────────┘
136+
```
137+
138+
We can also perform normal sql queries:
139+
140+
```
141+
select * from vehicle_data
142+
shape: (2, 6)
143+
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
144+
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
145+
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
146+
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │
147+
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
148+
│ V001 ┆ 90 ┆ 10 ┆ 40.7128 ┆ -74.006 ┆ ok │
149+
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │
150+
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
151+
152+
sql >> select * from vehicle_data where sensor_status = 'failed'
153+
shape: (1, 6)
154+
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
155+
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
156+
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
157+
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │
158+
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
159+
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │
160+
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
161+
```
162+
163+
### Exit the SQL mode
164+
165+
Use `.quit` or `.exit` to exit the SQL mode.
166+
167+
```bash
168+
sql >> .quit
169+
```
170+
171+
## Cleanup
172+
173+
Exit `sdf` terminal and clean-up. The `--force` flag removes the topics:
174+
175+
```bash copy="cmd"
176+
$ sdf clean --force
177+
```
178+
179+
180+
## Conclusion
181+
182+
We just implement example accessing arrow states through SQL interface. The following link contains another example using the data from multiple states to perform a JOIN Query
183+
1. [Join Example][github_sql_join]
184+
185+
186+
[installation]: /docs/fluvio/quickstart#install-fluvio
187+
[github_sql_join]: https://github.com/infinyon/stateful-dataflows-examples/tree/main/primitives/sql/join

sdf/how-to/tumbling_window.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
title: "Tumbling Window"
33
description: "Tumbling Window"
4-
sidebar_position: 1000
4+
sidebar_position: 1100
55
---
66

77
import CodeBlock from '@theme/CodeBlock';
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
apiVersion: 0.5.0
2+
meta:
3+
name: sql-example
4+
version: 0.1.0
5+
namespace: examples
6+
7+
config:
8+
converter: json
9+
10+
types:
11+
vehicle-data-type:
12+
type: object
13+
properties:
14+
vehicle_id:
15+
type: string
16+
latitude:
17+
type: f64
18+
longitude:
19+
type: f64
20+
sensor_status:
21+
type: string
22+
fuel_consumption:
23+
type: u32
24+
engine_rpm:
25+
type: u32
26+
engine_temperature:
27+
type: i32
28+
speed:
29+
type: float32
30+
31+
topics:
32+
vehicle-sensor:
33+
schema:
34+
value:
35+
type: vehicle-data-type
36+
services:
37+
collect-sensor-data:
38+
sources:
39+
- type: topic
40+
id: vehicle-sensor
41+
states:
42+
vehicle-data:
43+
type: keyed-state
44+
properties:
45+
key:
46+
type: string
47+
value:
48+
type: arrow-row
49+
properties:
50+
latitude:
51+
type: f64
52+
longitude:
53+
type: f64
54+
fuel_consumption:
55+
type: u32
56+
sensor_status:
57+
type: string
58+
engine_temperature:
59+
type: i32
60+
61+
partition:
62+
assign-key:
63+
run: |
64+
fn key_by_id(data: VehicleDataType) -> Result<String> {
65+
Ok(data.vehicle_id)
66+
}
67+
68+
update-state:
69+
run: |
70+
fn update_temperature(data: VehicleDataType) -> Result<()> {
71+
let mut vd = vehicle_data();
72+
73+
vd.latitude = data.latitude;
74+
vd.longitude = data.longitude;
75+
vd.fuel_consumption = data.fuel_consumption;
76+
vd.engine_temperature = data.engine_temperature;
77+
vd.sensor_status = data.sensor_status;
78+
vd.update()?;
79+
Ok(())
80+
}

0 commit comments

Comments
 (0)