-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathQueries.ksql
53 lines (47 loc) · 1.6 KB
/
Queries.ksql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
CREATE STREAM PREDICTED_WEIGHT(
"Fish_Id" VARCHAR KEY,
"Species" VARCHAR,
"Height" DOUBLE,
"Length" DOUBLE,
"Timestamp" VARCHAR,
"Prediction" STRUCT<"Weight" DOUBLE, "ModelTime" VARCHAR>
)
WITH(KAFKA_TOPIC = 'weight-prediction', VALUE_FORMAT = 'JSON');
CREATE STREAM ACTUAL_WEIGHT(
"Fish_Id" VARCHAR KEY,
"Species" VARCHAR,
"Weight" DOUBLE,
"Timestamp" VARCHAR
)
WITH(KAFKA_TOPIC = 'machine-weight', VALUE_FORMAT = 'JSON');
CREATE STREAM DIFF_WEIGHT
WITH(KAFKA_TOPIC = 'weight-diff', VALUE_FORMAT = 'JSON')
AS SELECT
'Key' AS "Key",
PREDICTED_WEIGHT."Fish_Id" AS "Fish_Id",
PREDICTED_WEIGHT."Species" AS "Species",
PREDICTED_WEIGHT."Length" AS "Length",
PREDICTED_WEIGHT."Height" AS "Height",
PREDICTED_WEIGHT."Prediction"->"Weight" AS "PredictedWeight",
ACTUAL_WEIGHT."Weight" AS "ActualWeight",
ROUND(ABS(PREDICTED_WEIGHT."Prediction"->"Weight" - ACTUAL_WEIGHT."Weight") / ACTUAL_WEIGHT."Weight", 3) AS "Error",
PREDICTED_WEIGHT."Prediction"->"ModelTime" AS "ModelTime",
ACTUAL_WEIGHT."Timestamp" AS "Timestamp"
FROM PREDICTED_WEIGHT
INNER JOIN ACTUAL_WEIGHT
WITHIN 1 MINUTE
ON PREDICTED_WEIGHT."Fish_Id" = ACTUAL_WEIGHT."Fish_Id";
set 'ksql.suppress.enabled'='true';
CREATE TABLE RETRAIN_WEIGHT
WITH(KAFKA_TOPIC = 'weight-retrain', VALUE_FORMAT = 'JSON')
AS SELECT
"Key",
COLLECT_SET("Species") AS "Species",
EARLIEST_BY_OFFSET("Fish_Id") AS "Fish_Id_Start",
LATEST_BY_OFFSET("Fish_Id") AS "Fish_Id_End",
AVG("Error") AS "ErrorAvg"
FROM DIFF_WEIGHT
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY "Key"
HAVING AVG("Error") > 0.15
EMIT FINAL;