Skip to content

Commit 34a7734

Browse files
[FLINK-20090][rest] Expose slot sharing group info in REST API
1 parent 06fdc01 commit 34a7734

File tree

15 files changed

+231
-1
lines changed

15 files changed

+231
-1
lines changed

flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java

+78
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@
3737
import org.apache.flink.runtime.client.JobSubmissionException;
3838
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
3939
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
40+
import org.apache.flink.runtime.execution.ExecutionState;
41+
import org.apache.flink.runtime.instance.SlotSharingGroupId;
4042
import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo;
4143
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
4244
import org.apache.flink.runtime.jobgraph.JobGraph;
4345
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
46+
import org.apache.flink.runtime.jobgraph.JobVertexID;
4447
import org.apache.flink.runtime.jobgraph.OperatorID;
4548
import org.apache.flink.runtime.jobmaster.JobResult;
4649
import org.apache.flink.runtime.messages.Acknowledge;
@@ -71,6 +74,7 @@
7174
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
7275
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
7376
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
77+
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
7478
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
7579
import org.apache.flink.runtime.rest.messages.MessageHeaders;
7680
import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -86,6 +90,8 @@
8690
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetIdPathParameter;
8791
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders;
8892
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListResponseBody;
93+
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
94+
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
8995
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
9096
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
9197
import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
@@ -96,6 +102,7 @@
96102
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
97103
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
98104
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
105+
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
99106
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
100107
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
101108
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -1236,6 +1243,58 @@ void testNotShowSuspendedJobStatus() throws Exception {
12361243
}
12371244
}
12381245

1246+
@Test
1247+
void testJobDetailsContainsSlotSharingGroupId() throws Exception {
1248+
final IOMetricsInfo jobVertexMetrics =
1249+
new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0, 0, 0);
1250+
SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
1251+
final Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexDetailsInfos =
1252+
Collections.singletonList(
1253+
new JobDetailsInfo.JobVertexDetailsInfo(
1254+
new JobVertexID(),
1255+
slotSharingGroupId,
1256+
"jobVertex1",
1257+
2,
1258+
1,
1259+
ExecutionState.RUNNING,
1260+
1,
1261+
2,
1262+
1,
1263+
Collections.singletonMap(ExecutionState.RUNNING, 0),
1264+
jobVertexMetrics));
1265+
final JobDetailsInfo jobDetailsInfo =
1266+
new JobDetailsInfo(
1267+
jobId,
1268+
"foobar",
1269+
false,
1270+
JobStatus.RUNNING,
1271+
1,
1272+
2,
1273+
1,
1274+
2,
1275+
10,
1276+
Collections.singletonMap(JobStatus.RUNNING, 1L),
1277+
jobVertexDetailsInfos,
1278+
Collections.singletonMap(ExecutionState.RUNNING, 1),
1279+
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
1280+
final TestJobDetailsInfoHandler jobDetailsInfoHandler =
1281+
new TestJobDetailsInfoHandler(jobDetailsInfo);
1282+
1283+
try (TestRestServerEndpoint restServerEndpoint =
1284+
createRestServerEndpoint(jobDetailsInfoHandler)) {
1285+
try (RestClusterClient<?> restClusterClient =
1286+
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
1287+
final CompletableFuture<JobDetailsInfo> jobDetailsInfoFuture =
1288+
restClusterClient.getJobDetails(jobId);
1289+
Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos =
1290+
jobDetailsInfoFuture.get().getJobVertexInfos();
1291+
assertThat(jobVertexInfos).hasSize(1);
1292+
assertThat(jobVertexInfos.iterator().next().getSlotSharingGroupId())
1293+
.isEqualTo(slotSharingGroupId);
1294+
}
1295+
}
1296+
}
1297+
12391298
private class TestClientCoordinationHandler
12401299
extends TestHandler<
12411300
ClientCoordinationRequestBody,
@@ -1402,6 +1461,25 @@ protected CompletableFuture<JobStatusInfo> handleRequest(
14021461
}
14031462
}
14041463

