Skip to content

Commit 7a55ca4

Browse files
authored
Merge pull request #584 from boozallen/578-fix-hanging-pipeline
[#578] only generate messaging files if messaging is needed
2 parents a9a342f + 9393ed7 commit 7a55ca4

15 files changed

+126
-117
lines changed

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/ManualActionNotificationService.java

+35-36
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void addNoticeToAddModuleToParentBuild(GenerationContext context, String
119119
/**
120120
* Checks if updates are needed to dependencies in a POM file.
121121
*
122-
* @param context the generation context
122+
* @param context the generation context
123123
* @param artifactId the artifact ID
124124
* @param persistType the persist type
125125
*/
@@ -312,41 +312,6 @@ private void appendTiltHelmBuild(String appName, String deployArtifactId, String
312312
builder.append(EMPTY_LINE);
313313
}
314314

315-
/**
316-
* Adds a notification to update the Tiltfile.
317-
* NOTE: This "k8s_yaml" line is also being output by appendTiltHelmBuild() and addSparkWorkerTiltResources() so
318-
* we should consider refactoring to extract out common code.
319-
*
320-
* @param context the generation context
321-
* @param appName the application name
322-
* @param deployArtifactId the deploy artifact ID
323-
* @param yamlFileName the deploy artifact ID
324-
*/
325-
public void addYamlTiltFileMessage(final GenerationContext context, final String appName,
326-
final String deployArtifactId, final String yamlFileName) {
327-
328-
final File rootDir = context.getExecutionRootDirectory();
329-
if (!rootDir.exists() || !tiltFileFound(rootDir)) {
330-
logger.warn("Unable to find Tiltfile. Will not be able to direct manual Helm updates to for Tiltfile");
331-
} else {
332-
final String tiltFilePath = rootDir.getAbsolutePath() + File.separator + "Tiltfile";
333-
final String yamlFilePath = deployArtifactId + "/src/main/resources/apps/" + appName + "/" + yamlFileName;
334-
final String text = "apps/" + appName;
335-
336-
boolean tiltFileContainsArtifact = existsInFile(tiltFilePath, text);
337-
if (!tiltFileContainsArtifact && showWarnings(tiltFilePath)) {
338-
final String key = getMessageKey("Tiltfile", "yaml", appName);
339-
340-
HashSet<String> items = new HashSet<String>();
341-
items.add(yamlFilePath);
342-
343-
VelocityNotification notification = new VelocityNotification(key, GROUP_TILT, items,
344-
"templates/notifications/notification.yaml.tiltfile.vm");
345-
addManualAction(tiltFilePath, notification);
346-
}
347-
}
348-
}
349-
350315
/**
351316
* Adds a notification to update the tiltfile with necessary elasticsearch resources
352317
*
@@ -499,6 +464,40 @@ public void addDockerPomMessage(final GenerationContext context, final String pr
499464
}
500465
}
501466
}
467+
468+
/**
469+
* Adds a manual action notification to configure an outbound SmallRye connector for a specific channel to support
470+
* sending messages to an external messaging system.
471+
*
472+
* @param context the Fermenter generation context
473+
* @param pipelineArtifactId the artifact ID for the pipeline which needs an outbound connector configured
474+
* @param description the plain English description of the messaging channel, e.g. "Alert Producer"
475+
* @param channel the SmallRye channel name on which the connector will receive outbound messages
476+
* @param serializerClass the class used to serialize the message values
477+
*/
478+
public void addSmallRyeConnectorMessage(GenerationContext context, String pipelineArtifactId, String description, String channel, String serializerClass) {
479+
File root = context.getExecutionRootDirectory();
480+
String pipelinesModule = MavenUtil.getPipelinesModuleName(root);
481+
if (root != null && pipelinesModule != null) {
482+
Path pipelineDir = root.toPath().resolve(Path.of(pipelinesModule, pipelineArtifactId));
483+
Path mpConfig = pipelineDir.resolve("src/main/resources/META-INF/microprofile-config.properties");
484+
String connectorProperty = "mp.messaging.outgoing." + channel + ".connector=";
485+
486+
if (Files.exists(mpConfig) && !existsInFile(mpConfig.toString(), connectorProperty)) {
487+
String key = getMessageKey(pipelineArtifactId, "microprofile-config", channel, "outgoing-connector");
488+
VelocityNotification notification = new VelocityNotification(key, "microprofile-config", new HashSet<>(),
489+
"templates/notifications/notification.microprofile-config.connector.vm");
490+
notification.addToVelocityContext("description", description);
491+
notification.addToVelocityContext("channel", channel);
492+
notification.addToVelocityContext("topic", channel);
493+
notification.addToVelocityContext("serializer", serializerClass);
494+
// Group Template properties
495+
notification.addToExternalVelocityContextProperties("pipelinesArtifactId", pipelinesModule);
496+
notification.addToExternalVelocityContextProperties("pipelineArtifactId", pipelineArtifactId);
497+
addManualAction(mpConfig.toString(), notification);
498+
}
499+
}
500+
}
502501

503502
private boolean executionAppExistsInPomFile(File file, String appName) {
504503
return propertyVariableExistsInPomFile(file, appName, "appName", appName);

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/generator/TargetedPipelineResourcesGenerator.java foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/generator/PipelineMessagingResourcesGenerator.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* Generates configuration code with no model interaction. This is often useful for
2424
* configuration files that must exist in some form or similar constructs.
2525
*/
26-
public class TargetedPipelineResourcesGenerator extends AbstractResourcesGenerator {
26+
public class PipelineMessagingResourcesGenerator extends AbstractResourcesGenerator {
2727
/*--~-~-~~
2828
* Usages:
2929
* | Target | Template | Generated File |
@@ -50,12 +50,12 @@ public void generate(GenerationContext generationContext) {
5050
String fileName = replace("pipelineName", baseOutputFile, targetPipeline.getName());
5151
generationContext.setOutputFile(fileName);
5252

53-
if (shouldGenerate(pipeline)) {
53+
if (shouldGenerate(targetPipeline)) {
5454
generateFile(generationContext, vc);
5555
}
5656
}
5757

58-
protected boolean shouldGenerate(Pipeline pipeline) {
59-
return true;
58+
protected boolean shouldGenerate(BasePipelineDecorator pipeline) {
59+
return pipeline.isMessagingSupportNeeded();
6060
}
6161
}

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/generator/TargetedPipelineJavaGenerator.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class TargetedPipelineJavaGenerator extends AbstractJavaGenerator {
4343
* | javaPipelineDriver | pipeline.driver.impl.java.vm | ${basePackage}/${pipelineName}Driver.java |
4444
*/
4545

46+
public static final String METADATA_CHANNEL = "metadata-ingest";
47+
public static final String ALERT_CHANNEL = "alerts";
48+
4649
protected ManualActionNotificationService manualActionNotificationService = new ManualActionNotificationService();
4750

4851
/**
@@ -56,8 +59,9 @@ public void generate(GenerationContext generationContext) {
5659

5760
Pipeline pipeline = PipelineUtils.getTargetedPipeline(generationContext, metadataContext);
5861
JavaPipeline javaTargetPipeline = new JavaPipeline(pipeline);
62+
String artifactId = javaTargetPipeline.deriveArtifactIdFromCamelCase();
5963
vc.put(VelocityProperty.PIPELINE, javaTargetPipeline);
60-
vc.put(VelocityProperty.ARTIFACT_ID, javaTargetPipeline.deriveArtifactIdFromCamelCase());
64+
vc.put(VelocityProperty.ARTIFACT_ID, artifactId);
6165

6266
String baseOutputFile = generationContext.getOutputFile();
6367
String fileName = replace("pipelineName", baseOutputFile, javaTargetPipeline.getName());
@@ -73,7 +77,7 @@ public void generate(GenerationContext generationContext) {
7377

7478
// TODO: Conditional on whether we're specifically using Kafka
7579

76-
if(javaTargetPipeline.hasMessaging()) {
80+
if(javaTargetPipeline.hasMessagingSteps()) {
7781
for (JavaStep eachStep : javaTargetPipeline.getMessagingSteps()) {
7882
if (eachStep.hasMessagingInbound()) {
7983
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, eachStep.getInbound().getChannelName());
@@ -84,13 +88,17 @@ public void generate(GenerationContext generationContext) {
8488
}
8589
}
8690
if(javaTargetPipeline.isAlertingSupportNeeded()) {
87-
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, "alerts");
91+
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, ALERT_CHANNEL);
92+
// TODO: StringSerializer actually throws an exception. We need an AlertSerializer in alerting-core
93+
manualActionNotificationService.addSmallRyeConnectorMessage(generationContext, artifactId, "Alert Producer", ALERT_CHANNEL, "org.apache.kafka.common.serialization.StringSerializer");
8894
}
8995
if(javaTargetPipeline.isMetadataNeeded()) {
90-
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, "metadata-ingest");
96+
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, METADATA_CHANNEL);
97+
manualActionNotificationService.addSmallRyeConnectorMessage(generationContext, artifactId, "Metadata Producer", METADATA_CHANNEL, "com.boozallen.aissemble.core.metadata.producer.MetadataSerializer");
9198
}
9299
if(javaTargetPipeline.getDataLineage()) {
93100
manualActionNotificationService.addNoticeToUpdateKafkaConfig(generationContext, Constants.DATA_LINEAGE_CHANNEL_NAME);
101+
manualActionNotificationService.addSmallRyeConnectorMessage(generationContext, artifactId, "Data Lineage Emitter", Constants.DATA_LINEAGE_CHANNEL_NAME, "org.apache.kafka.common.serialization.StringSerializer");
94102
}
95103
}
96104

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/generator/util/MavenUtil.java

+4
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public static String getSharedModuleName(final File rootProjectDirectory) {
107107
return getChildModuleName(rootProjectDirectory, ".+-shared");
108108
}
109109

110+
public static String getPipelinesModuleName(final File rootProjectDirectory) {
111+
return getChildModuleName(rootProjectDirectory, ".+-pipelines");
112+
}
113+
110114
/**
111115
* Discovers the appropriate data record module for the given language and data module type. If the language is not
112116
* the default language for the data module, a language-specific module is returned.

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BasePipelineDecorator.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,13 @@ public String getCapitalizedName() {
158158
}
159159

160160

161-
public boolean hasMessaging() {
162-
for (Step step : getSteps()) {
163-
if (step.getInbound() != null) {
164-
if ("messaging".equals(step.getInbound().getType())) {
165-
return true;
166-
}
167-
}
168-
if (step.getOutbound() != null) {
169-
if ("messaging".equals(step.getOutbound().getType())) {
170-
return true;
171-
}
161+
/**
162+
* @return true if any step uses `messaging` or `multimessaging` type inbound/outbound
163+
*/
164+
public boolean hasMessagingSteps() {
165+
for (BaseStepDecorator step : getSteps()) {
166+
if(step.hasMessagingInbound() || step.hasMessagingOutbound()) {
167+
return true;
172168
}
173169
}
174170
return false;
@@ -344,6 +340,15 @@ public boolean isMetadataNeeded() {
344340
.anyMatch(BaseStepDecorator::isMetadataEnabled);
345341
}
346342

343+
/**
344+
* @return true if any features that require messaging are enabled
345+
*/
346+
public boolean isMessagingSupportNeeded() {
347+
return isAlertingSupportNeeded() ||
348+
isMetadataNeeded() ||
349+
hasMessagingSteps();
350+
}
351+
347352
private boolean hasPersistType(PersistType persistType) {
348353
boolean hasPersistType = false;
349354

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BaseStepDecorator.java

+32-24
Original file line numberDiff line numberDiff line change
@@ -288,26 +288,6 @@ public boolean isNative(StepDataBinding stepDataBinding) {
288288
return stepDataBinding != null && "native".equalsIgnoreCase(stepDataBinding.getType());
289289
}
290290

291-
/**
292-
* Whether or not the given step data binding has type 'messaging'.
293-
*
294-
* @param stepDataBinding
295-
* the step data binding to check
296-
* @return true if the given step data binding has type 'messaging'
297-
*/
298-
public boolean isMessaging(StepDataBinding stepDataBinding) {
299-
return stepDataBinding != null && ("messaging".equalsIgnoreCase(stepDataBinding.getType()));
300-
}
301-
302-
/**
303-
* Whether or not the given step data binding has type 'multimessaging'.
304-
* @param stepDataBinding the step data binding to check
305-
* @return true if the given step data binding has type 'multimessaging'.
306-
*/
307-
public boolean isMultiMessaging(StepDataBinding stepDataBinding) {
308-
return stepDataBinding != null && ("multimessaging".equalsIgnoreCase(stepDataBinding.getType()));
309-
}
310-
311291
/**
312292
* Whether or not the step has inbound type 'messaging'.
313293
*
@@ -318,12 +298,19 @@ public boolean hasMessagingInbound() {
318298
}
319299

320300
/**
321-
* Whether or not the step has outbound type 'messaging'.
322-
*
323-
* @return true if the step has outbound type 'messaging'
301+
* @return true if the step has outbound type 'messaging' or 'multimessaging'
324302
*/
325303
public boolean hasMessagingOutbound() {
326-
return isMessaging(getOutbound()) || isMultiMessaging(getOutbound());
304+
return hasSingleMessagingOutbound() || hasMultiMessagingOutbound();
305+
}
306+
307+
/**
308+
* Whether or not the step has outbound type 'messaging'.
309+
*
310+
* @return true if the step has outbound type 'messaging'
311+
*/
312+
public boolean hasSingleMessagingOutbound() {
313+
return isMessaging(getOutbound());
327314
}
328315

329316
/**
@@ -457,4 +444,25 @@ public boolean hasInboundNativeCollectionTypeAndRelations() {
457444

458445
return isNative && hasRelations;
459446
}
447+
448+
/**
449+
* Whether or not the given step data binding has type 'messaging'.
450+
*
451+
* @param stepDataBinding
452+
* the step data binding to check
453+
* @return true if the given step data binding has type 'messaging'
454+
*/
455+
protected boolean isMessaging(StepDataBinding stepDataBinding) {
456+
return stepDataBinding != null && ("messaging".equalsIgnoreCase(stepDataBinding.getType()));
457+
}
458+
459+
/**
460+
* Whether or not the given step data binding has type 'multimessaging'.
461+
* @param stepDataBinding the step data binding to check
462+
* @return true if the given step data binding has type 'multimessaging'.
463+
*/
464+
protected boolean isMultiMessaging(StepDataBinding stepDataBinding) {
465+
return stepDataBinding != null && ("multimessaging".equalsIgnoreCase(stepDataBinding.getType()));
466+
}
467+
460468
}

foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/java/JavaStep.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,6 @@ public Set<String> getImplImports() {
176176
return imports;
177177
}
178178

179-
@Override
180-
public boolean isMessaging(StepDataBinding stepDataBinding) {
181-
return stepDataBinding != null
182-
&& ("messaging".equalsIgnoreCase(stepDataBinding.getType()));
183-
}
184-
185-
public boolean isMultiMessaging(StepDataBinding stepDataBinding) {
186-
return stepDataBinding != null && "multimessaging".equalsIgnoreCase(stepDataBinding.getType());
187-
}
188-
189179
/**
190180
* If the outbound type of this step is messaging, this method will return the Reactive Messaging outgoing channel
191181
* name. Note this is not the same as {@link StepDataBinding#getChannelName()} and is simply an externality of our
@@ -267,10 +257,10 @@ public String getAsyncBaseOutboundType() {
267257
outboundType += "<Void>";
268258
} else {
269259
List<String> types = new ArrayList<>();
270-
if (isMessaging(outbound)) {
260+
if (hasSingleMessagingOutbound()) {
271261
types.add("<Message");
272262
}
273-
if (isMultiMessaging(getOutbound())) {
263+
if (hasMultiMessagingOutbound()) {
274264
types.add("<Message");
275265
outboundType = "Publisher";
276266
}

foundation/foundation-mda/src/main/resources/targets.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -2625,15 +2625,15 @@
26252625
"name": "microprofileConfigProperties",
26262626
"templateName": "templates/pipeline.microprofile-config.properties.vm",
26272627
"outputFile": "META-INF/microprofile-config.properties",
2628-
"generator": "com.boozallen.aiops.mda.generator.TargetedPipelineResourcesGenerator",
2628+
"generator": "com.boozallen.aiops.mda.generator.PipelineMessagingResourcesGenerator",
26292629
"metadataContext": "targeted",
26302630
"overwritable": false
26312631
},
26322632
{
26332633
"name": "microprofileConfigServices",
26342634
"templateName": "templates/pipeline.services.microprofile.config.vm",
26352635
"outputFile": "META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource",
2636-
"generator": "com.boozallen.aiops.mda.generator.TargetedPipelineResourcesGenerator",
2636+
"generator": "com.boozallen.aiops.mda.generator.PipelineMessagingResourcesGenerator",
26372637
"metadataContext": "targeted",
26382638
"overwritable": false
26392639
},
@@ -2649,7 +2649,7 @@
26492649
"name": "pipelineTestConfig",
26502650
"templateName": "templates/cucumber.test.pipeline-messaging.properties.vm",
26512651
"outputFile": "krausening/test/pipeline-messaging.properties",
2652-
"generator": "com.boozallen.aiops.mda.generator.TargetedPipelineResourcesGenerator",
2652+
"generator": "com.boozallen.aiops.mda.generator.PipelineMessagingResourcesGenerator",
26532653
"metadataContext": "targeted",
26542654
"overwritable": false,
26552655
"artifactType": "test"

foundation/foundation-mda/src/main/resources/templates/data-delivery-pyspark/pyproject.toml.vm

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ aissemble-extensions-data-delivery-spark-py = "${aissemblePythonVersion}"
2626
aissemble-foundation-data-lineage-python = "${aissemblePythonVersion}"
2727
#end
2828

29-
#if ($pipeline.hasMessaging() || $pipeline.isMetadataNeeded() || $pipeline.getDataLineage())
29+
#if ($pipeline.isMessagingSupportNeeded())
3030
confluent-kafka = { version = "2.1.1", optional = true }
3131
kafka-python = "^2.0.2"
3232
#end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
To connect to external messaging services, configure the following connectors in $pipelinesArtifactId/$pipelineArtifactId/src/main/resources/META-INF/microprofile-config.properties:
2+
3+
#foreach ($item in $items)
4+
${item}
5+
#end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#set ($comment = '#')
2+
#set ($header = '###')
3+
$header $description configs $header
4+
mp.messaging.outgoing.${channel}.connector=smallrye-kafka
5+
mp.messaging.outgoing.${channel}.topic=$topic
6+
mp.messaging.outgoing.${channel}.value.serializer=${serializer}

foundation/foundation-mda/src/main/resources/templates/notifications/notification.spark.worker.tilt.vm

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#set($path = "/src/main/resources/apps/")
44
#end
55

6-
yaml = local('helm template oci://ghcr.io/boozallen/aissemble-spark-application-chart --version %s --values ${parentArtifactId}/${pipelineArtifactId}${path}${pipelineArtifactId}-base-values.yaml,${parentArtifactId}/${pipelineArtifactId}${path}${pipelineArtifactId}-dev-values.yaml' % aissemble_version)
6+
yaml = local('helm template oci://ghcr.io/boozallen/aissemble-spark-application-chart --version %s\
7+
--values ${parentArtifactId}/${pipelineArtifactId}${path}${pipelineArtifactId}-base-values.yaml\
8+
--values ${parentArtifactId}/${pipelineArtifactId}${path}${pipelineArtifactId}-dev-values.yaml' % aissemble_version)
79
k8s_yaml(yaml)
810
k8s_resource('${pipelineArtifactId}', port_forwards=[port_forward(4747, 4747, 'debug')], auto_init=False, trigger_mode=TRIGGER_MODE_MANUAL)

foundation/foundation-mda/src/main/resources/templates/notifications/notification.yaml.tiltfile.vm

-4
This file was deleted.

0 commit comments

Comments
 (0)