Skip to content

Commit a7e1835

Browse files
authored
Modify deployers to keep state and undo changes on failure (#143)
* Add snapshot/restore behavior * Address feedback & make provider more generic * Add snapshot() to reset providers * Refactor to restore under deployer directly
1 parent 9ac5da6 commit a7e1835

File tree

14 files changed

+442
-68
lines changed

14 files changed

+442
-68
lines changed

hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@ public interface Deployer {
1515

1616
/** Render a list of specs, usually YAML. */
1717
List<String> specify() throws SQLException;
18+
19+
/** Deployers are expected to track the state of changes made and revert them on demand. */
20+
void restore();
1821
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ public interface DeployerProvider {
88

99
/** Find deployers capable of deploying the obj. */
1010
<T extends Deployable> Collection<Deployer> deployers(T obj, Properties connectionProperties);
11+
12+
/** A DeployerProvider with lower priority will execute first */
13+
int priority();
1114
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.linkedin.hoptimator.jdbc;
2121

2222
import com.linkedin.hoptimator.Database;
23+
import com.linkedin.hoptimator.Deployer;
2324
import com.linkedin.hoptimator.MaterializedView;
2425
import com.linkedin.hoptimator.Pipeline;
2526
import com.linkedin.hoptimator.View;
@@ -30,6 +31,7 @@
3031
import java.sql.SQLException;
3132
import java.util.ArrayList;
3233
import java.util.Arrays;
34+
import java.util.Collection;
3335
import java.util.Collections;
3436
import java.util.List;
3537
import java.util.Objects;
@@ -133,15 +135,21 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
133135
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
134136
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
135137
View view = new View(viewPath, sql);
138+
139+
Collection<Deployer> deployers = null;
136140
try {
141+
deployers = DeploymentService.deployers(view, connectionProperties);
137142
ValidationService.validateOrThrow(viewTable);
138143
if (create.getReplace()) {
139-
DeploymentService.update(view, connectionProperties);
144+
DeploymentService.update(deployers);
140145
} else {
141-
DeploymentService.create(view, connectionProperties);
146+
DeploymentService.create(deployers);
142147
}
143148
schemaPlus.add(viewName, viewTable);
144149
} catch (Exception e) {
150+
if (deployers != null) {
151+
DeploymentService.restore(deployers);
152+
}
145153
throw new DdlException(create, e.getMessage(), e);
146154
}
147155
}
@@ -185,6 +193,8 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
185193
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
186194
final List<String> schemaPath = pair.left.path(null);
187195

196+
Collection<Deployer> deployers = null;
197+
188198
String schemaName = schemaPlus.getName();
189199
String viewName = pair.right;
190200
List<String> viewPath = new ArrayList<>(schemaPath);
@@ -235,14 +245,18 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
235245
// TODO support CREATE ... WITH (options...)
236246
ValidationService.validateOrThrow(hook);
237247

248+
deployers = DeploymentService.deployers(hook, connectionProperties);
238249
if (create.getReplace()) {
239-
DeploymentService.update(hook, connectionProperties);
250+
DeploymentService.update(deployers);
240251
} else {
241-
DeploymentService.create(hook, connectionProperties);
252+
DeploymentService.create(deployers);
242253
}
243254

244255
schemaPlus.add(viewName, materializedViewTable);
245-
} catch (SQLException e) {
256+
} catch (SQLException | RuntimeException e) {
257+
if (deployers != null) {
258+
DeploymentService.restore(deployers);
259+
}
246260
throw new DdlException(create, e.getMessage(), e);
247261
}
248262
}
@@ -275,19 +289,25 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
275289
List<String> viewPath = new ArrayList<>(schemaPath);
276290
viewPath.add(viewName);
277291

