Skip to content

Commit

Permalink
Report failed status (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan authored Aug 25, 2023
1 parent d90a9c5 commit f81e6ac
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 54 deletions.
4 changes: 4 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ spec:
singular: subscription
shortNames:
- sub
- subs
preserveUnknownFields: false
scope: Namespaced
versions:
Expand Down Expand Up @@ -53,6 +54,9 @@ spec:
ready:
description: Whether the subscription is ready to be consumed.
type: boolean
failed:
description: Indicates that the operator was unable to deploy a pipeline for this subscription.
type: boolean
message:
description: Error or success message, for information only.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
private Boolean failed;

public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
private String message;
Expand All @@ -49,6 +53,29 @@ public class V1alpha1SubscriptionStatus {
private String sql;


public V1alpha1SubscriptionStatus failed(Boolean failed) {

this.failed = failed;
return this;
}

/**
* Indicates that the operator was unable to deploy a pipeline for this subscription.
* @return failed
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Indicates that the operator was unable to deploy a pipeline for this subscription.")

public Boolean getFailed() {
return failed;
}


public void setFailed(Boolean failed) {
this.failed = failed;
}


public V1alpha1SubscriptionStatus message(String message) {

this.message = message;
Expand Down Expand Up @@ -158,22 +185,24 @@ public boolean equals(Object o) {
return false;
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
}

@Override
public int hashCode() {
return Objects.hash(message, ready, resources, sql);
return Objects.hash(failed, message, ready, resources, sql);
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,39 +86,49 @@ public Result reconcile(Request request) {
// Phase 1
log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());

Pipeline pipeline = pipeline(object);
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
.orElse(environment);
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);

// For sink resources, also expose hints.
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));

// Render resources related to all source tables.
List<String> upstreamResources = pipeline.upstreamResources().stream()
.map(x -> x.render(templateFactory))
.collect(Collectors.toList());

// Render the SQL job
String sqlJob = pipeline.sqlJob().render(templateFactory);

// Render resources related to the sink table. For these resources, we pass along any
// "hints" as part of the environment.
List<String> downstreamResources = pipeline.downstreamResources().stream()
.map(x -> x.render(sinkTemplateFactory))
.collect(Collectors.toList());

List<String> combined = new ArrayList<>();
combined.addAll(upstreamResources);
combined.add(sqlJob);
combined.addAll(downstreamResources);

status.setResources(combined);

status.setSql(object.getSpec().getSql());
status.setReady(null); // null indicates that pipeline needs to be deployed
status.setMessage("Planned.");
try {
Pipeline pipeline = pipeline(object);
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
.orElse(environment);
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);

// For sink resources, also expose hints.
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));

// Render resources related to all source tables.
List<String> upstreamResources = pipeline.upstreamResources().stream()
.map(x -> x.render(templateFactory))
.collect(Collectors.toList());

// Render the SQL job
String sqlJob = pipeline.sqlJob().render(templateFactory);

// Render resources related to the sink table. For these resources, we pass along any
// "hints" as part of the environment.
List<String> downstreamResources = pipeline.downstreamResources().stream()
.map(x -> x.render(sinkTemplateFactory))
.collect(Collectors.toList());

List<String> combined = new ArrayList<>();
combined.addAll(upstreamResources);
combined.add(sqlJob);
combined.addAll(downstreamResources);

status.setResources(combined);

status.setSql(object.getSpec().getSql());
status.setReady(null); // null indicates that pipeline needs to be deployed
status.setFailed(null);
status.setMessage("Planned.");
} catch (Exception e) {
log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name,
object.getSpec().getSql(), e);

// Mark the Subscription as failed.
status.setFailed(true);
status.setMessage("Error: " + e.getMessage());
}
} else if (status.getReady() == null && status.getResources() != null) {
// Phase 2
log.info("Deploying pipeline for {}/{}...", kind, name);
Expand All @@ -128,6 +138,7 @@ public Result reconcile(Request request) {

if (deployed) {
status.setReady(false);
status.setFailed(false);
status.setMessage("Deployed.");
} else {
return new Result(true, operator.failureRetryDuration());
Expand All @@ -140,11 +151,13 @@ public Result reconcile(Request request) {

if (ready) {
status.setReady(true);
status.setFailed(false);
status.setMessage("Ready.");
log.info("{}/{} is ready.", kind, name);
result = new Result(false);
} else {
status.setReady(false);
status.setFailed(false);
status.setMessage("Deployed.");
log.info("Pipeline for {}/{} is NOT ready.", kind, name);
}
Expand Down Expand Up @@ -277,10 +290,6 @@ public static Controller controller(Operator operator, HoptimatorPlanner.Factory
.withReconciler(reconciler)
.withName("subscription-controller")
.withWorkerCount(1)
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
// cache has synced up
//.withWorkQueue(resourceWorkQueue)
//.watch()
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build())
.build();
}
Expand Down

0 comments on commit f81e6ac

Please sign in to comment.