17
17
*/
18
18
package org .apache .beam .runners .flink ;
19
19
20
- import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
21
20
import static org .apache .beam .sdk .util .construction .resources .PipelineResources .detectClassPathResourcesToStage ;
22
21
23
22
import java .util .UUID ;
32
31
import org .apache .beam .vendor .grpc .v1p69p0 .com .google .protobuf .Struct ;
33
32
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Strings ;
34
33
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ListeningExecutorService ;
34
+ import org .checkerframework .checker .nullness .qual .Nullable ;
35
35
import org .slf4j .Logger ;
36
36
import org .slf4j .LoggerFactory ;
37
37
38
38
/** Job Invoker for the {@link FlinkRunner}. */
39
+ @ SuppressWarnings ({
40
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
41
+ })
39
42
public class FlinkJobInvoker extends JobInvoker {
40
43
private static final Logger LOG = LoggerFactory .getLogger (FlinkJobInvoker .class );
41
44
@@ -54,7 +57,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
54
57
protected JobInvocation invokeWithExecutor (
55
58
RunnerApi .Pipeline pipeline ,
56
59
Struct options ,
57
- String retrievalToken ,
60
+ @ Nullable String retrievalToken ,
58
61
ListeningExecutorService executorService ) {
59
62
60
63
// TODO: How to make Java/Python agree on names of keys and their values?
@@ -71,22 +74,20 @@ protected JobInvocation invokeWithExecutor(
71
74
72
75
PortablePipelineOptions portableOptions = flinkOptions .as (PortablePipelineOptions .class );
73
76
74
- ClassLoader thisClassLoader =
75
- checkStateNotNull (
76
- FlinkJobInvoker .class .getClassLoader (),
77
- "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible" );
78
-
79
77
PortablePipelineRunner pipelineRunner ;
80
78
if (Strings .isNullOrEmpty (portableOptions .getOutputExecutablePath ())) {
81
79
pipelineRunner =
82
80
new FlinkPipelineRunner (
83
81
flinkOptions ,
84
82
serverConfig .getFlinkConfDir (),
85
- detectClassPathResourcesToStage (thisClassLoader , flinkOptions ));
83
+ detectClassPathResourcesToStage (
84
+ FlinkJobInvoker .class .getClassLoader (), flinkOptions ));
86
85
} else {
87
86
pipelineRunner = new PortablePipelineJarCreator (FlinkPipelineRunner .class );
88
87
}
89
88
89
+ flinkOptions .setRunner (null );
90
+
90
91
LOG .info ("Invoking job {} with pipeline runner {}" , invocationId , pipelineRunner );
91
92
return createJobInvocation (
92
93
invocationId , retrievalToken , executorService , pipeline , flinkOptions , pipelineRunner );
0 commit comments