diff --git a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java index 60babf71..762c3a29 100644 --- a/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java +++ b/agent/src/main/java/com/criteo/hadoop/garmadon/agent/headers/ContainerHeader.java @@ -1,5 +1,6 @@ package com.criteo.hadoop.garmadon.agent.headers; +import com.criteo.hadoop.garmadon.jvm.utils.FlinkRuntime; import com.criteo.hadoop.garmadon.jvm.utils.JavaRuntime; import com.criteo.hadoop.garmadon.jvm.utils.SparkRuntime; import com.criteo.hadoop.garmadon.schema.enums.Component; @@ -80,11 +81,13 @@ private void setFrameworkComponent() { case "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint": case "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint": framework = Framework.FLINK; + frameworkVersion = FlinkRuntime.getVersion(); component = Component.APP_MASTER; break; case "org.apache.flink.yarn.YarnTaskManager": case "org.apache.flink.yarn.YarnTaskExecutorRunner": framework = Framework.FLINK; + frameworkVersion = FlinkRuntime.getVersion(); component = Component.TASK_MANAGER; break; // YARN diff --git a/jvm-statistics/pom.xml b/jvm-statistics/pom.xml index 452fea03..5cb1ba5b 100644 --- a/jvm-statistics/pom.xml +++ b/jvm-statistics/pom.xml @@ -63,5 +63,11 @@ 3.2.1 test + + org.apache.flink + flink-runtime_2.12 + ${flink.version} + test + diff --git a/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntime.java b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntime.java new file mode 100644 index 00000000..0e2053e9 --- /dev/null +++ b/jvm-statistics/src/main/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntime.java @@ -0,0 +1,24 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +import java.lang.reflect.Method; + +public final class FlinkRuntime { + private static final String VERSION = computeVersion(); + + private FlinkRuntime() { + } + + public static String getVersion() { + return VERSION; + } + + static String computeVersion() { + try { + Class clazz = Class.forName("org.apache.flink.runtime.util.EnvironmentInformation"); + Method versionMethod = clazz.getDeclaredMethod("getVersion"); + return (String) versionMethod.invoke(null); + } catch (Throwable e) { + return "unknown"; + } + } +} diff --git a/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntimeTest.java b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntimeTest.java new file mode 100644 index 00000000..f748e7e5 --- /dev/null +++ b/jvm-statistics/src/test/java/com/criteo/hadoop/garmadon/jvm/utils/FlinkRuntimeTest.java @@ -0,0 +1,13 @@ +package com.criteo.hadoop.garmadon.jvm.utils; + +import junit.framework.TestCase; + +import static org.assertj.core.api.Assertions.assertThat; + +public class FlinkRuntimeTest extends TestCase { + + public void test_get_version() { + assertThat(FlinkRuntime.getVersion()).isEqualTo("1.9.3"); + } + +} diff --git a/pom.xml b/pom.xml index 4881c12f..4059f416 100644 --- a/pom.xml +++ b/pom.xml @@ -106,7 +106,7 @@ 1.0.0 2.9.9 2.6 - 1.6.4 + 1.9.3 0.217 315 3.11.1