Commit afe098d

add schema validation to concepts (#284)
1 parent aea5e08 commit afe098d

10 files changed: 183 additions, 0 deletions

sdf/_embeds/schema/config-json.yaml

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+config:
+  converter: json

sdf/_embeds/schema/dataflow.yaml

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+apiVersion: 0.5.0
+meta:
+  name: person-age-validation
+  version: 0.1.0
+  namespace: examples
+config:
+  converter: json
+  consumer:
+    default_starting_offset:
+      value: 0
+      position: End
+
+types:
+  user:
+    type: object
+    properties:
+      name:
+        type: string
+      age:
+        type: u8
+
+topics:
+  user-topic:
+    name: user
+    schema:
+      value:
+        type: user
+
+  message-topic:
+    name: message
+    schema:
+      value:
+        type: string
+
+
+services:
+  check-adult:
+    sources:
+      - type: topic
+        id: user-topic
+    transforms:
+      - operator: map
+        run: |
+          fn age_check(user: User) -> Result<String> {
+              if user.age < 18 {
+                  Ok("minor".to_string())
+              } else {
+                  Ok("adult".to_string())
+              }
+          }
+    sinks:
+      - type: topic
+        id: message-topic

sdf/_embeds/schema/schema-error.log

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+$ sdf log -f
+Error deserializing input value ExpectedUnsigned at character 0

sdf/_embeds/schema/show-state.txt

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+>> show state
+Namespace                                Keys  Type
+check-adult/user-topic/topic.offset      1     offset
+check-adult/age-check/metrics            1     table
+request-processing/request/metrics       1     table
+request-processing/request/topic.offset  1     offset
+check-adult/user-topic/metrics           1     table
+>> show state check-adult/user-topic/metrics
+Key    Window  succeeded  failed  last_error_offset
+stats  *       4          2       5
+>>

sdf/_embeds/schema/topic-schema.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+topics:
+  persons:
+    schema:
+      value:
+        type: person

sdf/_embeds/schema/user-bad.json

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+{
+  "name": "joe",
+  "age": "30"
+}

sdf/_embeds/schema/user-good.json

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+{
+  "name": "joe",
+  "age": 30
+}

sdf/_embeds/schema/user-good.yaml

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+types:
+  person:
+    type: object
+    properties:
+      name:
+        type: string
+      weight:
+        type: u8

sdf/_embeds/schema/version.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+apiVersion: 0.5.0
+meta:
+  name: person-age-validation
+  version: 0.1.0
+  namespace: examples

sdf/concepts/schema_validation.mdx

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
---
2+
title: Schema Validation
3+
description: Schema Validation
4+
sidebar_position: 60
5+
---
6+
7+
import CodeBlock from '@theme/CodeBlock';
8+
import UserGood from '!!raw-loader!../_embeds/schema/user-good.yaml';
9+
import TypeSchema from '!!raw-loader!../_embeds/schema/topic-schema.yaml';
10+
import JsonConfig from '!!raw-loader!../_embeds/schema/config-json.yaml';
11+
import UserGoodData from '!!raw-loader!../_embeds/schema/user-good.json';
12+
import UserBadData from '!!raw-loader!../_embeds/schema/user-bad.json';
13+
import ShowState from '!!raw-loader!../_embeds/schema/show-state.txt';
14+
import SchemaErrorLog from '!!raw-loader!../_embeds/schema/schema-error.log';
15+
import Version from '!!raw-loader!../_embeds/schema/version.yaml';
16+
import DataFlow from '!!raw-loader!../_embeds/schema/dataflow.yaml';
17+
18+
SDF provides a schema validation feature to ensure that the data flowing through the dataflow is in the correct format.
19+
20+
# Schema
21+
22+
First step is to define data schema. Schema is defined in the [types] section of the dataflow. The types can be define as inline or in the package which can be shared across multiple dataflows.
23+
24+
For example, the following is a simple object type representing a person.
25+
26+
<CodeBlock language="yaml">{UserGood}</CodeBlock>
27+
28+
To enforce schema, all you have to is to specify the schema in the `topic` section. For example, the following is a topic definition with schema:
29+
30+
<CodeBlock language="yaml">{TypeSchema}</CodeBlock>
31+
32+
Schema can be enforced for both key and value part of the record.
33+
34+
Once defined, it can used to enforce the schema on the data from the source. The enforcement is specific to serialization format. Currently, SDF supports JSON serialization format but it can be extended to other formats in the future.
35+
36+
The serialization format is defined int the configuration section:
37+
38+
<CodeBlock language="yaml">{JsonConfig}</CodeBlock>
39+
40+
This will use `json` for all topics. But you can override per topic.
41+
42+
43+
Given the schema above, the following JSON object will pass the schema validation:
44+
45+
<CodeBlock language="json">{UserGoodData}</CodeBlock>
46+
47+
However, the following JSON object will fail the schema validation:
48+
49+
<CodeBlock language="json">{UserBadData}</CodeBlock>
50+
51+
The schema validation error will be reported in the [operator log]. The error message will indicate the field that failed the validation. The failed record will be skipped and the dataflow will continue to process the next record.
52+
53+
For example, with bad user data above, the error message will be:
54+
55+
<CodeBlock language="bash">{SchemaErrorLog}</CodeBlock>
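To make the pass/fail distinction concrete, here is a minimal Rust sketch of checking a decoded value against a `u8` field. `JsonValue` and `validate_u8` are hypothetical names invented for this illustration; they are not part of SDF, and SDF's actual JSON converter may work differently.

```rust
/// Hypothetical, simplified stand-in for a decoded JSON value (not an SDF type).
#[derive(Debug, Clone, PartialEq)]
enum JsonValue {
    Number(f64),
    Text(String),
}

/// Check a value against a `u8` field: it must be a whole number in 0..=255.
/// Strings are rejected even when their contents look numeric.
fn validate_u8(value: &JsonValue) -> Result<u8, String> {
    match value {
        JsonValue::Number(n) if n.fract() == 0.0 && *n >= 0.0 && *n <= 255.0 => Ok(*n as u8),
        JsonValue::Number(n) => Err(format!("{n} does not fit in u8")),
        JsonValue::Text(s) => Err(format!("expected unsigned integer, found string {s:?}")),
    }
}

fn main() {
    // `"age": 30` is a JSON number, so it is accepted.
    println!("{:?}", validate_u8(&JsonValue::Number(30.0)));
    // `"age": "30"` is a JSON string, so it is rejected.
    println!("{:?}", validate_u8(&JsonValue::Text("30".to_string())));
}
```

The point of the sketch is only that the check is on the JSON type, not on whether the text could be converted to a number.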
+
+The number of failed records is also reflected in the internal metrics, which can be accessed via the `sdf show state <operator>/metrics` command.
+
+<CodeBlock>{ShowState}</CodeBlock>
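The skip-and-count behavior described above can be sketched in a few lines of Rust. This is an illustration under stated assumptions, not SDF's implementation: `Metrics`, `validate`, and `process` are hypothetical names, the validator is a plain `u8` parse, and the position in the input slice stands in for the record offset.

```rust
/// Counters mirroring the columns of the metrics table (hypothetical struct).
#[derive(Debug, Default, PartialEq)]
struct Metrics {
    succeeded: u64,
    failed: u64,
    last_error_offset: Option<u64>,
}

/// Hypothetical validator: accepts a record only if it parses as a `u8`.
fn validate(record: &str) -> Result<u8, std::num::ParseIntError> {
    record.trim().parse::<u8>()
}

/// Validate records in order. Failures are counted and skipped, and
/// processing continues with the next record.
fn process(records: &[&str]) -> (Vec<u8>, Metrics) {
    let mut metrics = Metrics::default();
    let mut accepted = Vec::new();
    for (offset, record) in records.iter().enumerate() {
        match validate(record) {
            Ok(age) => {
                metrics.succeeded += 1;
                accepted.push(age);
            }
            Err(_) => {
                metrics.failed += 1;
                metrics.last_error_offset = Some(offset as u64);
            }
        }
    }
    (accepted, metrics)
}

fn main() {
    let (accepted, metrics) = process(&["30", "\"30\"", "17", "abc"]);
    println!("{accepted:?} {metrics:?}");
}
```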
+
+SDF types support the following concepts in a schema:
+- primitive types such as string, integer, float, and boolean
+- enum types
+- composite objects with nested properties
+- arrays or lists of objects
+
+# Versioning
+
+An inline schema's version is inherited from the dataflow version. If you want to version the schema independently, define it in a package and version the package. The versioned package can then be used in the dataflow.
+
+The schema package can then be published to [Hub] and imported into the dataflow.
+
+Versions follow semver syntax. For example, the following is a versioned schema package:
+
+<CodeBlock language="yaml">{Version}</CodeBlock>
+
+The `apiVersion` is the package syntax version, and `version` in the `meta` section is the schema version.
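As a rough illustration of how semver-style version strings such as `0.1.0` and `0.5.0` compare, here is a small Rust sketch. It handles only the core `MAJOR.MINOR.PATCH` form (no pre-release or build metadata) and is not how SDF or Hub resolve versions; `Version` and `parse_version` are hypothetical names.

```rust
/// Core semver triple. The derived ordering compares major, then minor,
/// then patch, because that is the order of the fields.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Version {
    major: u64,
    minor: u64,
    patch: u64,
}

/// Parse a plain `MAJOR.MINOR.PATCH` string. Returns `None` when a part is
/// missing, is not a number, or when there are extra parts.
fn parse_version(s: &str) -> Option<Version> {
    let mut parts = s.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None;
    }
    Some(Version { major, minor, patch })
}

fn main() {
    let schema = parse_version("0.1.0");
    let api = parse_version("0.5.0");
    println!("{schema:?} < {api:?}: {}", schema < api);
}
```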
+
+
+# Dataflow
+
+The full dataflow is defined as follows:
+<CodeBlock language="yaml">{DataFlow}</CodeBlock>
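To try the `age_check` logic from the `run` block outside of SDF, a standalone Rust sketch might look like the following. The `User` struct and the `Result` alias are assumptions standing in for whatever SDF generates from the `user` type and provides inside `run` blocks; only the function body is taken from the dataflow.

```rust
/// Stand-in for the type SDF would derive from the `user` schema (assumption).
struct User {
    name: String,
    age: u8,
}

/// Stand-in for the `Result` alias available inside `run` blocks (assumption).
type Result<T> = std::result::Result<T, String>;

/// Same logic as the `map` operator in the dataflow.
fn age_check(user: User) -> Result<String> {
    if user.age < 18 {
        Ok("minor".to_string())
    } else {
        Ok("adult".to_string())
    }
}

fn main() {
    let joe = User { name: "joe".to_string(), age: 30 };
    let name = joe.name.clone();
    println!("{name}: {}", age_check(joe).unwrap());
}
```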
+
+
+
+[types]: /sdf/concepts/types.mdx
+[operator log]: /sdf/cli/log.mdx
+[Hub]: /sdf/composition/hub.mdx