292+
Collection<Deployer> deployers = null;
278293
try {
279294
if (table instanceof MaterializedViewTable) {
280295
MaterializedViewTable materializedViewTable = (MaterializedViewTable) table;
281296
View view = new View(viewPath, materializedViewTable.viewSql());
282-
DeploymentService.delete(view, connectionProperties);
297+
deployers = DeploymentService.deployers(view, connectionProperties);
298+
DeploymentService.delete(deployers);
283299
} else if (table instanceof ViewTable) {
284300
ViewTable viewTable = (ViewTable) table;
285301
View view = new View(viewPath, viewTable.getViewSql());
286-
DeploymentService.delete(view, connectionProperties);
302+
deployers = DeploymentService.deployers(view, connectionProperties);
303+
DeploymentService.delete(deployers);
287304
} else {
288305
throw new DdlException(drop, viewName + " is not a view.");
289306
}
290307
} catch (Exception e) {
308+
if (deployers != null) {
309+
DeploymentService.restore(deployers);
310+
}
291311
throw new DdlException(drop, e.getMessage(), e);
292312
}
293313
schemaPlus.removeTable(viewName);

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployer.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,62 @@ public abstract class K8sDeployer<T extends KubernetesObject, U extends Kubernet
1616
implements Deployer {
1717

1818
private final K8sApi<T, U> api;
19+
private final K8sSnapshot snapshot;
1920

2021
K8sDeployer(K8sContext context, K8sApiEndpoint<T, U> endpoint) {
2122
this.api = new K8sApi<>(context, endpoint);
23+
this.snapshot = new K8sSnapshot(context);
2224
}
2325

2426
@Override
2527
public void create() throws SQLException {
26-
api.create(toK8sObject());
28+
create(toK8sObject());
2729
}
2830

2931
public V1OwnerReference createAndReference() throws SQLException {
3032
T obj = toK8sObject();
31-
api.create(obj);
33+
create(obj);
3234
return api.reference(obj);
3335
}
3436

37+
private void create(T obj) throws SQLException {
38+
snapshot.store(obj);
39+
api.create(obj);
40+
}
41+
3542
@Override
3643
public void delete() throws SQLException {
37-
api.delete(toK8sObject());
44+
T obj = toK8sObject();
45+
snapshot.store(obj);
46+
api.delete(obj);
3847
}
3948

4049
@Override
4150
public void update() throws SQLException {
42-
api.update(toK8sObject());
51+
update(toK8sObject());
4352
}
4453

4554
public V1OwnerReference updateAndReference() throws SQLException {
4655
T obj = toK8sObject();
47-
api.update(obj);
56+
update(obj);
4857
return api.reference(obj);
4958
}
5059

60+
private void update(T obj) throws SQLException {
61+
snapshot.store(obj);
62+
api.update(obj);
63+
}
64+
5165
@Override
5266
public List<String> specify() throws SQLException {
5367
return Collections.singletonList(Yaml.dump(toK8sObject()));
5468
}
5569

70+
@Override
71+
public void restore() {
72+
snapshot.restore();
73+
}
74+
75+
5676
protected abstract T toK8sObject() throws SQLException;
5777
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ public <T extends Deployable> Collection<Deployer> deployers(T obj, Properties c
3232

3333
return list;
3434
}
35+
36+
@Override
37+
public int priority() {
38+
return 1;
39+
}
3540
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,71 @@ class K8sMaterializedViewDeployer implements Deployer {
2020
private final MaterializedView view;
2121
private final K8sContext context;
2222
private final Properties connectionProperties;
23+
private final K8sViewDeployer viewDeployer;
24+
private final List<Deployer> deployers;
25+
26+
private final Object crudLock = new Object();
2327

2428
K8sMaterializedViewDeployer(MaterializedView view, K8sContext context, Properties connectionProperties) {
2529
this.view = view;
2630
this.context = context;
2731
this.connectionProperties = connectionProperties;
32+
this.viewDeployer = new K8sViewDeployer(view, true, context);
33+
this.deployers = new ArrayList<>();
2834
}
2935

3036
@Override
3137
public void create() throws SQLException {
32-
String name = name();
33-
List<String> pipelineSpecs = pipelineSpecs();
34-
V1OwnerReference viewRef = new K8sViewDeployer(view, true, context).createAndReference();
35-
K8sContext viewContext = context.withOwner(viewRef);
36-
V1OwnerReference pipelineRef = new K8sPipelineDeployer(name, pipelineSpecs, sql(), viewContext).createAndReference();
37-
K8sContext pipelineContext = viewContext.withLabel("pipeline", name).withOwner(pipelineRef);
38-
new K8sYamlDeployerImpl(pipelineContext, pipelineSpecs).update(); // update, cuz some elements may already exist
38+
synchronized (crudLock) {
39+
String name = name();
40+
List<String> pipelineSpecs = pipelineSpecs();
41+
V1OwnerReference viewRef = viewDeployer.createAndReference();
42+
K8sContext viewContext = context.withOwner(viewRef);
43+
K8sPipelineDeployer pipelineDeployer = new K8sPipelineDeployer(name, pipelineSpecs, sql(), viewContext);
44+
deployers.add(pipelineDeployer);
45+
V1OwnerReference pipelineRef = pipelineDeployer.createAndReference();
46+
K8sContext pipelineContext = viewContext.withLabel("pipeline", name).withOwner(pipelineRef);
47+
K8sYamlDeployerImpl yamlDeployer = new K8sYamlDeployerImpl(pipelineContext, pipelineSpecs);
48+
deployers.add(yamlDeployer);
49+
yamlDeployer.update(); // update, cuz some elements may already exist
50+
}
3951
}
4052

4153
@Override
4254
public void update() throws SQLException {
43-
String name = name();
44-
List<String> pipelineSpecs = pipelineSpecs();
45-
V1OwnerReference viewRef = new K8sViewDeployer(view, true, context).updateAndReference();
46-
K8sContext viewContext = context.withOwner(viewRef);
47-
V1OwnerReference pipelineRef = new K8sPipelineDeployer(name, pipelineSpecs, sql(), viewContext).updateAndReference();
48-
K8sContext pipelineContext = viewContext.withLabel("pipeline", name).withOwner(pipelineRef);
49-
new K8sYamlDeployerImpl(pipelineContext, pipelineSpecs).update();
55+
synchronized (crudLock) {
56+
String name = name();
57+
List<String> pipelineSpecs = pipelineSpecs();
58+
V1OwnerReference viewRef = viewDeployer.updateAndReference();
59+
K8sContext viewContext = context.withOwner(viewRef);
60+
K8sPipelineDeployer pipelineDeployer = new K8sPipelineDeployer(name, pipelineSpecs, sql(), viewContext);
61+
deployers.add(pipelineDeployer);
62+
V1OwnerReference pipelineRef = pipelineDeployer.updateAndReference();
63+
K8sContext pipelineContext = viewContext.withLabel("pipeline", name).withOwner(pipelineRef);
64+
K8sYamlDeployerImpl yamlDeployer = new K8sYamlDeployerImpl(pipelineContext, pipelineSpecs);
65+
deployers.add(yamlDeployer);
66+
yamlDeployer.update();
67+
}
5068
}
5169

5270
@Override
5371
public void delete() throws SQLException {
54-
// Delete will cascade to any owned objects, including the Pipeline object.
55-
new K8sViewDeployer(view, true, context).delete();
72+
synchronized (crudLock) {
73+
// Delete will cascade to any owned objects, including the Pipeline object.
74+
viewDeployer.delete();
75+
}
76+
}
77+
78+
@Override
79+
public void restore() {
80+
synchronized (crudLock) {
81+
// There may be a scenario where a view creates multiple pipelines.
82+
// Restoring the pipelines in reverse order ensures the original state is preserved.
83+
for (int i = deployers.size() - 1; i >= 0; i--) {
84+
deployers.get(i).restore();
85+
}
86+
viewDeployer.restore();
87+
}
5688
}
5789

5890
List<String> pipelineSpecs() throws SQLException {

0 commit comments

Comments
 (0)