
Commit c1840e3

Merge pull request #569 from boozallen/558-pyspark-schema-relation-records
#558 PySpark schema functionality works for record relations
2 parents bff8bfd + ac6da96 commit c1840e3

26 files changed: +585 −80 lines

DRAFT_RELEASE_NOTES.md

+1 −1

```diff
@@ -15,7 +15,7 @@ Spark and PySpark have been upgraded from version 3.5.2 to 3.5.4.
 ## Record Relation
 To enable nested data records, we have added a new relation feature to the record metamodel. This allows records to reference other records. For more details, refer to the [Record Relation Options](https://boozallen.github.io/aissemble/aissemble/current-dev/record-metamodel.html#_record_relation_options).
 Several features are still a work in progress:
-- PySpark schema generation for records with any multiplicity
+- PySpark and Spark based validation for records with a One to Many multiplicity. (Object validation is available.)

 ## Helm Charts Resource Specification
 The following Helm charts have been updated to include the configuration options for specifying container resource requests/limits:
```
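To make the nested-record shape from the Record Relation note concrete, here is a minimal standalone PySpark sketch (not generated code) of how relations map onto Spark types, mirroring the City test record added later in this commit. Street's single field is an assumption, since Street.json is not visible in this view; a 1-1 or M-1 relation embeds the related record's StructType directly, while a 1-M relation wraps it in an ArrayType:

```python
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Related records' schemas (Mayor's "name" field comes from Mayor.json below;
# Street's field is assumed purely for illustration).
mayor_struct = StructType([StructField("name", StringType(), True)])
street_struct = StructType([StructField("street", StringType(), True)])

city_struct = StructType([
    StructField("MAYOR", mayor_struct, True),               # 1-1 relation: nested struct
    StructField("STREET", ArrayType(street_struct), True),  # 1-M relation: array of structs
])

print(city_struct.simpleString())
# struct<MAYOR:struct<name:string>,STREET:array<struct<street:string>>>
```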

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/pyspark/PySparkSchemaRecord.java

+13 −0

```diff
@@ -21,7 +21,9 @@
 
 import com.boozallen.aiops.mda.metamodel.element.Record;
 import com.boozallen.aiops.mda.metamodel.element.RecordField;
+import com.boozallen.aiops.mda.metamodel.element.Relation;
 import com.boozallen.aiops.mda.metamodel.element.python.PythonRecord;
+import com.boozallen.aiops.mda.metamodel.element.python.PythonRecordRelation;
 
 /**
  * Decorates Record with PySpark-specific functionality.
@@ -30,6 +32,8 @@ public class PySparkSchemaRecord extends PythonRecord {
 
     private static final Logger logger = LoggerFactory.getLogger(PySparkSchemaRecord.class);
 
+    private static final String SCHEMA_PACKAGE = "from ...schema.%s_schema import %sSchema";
+    private static final String PYSPARK_ARRAY_IMPORT = "from pyspark.sql.types import ArrayType";
     private Set<String> imports = new TreeSet<>();
 
     /**
@@ -75,6 +79,15 @@ public Set<String> getBaseImports() {
                 imports.add(dictionaryTypeImport);
             }
         }
+        boolean isArrayImportAdded = false;
+        for (Relation relation : getRelations()) {
+            PythonRecordRelation wrappedRelation = new PythonRecordRelation(relation);
+            if (wrappedRelation.isOneToManyRelation() && !isArrayImportAdded) {
+                isArrayImportAdded = true;
+                imports.add(PYSPARK_ARRAY_IMPORT);
+            }
+            imports.add(String.format(SCHEMA_PACKAGE, wrappedRelation.getSnakeCaseName(), relation.getName()));
+        }
 
         return imports;
     }
```
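As a sanity check on the loop above, here is a small runnable Python sketch (not the project's API) that mirrors the new relation handling in getBaseImports() for the City test record added in this commit: the ArrayType import is contributed once, for the 1-M Street relation, and every relation contributes a schema import via the SCHEMA_PACKAGE format string.

```python
# Mirrors the relation loop in PySparkSchemaRecord.getBaseImports().
SCHEMA_PACKAGE = "from ...schema.%s_schema import %sSchema"
PYSPARK_ARRAY_IMPORT = "from pyspark.sql.types import ArrayType"

# (snakeCaseName, name, isOneToManyRelation) for City's relations, per City.json.
relations = [("mayor", "Mayor", False), ("state", "State", False), ("street", "Street", True)]

imports = set()
is_array_import_added = False
for snake_case_name, name, is_one_to_many in relations:
    if is_one_to_many and not is_array_import_added:
        is_array_import_added = True
        imports.add(PYSPARK_ARRAY_IMPORT)
    imports.add(SCHEMA_PACKAGE % (snake_case_name, name))

# The Java TreeSet iterates in sorted order; sorted() approximates that here.
print("\n".join(sorted(imports)))
```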

foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/pyspark.schema.base.py.vm

+52 −12

```diff
@@ -1,9 +1,10 @@
 from abc import ABC
+
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.column import Column
 from pyspark.sql.types import StructType
 from pyspark.sql.types import DataType
-from pyspark.sql.functions import col
+from pyspark.sql.functions import col, lit
 from typing import List
 import types
 #foreach ($import in $record.baseImports)
@@ -34,6 +35,9 @@ class ${record.capitalizedName}SchemaBase(ABC):
 #foreach ($field in $record.fields)
     ${field.upperSnakecaseName}_COLUMN: str = '${field.sparkAttributes.columnName}'
 #end
+#foreach ($relation in $record.relations)
+    ${relation.upperSnakecaseName}_COLUMN: str = '${relation.columnName}'
+#end
 
 
     def __init__(self):
@@ -47,19 +51,33 @@ class ${record.capitalizedName}SchemaBase(ABC):
         self.add(${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN, ${field.shortType}(), ${nullable})
 #end
 #end
+#foreach ($relation in $record.relations)
+#set ($nullable = "#if($relation.isNullable())True#{else}False#end")
+#if ($relation.isOneToManyRelation())
+        self.add(${record.capitalizedName}SchemaBase.${relation.upperSnakecaseName}_COLUMN, ArrayType(${relation.capitalizedName}Schema().struct_type), ${nullable})
+#else
+        self.add(${record.capitalizedName}SchemaBase.${relation.upperSnakecaseName}_COLUMN, ${relation.capitalizedName}Schema().struct_type, ${nullable})
+#end
+#end
 
-#if ($record.hasFields())
+#if ($record.hasFields() || $record.hasRelations())
     def cast(self, dataset: DataFrame) -> DataFrame:
         """
         Returns the given dataset cast to this schema.
         """
 #foreach ($field in $record.fields)
         ${field.snakeCaseName}_type = self.get_data_type(${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN)
 #end
+#foreach ($relation in $record.relations)
+        ${relation.snakeCaseName}_type = self.get_data_type(${record.capitalizedName}SchemaBase.${relation.upperSnakecaseName}_COLUMN)
+#end
 
         return dataset \
 #foreach ($field in $record.fields)
-            .withColumn(${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN, dataset[${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN].cast(${field.snakeCaseName}_type))#if ($foreach.hasNext) \\#end
+            .withColumn(${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN, dataset[${record.capitalizedName}SchemaBase.${field.upperSnakecaseName}_COLUMN].cast(${field.snakeCaseName}_type))#if ($foreach.hasNext || $record.hasRelations()) \\#end
+#end
+#foreach ($relation in $record.relations)
+            .withColumn(${record.capitalizedName}SchemaBase.${relation.upperSnakecaseName}_COLUMN, dataset[${record.capitalizedName}SchemaBase.${relation.upperSnakecaseName}_COLUMN].cast(${relation.snakeCaseName}_type))#if ($foreach.hasNext) \\#end
 #end
 #end
 
@@ -111,39 +129,52 @@ class ${record.capitalizedName}SchemaBase(ABC):
         self._schema = update
 
     def validate_dataset(self, ingest_dataset: DataFrame) -> DataFrame:
+        return self.validate_dataset_with_prefix(ingest_dataset, "")
+
+    def validate_dataset_with_prefix(self, ingest_dataset: DataFrame, column_prefix: str) -> DataFrame:
         """
         Validates the given dataset and returns the lists of validated records.
         """
         data_with_validations = ingest_dataset
 #foreach ($field in $record.fields)
 #set ( $columnName = "#if($field.column)$field.column#{else}$field.upperSnakecaseName#end" )
 #if (${field.isRequired()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_IS_NOT_NULL", col("${columnName}").isNotNull())
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_IS_NOT_NULL", col(column_prefix + "${columnName}").isNotNull())
 #end
 #if (${field.getValidation().getMinValue()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_GREATER_THAN_MIN", col("${columnName}").cast('double') >= ${field.getValidation().getMinValue()})
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_GREATER_THAN_MIN", col(column_prefix + "${columnName}").cast('double') >= ${field.getValidation().getMinValue()})
 #end
 #if (${field.getValidation().getMaxValue()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_LESS_THAN_MAX", col("${columnName}").cast('double') <= ${field.getValidation().getMaxValue()})
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_LESS_THAN_MAX", col(column_prefix + "${columnName}").cast('double') <= ${field.getValidation().getMaxValue()})
 #end
 #if (${field.getValidation().getScale()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_MATCHES_SCALE", col("${columnName}").cast(StringType()).rlike(r"^[0-9]*(?:\.[0-9]{0,${field.getValidation().getScale()}})?$"))
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_MATCHES_SCALE", col(column_prefix + "${columnName}").cast(StringType()).rlike(r"^[0-9]*(?:\.[0-9]{0,${field.getValidation().getScale()}})?$"))
 #end
 #if (${field.getValidation().getMinLength()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_GREATER_THAN_MAX_LENGTH", col("${columnName}").rlike("^.{${field.getValidation().getMinLength()},}"))
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_GREATER_THAN_OR_EQUAL_TO_MIN_LENGTH", col(column_prefix + "${columnName}").rlike("^.{${field.getValidation().getMinLength()},}"))
 #end
 #if (${field.getValidation().getMaxLength()})
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_LESS_THAN_MAX_LENGTH", col("${columnName}").rlike("^.{${field.getValidation().getMaxLength()},}").eqNullSafe(False))
+#set($max = ${field.getValidation().getMaxLength()} + 1)
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_LESS_THAN_OR_EQUAL_TO_MAX_LENGTH", col(column_prefix + "${columnName}").rlike("^.{$max,}").eqNullSafe(False))
 #end
 #foreach ($format in $field.getValidation().getFormats())
 #if ($foreach.first)
-        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_MATCHES_FORMAT", col("${columnName}").rlike("$format.replace("\","\\")")#if($foreach.last))#end
+        data_with_validations = data_with_validations.withColumn("${field.upperSnakecaseName}_MATCHES_FORMAT", col(column_prefix + "${columnName}").rlike("$format.replace("\","\\")")#if($foreach.last))#end
 #else
-            | col("${columnName}").rlike("$format.replace("\","\\")")#if($foreach.last))#end
+            | col(column_prefix + "${columnName}").rlike("$format.replace("\","\\")")#if($foreach.last))#end
 #end
 #end
 #end
 
+#foreach($relation in $record.relations)
+#if($relation.isOneToManyRelation())
+        data_with_validations = data_with_validations.withColumn(self.${relation.upperSnakecaseName}_COLUMN + "_VALID", lit(self._validate_with_${relation.snakeCaseName}_schema(data_with_validations.select(col(self.${relation.upperSnakecaseName}_COLUMN)))))
+#else
+        ${relation.snakeCaseName}_schema = ${relation.name}Schema()
+        data_with_validations = data_with_validations.withColumn(self.${relation.upperSnakecaseName}_COLUMN + "_VALID", lit(not ${relation.snakeCaseName}_schema.validate_dataset_with_prefix(data_with_validations.select(col(self.${relation.upperSnakecaseName}_COLUMN)), '${relation.columnName}.').isEmpty()))
+#end
+#end
+
         validation_columns = [x for x in data_with_validations.columns if x not in ingest_dataset.columns]
 
         # Schema for filtering for valid data
@@ -159,4 +190,13 @@ class ${record.capitalizedName}SchemaBase(ABC):
         if isinstance(filter_schema, Column):
             valid_data = data_with_validations.filter(filter_schema)
             valid_data = valid_data.drop(*validation_columns)
-        return valid_data
\ No newline at end of file
+        return valid_data
+
+#foreach($relation in $record.relations)
+#if($relation.isOneToManyRelation())
+    def _validate_with_${relation.snakeCaseName}_schema(self, dataset: DataFrame) -> bool:
+        raise NotImplementedError
+#end
+#end
+
```
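The key mechanic introduced above is the column_prefix parameter: when a related record's schema validates the parent's nested column, each field check addresses the nested field by prefixing the relation's column name (for example 'MAYOR.' + 'name'). Below is a runnable sketch of that addressing, reusing the MAYOR and int_v8n names from the City and Mayor test records added in this commit:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("prefix-demo").getOrCreate()

# A parent dataset with a nested MAYOR struct, as the City schema would hold it.
df = spark.createDataFrame(
    [(("Ada", 7),), ((None, 3),)],
    "MAYOR struct<name: string, int_v8n: int>",
)

# What a rendered validate_dataset_with_prefix(df, "MAYOR.") check looks like:
column_prefix = "MAYOR."
checks = df.withColumn("NAME_IS_NOT_NULL", col(column_prefix + "name").isNotNull())
checks.show(truncate=False)
spark.stop()
```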

foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/record.base.py.vm

+24 −7

```diff
@@ -37,6 +37,7 @@ class ${record.capitalizedName}Base(ABC):
         Creates a record with the given PySpark dataframe row's data.
         """
         record = cls()
+#if($record.hasFields() || $record.hasRelations())
         if row is not None:
 #foreach ($field in $record.fields)
 #set ($rowField = "#if($field.column)${field.column}#else${field.name}#end")
@@ -46,7 +47,15 @@ class ${record.capitalizedName}Base(ABC):
 #else
             record.${field.snakeCaseName} = ${field.snakeCaseName}_value
 #end
-
+#end
+#foreach ($relation in $record.relations)
+            ${relation.snakeCaseName}_value = cls.get_row_value(row, '${relation.columnName}')
+#if($relation.isOneToManyRelation())
+            record.${relation.snakeCaseName} = [${relation.name}.from_row($relation.snakeCaseName) for $relation.snakeCaseName in ${relation.snakeCaseName}_value]
+#else
+            record.${relation.snakeCaseName} = ${relation.name}.from_row(${relation.snakeCaseName}_value)
+#end
+#end
 #end
         return record
 
@@ -58,18 +67,26 @@ class ${record.capitalizedName}Base(ABC):
         """
         return row[field] if field in row else None
 
+
     def as_row(self) -> Row:
         """
         Returns this record as a PySpark dataframe row.
         """
         return Row(
-#foreach ($field in $record.fields)
-#if ($field.type.dictionaryType.isComplex())
-            self.${field.snakeCaseName}.value if self.${field.snakeCaseName} is not None else None#if ($foreach.hasNext),#end
-#else
-            self.${field.snakeCaseName}#if ($foreach.hasNext),#end
-#end
+#foreach ($field in $record.fields)
+#if ($field.type.dictionaryType.isComplex())
+            self.${field.snakeCaseName}.value if self.${field.snakeCaseName} is not None else None#if ($foreach.hasNext || $record.hasRelations()),#end
+#else
+            self.${field.snakeCaseName}#if ($foreach.hasNext || $record.hasRelations()),#end
 #end
+#end
+#foreach($relation in $record.relations)
+#if ($relation.isOneToManyRelation())
+            [${relation.snakeCaseName}.as_row() for ${relation.snakeCaseName} in self.${relation.snakeCaseName}] if self.${relation.snakeCaseName} is not None else None#if ($foreach.hasNext),#end
+#else
+            self.${relation.snakeCaseName}.as_row() if self.${relation.snakeCaseName} is not None else None#if ($foreach.hasNext),#end
+#end
+#end
         )
 
 #end
```
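For orientation, here is a simplified, hand-written sketch of the from_row/as_row pattern the updated template renders for a record with a 1-1 relation (City and Mayor are the test records added in this commit; the real generated classes include the full field handling):

```python
from pyspark.sql import Row

class Mayor:
    def __init__(self):
        self.name = None

    @classmethod
    def from_row(cls, row):
        record = cls()
        if row is not None:
            # get_row_value equivalent: tolerate a missing column
            record.name = row["name"] if "name" in row else None
        return record

    def as_row(self) -> Row:
        return Row(self.name)

class City:
    def __init__(self):
        self.mayor = None

    @classmethod
    def from_row(cls, row):
        record = cls()
        if row is not None:
            mayor_value = row["MAYOR"] if "MAYOR" in row else None
            record.mayor = Mayor.from_row(mayor_value)  # 1-1 relation
        return record

    def as_row(self) -> Row:
        # a 1-M relation would instead emit [m.as_row() for m in self.mayors]
        return Row(self.mayor.as_row() if self.mayor is not None else None)

city = City.from_row(Row(MAYOR=Row(name="Ada")))
print(city.as_row())
```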

foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/spark.schema.base.java.vm

+4 −8

```diff
@@ -283,17 +283,13 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema {
     /**
      * Validate the given ${relation.capitalizedName} 1:M multiplicity relation dataset against ${relation.capitalizedName}Schema.
      * A false will be return if any one of the relation records schema validation is failed.
+     * Currently not implemented so it throws a NotImplementedException
      * @param ${relation.uncapitalizedName}Dataset
-     * @return boolean value to indicate validation result
+     * @return NotImplementedException
      */
     private boolean validateWith${relation.capitalizedName}Schema(Dataset<Row> ${relation.uncapitalizedName}Dataset) {
-        ${relation.capitalizedName}Schema ${relation.uncapitalizedName}Schema = new ${relation.capitalizedName}Schema();
-        // flatten ${relation.uncapitalizedName} data
-        Dataset<Row> flattenDataset = ${relation.uncapitalizedName}Dataset.select(explode(col(${relationVars[$relation.name]})));
-
-        // validate ${relation.capitalizedName}Schema
-        Dataset<Row> validData = ${relation.uncapitalizedName}Schema.validateDataFrame(flattenDataset, "col.");
-        return flattenDataset.count() == validData.count();
+        throw new NotImplementedException(
+                "Validation against relations with One to Many multiplicity is not yet implemented");
     }
 
```

test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/dictionaries/AddressDictionary.json

+2 −2

```diff
@@ -22,12 +22,12 @@
             }
         },
         {
-            "name": "state",
+            "name": "stateAddress",
             "simpleType": "string",
             "validation": {
                 "maxLength": 2,
                 "minLength": 2
             }
         }
     ]
-}
\ No newline at end of file
+}
```
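The stateAddress entry's minLength/maxLength of 2 exercises the renamed length checks in the PySpark template above. The reason the template computes #set($max = maxLength + 1) is that rlike("^.{N,}") matches strings of at least N characters, so the max-length rule matches anything of maxLength + 1 or more and negates the result. A runnable sketch:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("VA",), ("V",), ("VAX",)], ["stateAddress"])

min_length, max_plus_one = 2, 3  # maxLength 2 -> regex lower bound of 3
checks = (
    df.withColumn("STATE_ADDRESS_GREATER_THAN_OR_EQUAL_TO_MIN_LENGTH",
                  col("stateAddress").rlike("^.{%d,}" % min_length))
      .withColumn("STATE_ADDRESS_LESS_THAN_OR_EQUAL_TO_MAX_LENGTH",
                  col("stateAddress").rlike("^.{%d,}" % max_plus_one).eqNullSafe(False))
)
checks.show()  # only "VA" passes both checks
spark.stop()
```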

test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/dictionaries/PysparkDataDeliveryDictionary.json

+5 −1

```diff
@@ -59,6 +59,10 @@
         {
             "name": "binarydata",
             "simpleType": "bytearray"
+        },
+        {
+            "name": "string",
+            "simpleType": "string"
         }
     ]
-}
\ No newline at end of file
+}
```

test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/pipelines/PysparkDataDeliveryPatterns.json

+1 −1

```diff
@@ -347,4 +347,4 @@
         }
     }
 ]
-}
\ No newline at end of file
+}
```

test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/records/Address.json

+5 −2

```diff
@@ -2,6 +2,9 @@
     "name": "Address",
     "package": "com.boozallen.aiops.mda.pattern.record",
     "description": "Address custom record",
+    "frameworks": [{
+        "name": "pyspark"
+    }],
     "fields": [
         {
             "name": "street",
@@ -27,9 +30,9 @@
         {
             "name": "state",
             "type": {
-                "name": "state",
+                "name": "stateAddress",
                 "package": "com.boozallen.aiops.mda.pattern.dictionary"
             }
         }
     ]
-}
\ No newline at end of file
+}
```
test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/records/City.json

+29 −0

```diff
@@ -0,0 +1,29 @@
+{
+    "name": "City",
+    "package": "com.boozallen.aiops.mda.pattern.record",
+    "description": "Example City record for testing Spark Java Data Relations",
+    "frameworks": [{
+        "name": "pyspark"
+    }],
+    "relations": [
+        {
+            "name": "Mayor",
+            "package": "com.boozallen.aiops.mda.pattern.records",
+            "multiplicity": "1-1",
+            "column": "MAYOR",
+            "documentation": "There is one mayor in the city"
+        },
+        {
+            "name": "State",
+            "package": "com.boozallen.aiops.mda.pattern.records",
+            "multiplicity": "M-1",
+            "column": "STATE"
+        },
+        {
+            "name": "Street",
+            "package": "com.boozallen.aiops.mda.pattern.records",
+            "multiplicity": "1-M",
+            "column": "STREET"
+        }
+    ]
+}
```

test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/records/CustomData.json

+3 −0

```diff
@@ -2,6 +2,9 @@
     "name": "CustomData",
     "package": "com.boozallen.aiops.mda.pattern.record",
     "description": "Example custom record for Pyspark Data Delivery Patterns",
+    "frameworks": [{
+        "name": "pyspark"
+    }],
     "fields": [
         {
             "name": "customField",
```
test/test-mda-models/aissemble-test-data-delivery-pyspark-model/src/aissemble_test_data_delivery_pyspark_model/resources/records/Mayor.json

+25 −0

```diff
@@ -0,0 +1,25 @@
+{
+    "name": "Mayor",
+    "package": "com.boozallen.aiops.mda.pattern.record",
+    "description": "Example Mayor record for testing Spark Java Data Relations",
+    "frameworks": [{
+        "name": "pyspark"
+    }],
+    "fields": [
+        {
+            "name": "name",
+            "type": {
+                "name": "string",
+                "package": "com.boozallen.aiops.mda.pattern.dictionary"
+            }
+        },
+        {
+            "name": "integerValidation",
+            "type": {
+                "name": "integerWithValidation",
+                "package": "com.boozallen.aiops.mda.pattern.dictionary"
+            },
+            "column": "int_v8n"
+        }
+    ]
+}
```