1464+
private class TestJobDetailsInfoHandler
1465+
extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
1466+
1467+
private final JobDetailsInfo jobDetailsInfo;
1468+
1469+
private TestJobDetailsInfoHandler(@Nonnull JobDetailsInfo jobDetailsInfo) {
1470+
super(JobDetailsHeaders.getInstance());
1471+
this.jobDetailsInfo = checkNotNull(jobDetailsInfo);
1472+
}
1473+
1474+
@Override
1475+
protected CompletableFuture<JobDetailsInfo> handleRequest(
1476+
@Nonnull HandlerRequest<EmptyRequestBody> request,
1477+
@Nonnull DispatcherGateway gateway)
1478+
throws RestHandlerException {
1479+
return CompletableFuture.completedFuture(jobDetailsInfo);
1480+
}
1481+
}
1482+
14051483
private abstract class TestHandler<
14061484
R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
14071485
extends AbstractRestHandler<DispatcherGateway, R, P, M> {

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

+3
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,9 @@
899899
"id" : {
900900
"type" : "any"
901901
},
902+
"slotSharingGroupId" : {
903+
"type" : "any"
904+
},
902905
"name" : {
903906
"type" : "string"
904907
},

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2222
import org.apache.flink.runtime.execution.ExecutionState;
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
24+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2425

2526
/**
2627
* Common interface for the runtime {@link ExecutionJobVertex} and {@link
@@ -48,6 +49,13 @@ public interface AccessExecutionJobVertex {
4849
*/
4950
int getMaxParallelism();
5051

52+
/**
53+
* Returns the slot sharing group for this job vertex.
54+
*
55+
* @return slot sharing group for this job vertex.
56+
*/
57+
SlotSharingGroup getSlotSharingGroup();
58+
5159
/**
5260
* Returns the resource profile for this job vertex.
5361
*

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java

+1
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVe
400400
jobVertex.getName(),
401401
parallelismInfo.getParallelism(),
402402
parallelismInfo.getMaxParallelism(),
403+
jobVertex.getSlotSharingGroup(),
403404
ResourceProfile.fromResourceSpec(
404405
jobVertex.getMinResources(), MemorySize.ZERO),
405406
new StringifiedAccumulatorResult[0]);

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java

+11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2222
import org.apache.flink.runtime.execution.ExecutionState;
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
24+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2425

2526
import java.io.Serializable;
2627

@@ -39,6 +40,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
3940

4041
private final int maxParallelism;
4142

43+
private final SlotSharingGroup slotSharingGroup;
44+
4245
private final ResourceProfile resourceProfile;
4346

4447
private final StringifiedAccumulatorResult[] archivedUserAccumulators;
@@ -55,6 +58,7 @@ public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
5558
this.name = jobVertex.getJobVertex().getName();
5659
this.parallelism = jobVertex.getParallelism();
5760
this.maxParallelism = jobVertex.getMaxParallelism();
61+
this.slotSharingGroup = jobVertex.getSlotSharingGroup();
5862
this.resourceProfile = jobVertex.getResourceProfile();
5963
}
6064

@@ -64,13 +68,15 @@ public ArchivedExecutionJobVertex(
6468
String name,
6569
int parallelism,
6670
int maxParallelism,
71+
SlotSharingGroup slotSharingGroup,
6772
ResourceProfile resourceProfile,
6873
StringifiedAccumulatorResult[] archivedUserAccumulators) {
6974
this.taskVertices = taskVertices;
7075
this.id = id;
7176
this.name = name;
7277
this.parallelism = parallelism;
7378
this.maxParallelism = maxParallelism;
79+
this.slotSharingGroup = slotSharingGroup;
7480
this.resourceProfile = resourceProfile;
7581
this.archivedUserAccumulators = archivedUserAccumulators;
7682
}
@@ -94,6 +100,11 @@ public int getMaxParallelism() {
94100
return maxParallelism;
95101
}
96102

103+
@Override
104+
public SlotSharingGroup getSlotSharingGroup() {
105+
return slotSharingGroup;
106+
}
107+
97108
@Override
98109
public ResourceProfile getResourceProfile() {
99110
return resourceProfile;

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

+1
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ public InputSplitAssigner getSplitAssigner() {
391391
return splitAssigner;
392392
}
393393

394+
@Override
394395
public SlotSharingGroup getSlotSharingGroup() {
395396
return slotSharingGroup;
396397
}

flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,24 @@
1919
package org.apache.flink.runtime.instance;
2020

2121
import org.apache.flink.util.AbstractID;
22+
import org.apache.flink.util.StringUtils;
2223

2324
public class SlotSharingGroupId extends AbstractID {
2425
private static final long serialVersionUID = 8837647978345422042L;
2526

27+
public SlotSharingGroupId() {
28+
super();
29+
}
30+
2631
public SlotSharingGroupId(long lowerPart, long upperPart) {
2732
super(lowerPart, upperPart);
2833
}
2934

30-
public SlotSharingGroupId() {}
35+
public SlotSharingGroupId(byte[] bytes) {
36+
super(bytes);
37+
}
38+
39+
public static SlotSharingGroupId fromHexString(String hexString) {
40+
return new SlotSharingGroupId(StringUtils.hexStringToByte(hexString));
41+
}
3142
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
230230

231231
return new JobDetailsInfo.JobVertexDetailsInfo(
232232
ejv.getJobVertexId(),
233+
ejv.getSlotSharingGroup().getSlotSharingGroupId(),
233234
ejv.getName(),
234235
ejv.getMaxParallelism(),
235236
ejv.getParallelism(),

flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java

+20
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.execution.ExecutionState;
24+
import org.apache.flink.runtime.instance.SlotSharingGroupId;
2425
import org.apache.flink.runtime.jobgraph.JobVertexID;
2526
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
2627
import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -29,6 +30,8 @@
2930
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
3031
import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
3132
import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
33+
import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDDeserializer;
34+
import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDSerializer;
3235
import org.apache.flink.util.Preconditions;
3336

3437
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -263,6 +266,8 @@ public static final class JobVertexDetailsInfo {
263266

264267
public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
265268

269+
public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";
270+
266271
public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";
267272

268273
public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";
@@ -285,6 +290,10 @@ public static final class JobVertexDetailsInfo {
285290
@JsonSerialize(using = JobVertexIDSerializer.class)
286291
private final JobVertexID jobVertexID;
287292

293+
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
294+
@JsonSerialize(using = SlotSharingGroupIDSerializer.class)
295+
private final SlotSharingGroupId slotSharingGroupId;
296+
288297
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
289298
private final String name;
290299

@@ -317,6 +326,9 @@ public JobVertexDetailsInfo(
317326
@JsonDeserialize(using = JobVertexIDDeserializer.class)
318327
@JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
319328
JobVertexID jobVertexID,
329+
@JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
330+
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
331+
SlotSharingGroupId slotSharingGroupId,
320332
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
321333
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
322334
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
@@ -328,6 +340,7 @@ public JobVertexDetailsInfo(
328340
Map<ExecutionState, Integer> tasksPerState,
329341
@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
330342
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
343+
this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
331344
this.name = Preconditions.checkNotNull(name);
332345
this.maxParallelism = maxParallelism;
333346
this.parallelism = parallelism;
@@ -344,6 +357,11 @@ public JobVertexID getJobVertexID() {
344357
return jobVertexID;
345358
}
346359

360+
@JsonIgnore
361+
public SlotSharingGroupId getSlotSharingGroupId() {
362+
return slotSharingGroupId;
363+
}
364+
347365
@JsonIgnore
348366
public String getName() {
349367
return name;
@@ -404,6 +422,7 @@ public boolean equals(Object o) {
404422
&& endTime == that.endTime
405423
&& duration == that.duration
406424
&& Objects.equals(jobVertexID, that.jobVertexID)
425+
&& Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
407426
&& Objects.equals(name, that.name)
408427
&& executionState == that.executionState
409428
&& Objects.equals(tasksPerState, that.tasksPerState)
@@ -414,6 +433,7 @@ public boolean equals(Object o) {
414433
public int hashCode() {
415434
return Objects.hash(
416435
jobVertexID,
436+
slotSharingGroupId,
417437
name,
418438
maxParallelism,
419439
parallelism,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.messages.json;
20+
21+
import org.apache.flink.runtime.instance.SlotSharingGroupId;
22+
import org.apache.flink.runtime.jobgraph.JobVertexID;
23+
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
25+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
26+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
27+
28+
import java.io.IOException;
29+
30+
/** Jackson deserializer for {@link SlotSharingGroupId}. */
31+
public class SlotSharingGroupIDDeserializer extends StdDeserializer<SlotSharingGroupId> {
32+
33+
private static final long serialVersionUID = -2908308366715321301L;
34+
35+
protected SlotSharingGroupIDDeserializer() {
36+
super(JobVertexID.class);
37+
}
38+
39+
@Override
40+
public SlotSharingGroupId deserialize(JsonParser p, DeserializationContext ctxt)
41+
throws IOException {
42+
return SlotSharingGroupId.fromHexString(p.getValueAsString());
43+
}
44+
}

0 commit comments

Comments
 (0)