From 3d8584ce0d4fe0c6919639e2c3deef46a874dc8e Mon Sep 17 00:00:00 2001
From: john-wiens
Date: Mon, 3 Feb 2025 12:42:14 -0700
Subject: [PATCH 1/4] Adding in Rocks DB Memory Management
---
Dockerfile | 21 ++-
docker-compose.yml | 11 +-
.../deduplicator/DeduplicatorProperties.java | 8 +-
.../BoundedMemoryRocksDBConfig.java | 144 ++++++++++++++++++
.../ProcessedMapDeduplicatorTopologyTest.java | 1 -
sample.env | 14 +-
6 files changed, 189 insertions(+), 10 deletions(-)
create mode 100644 jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
diff --git a/Dockerfile b/Dockerfile
index 2631f79..452d143 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -33,15 +33,24 @@ COPY --from=builder /home/jpo-deduplicator/target/jpo-deduplicator.jar /home
#COPY cert.crt /home/cert.crt
#RUN keytool -import -trustcacerts -keystore /usr/local/openjdk-11/lib/security/cacerts -storepass changeit -noprompt -alias mycert -file cert.crt
+RUN amazon-linux-extras install -y epel && \
+ yum install -y jemalloc-devel
+ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"
+
ENTRYPOINT ["java", \
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
- "-Dcom.sun.management.jmxremote.port=9090", \
- "-Dcom.sun.management.jmxremote.rmi.port=9090", \
- "-Dcom.sun.management.jmxremote", \
- "-Dcom.sun.management.jmxremote.local.only=true", \
- "-Dcom.sun.management.jmxremote.authenticate=false", \
- "-Dcom.sun.management.jmxremote.ssl=false", \
"-Dlogback.configurationFile=/home/logback.xml", \
+ "-Xmx128M", \
+ "-Xms16M", \
+ "-XX:+UseG1GC", \
+ "-XX:MaxGCPauseMillis=20", \
+ "-XX:InitiatingHeapOccupancyPercent=35", \
+ "-XX:MetaspaceSize=96m", \
+ "-XX:MinMetaspaceFreeRatio=50", \
+ "-XX:MaxMetaspaceFreeRatio=80", \
+ "-XX:+ExplicitGCInvokesConcurrent", \
+ "-XX:InitialRAMPercentage=5.0", \
+ "-XX:MaxRAMPercentage=50.0", \
"-jar", \
"/home/jpo-deduplicator.jar"]
diff --git a/docker-compose.yml b/docker-compose.yml
index a81f59d..d65ca5a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -13,7 +13,10 @@ services:
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN}
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG}
image: jpo-deduplicator:latest
+ privileged: false # Set true to allow writing to /proc/sys/vm/drop_caches
restart: ${RESTART_POLICY}
+ ports:
+ - "10091:10090" # JMX
environment:
DOCKER_HOST_IP: ${DOCKER_HOST_IP}
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
@@ -25,6 +28,12 @@ services:
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION}
+ DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
+ DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
+ DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
+ DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE:?error}
+ DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES:?error}
+ DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE:?error}
KAFKA_TYPE: ${KAFKA_TYPE}
CONFLUENT_KEY: ${CONFLUENT_KEY}
CONFLUENT_SECRET: ${CONFLUENT_SECRET}
@@ -40,7 +49,7 @@ services:
deploy:
resources:
limits:
- memory: 3G
+ memory: 1G
depends_on:
kafka:
condition: service_healthy
diff --git a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
index c5ca126..8f90068 100644
--- a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
+++ b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
@@ -41,6 +41,7 @@
import lombok.Setter;
import lombok.AccessLevel;
import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler;
+import us.dot.its.jpo.deduplicator.deduplicator.BoundedMemoryRocksDBConfig;
import us.dot.its.jpo.ode.eventlog.EventLogger;
import us.dot.its.jpo.ode.util.CommonUtils;
@@ -239,6 +240,9 @@ public Properties createStreamProperties(String name) {
// Reduce cache buffering per topology to 1MB
streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1 * 1024 * 1024L);
+ // Optionally, to disable caching:
+ //streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+
// Decrease default commit interval. Default for 'at least once' mode of 30000ms
// is too slow.
@@ -280,8 +284,10 @@ public Properties createStreamProperties(String name) {
else {
logger.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
- }
+ }
+ // Configure RocksDB memory usage
+ streamProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
return streamProps;
}
diff --git a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
new file mode 100644
index 0000000..c0d5b17
--- /dev/null
+++ b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
@@ -0,0 +1,144 @@
+package us.dot.its.jpo.deduplicator.deduplicator;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import java.util.Map;
+
+/**
+ * Bounded memory configuration for RocksDB for all topologies.
+ * References:
+ *
+ * Configured using environment variables:
+ *
+ * - DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY
- Total block cache size
+ * - DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO
- Fraction of the block cache to use for high priority blocks (index
+ * and filter blocks).
+ * - DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY
- Write buffer size, include in block cache
+ * - DEDUPLICATOR_ROCKSDB_BLOCK_SIZE
- {@link org.rocksdb.BlockBasedTableConfig#blockSize()}, Default 4KB
+ * - DEDUPLICATOR_ROCKSDB_N_MEMTABLES
- {@link org.rocksdb.Options#maxWriteBufferNumber()}, Default 2
+ * - DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE
- {@link org.rocksdb.Options#writeBufferSize()}, Default 64MB
+ *
+ *
+ */
+
+@Slf4j
+public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
+
+ private final static long TOTAL_OFF_HEAP_MEMORY;
+ private final static double INDEX_FILTER_BLOCK_RATIO;
+ private final static long TOTAL_MEMTABLE_MEMORY;
+
+ // Block size: Default 4K
+ private final static long BLOCK_SIZE;
+
+ // MaxWriteBufferNumber: Default 2
+ private final static int N_MEMTABLES;
+
+ // WriteBufferSize: Default 64MB
+ private final static long MEMTABLE_SIZE;
+ private final static long KB = 1024L;
+ private final static long MB = KB * KB;
+
+ static {
+ RocksDB.loadLibrary();
+ // Initialize properties from env variables
+ TOTAL_OFF_HEAP_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
+ INDEX_FILTER_BLOCK_RATIO = getEnvDouble("DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
+ TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
+ BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
+ N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
+ MEMTABLE_SIZE = getEnvLong("ROCKSDB_MEMTABLE_SIZE", 16 * MB);
+
+ log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
+ " TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
+ TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
+ MEMTABLE_SIZE);
+ }
+
+ // See #1 below
+ private static final org.rocksdb.Cache cache
+ = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
+ private static final org.rocksdb.WriteBufferManager writeBufferManager
+ = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);
+ @Override
+ public void setConfig(final String storeName, final Options options, final Map configs) {
+ log.info("Setting RocksDB config for store {}", storeName);
+ BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
+
+ // These three options in combination will limit the memory used by RocksDB to the size passed to the block
+ // cache (TOTAL_OFF_HEAP_MEMORY)
+ tableConfig.setBlockCache(cache);
+ tableConfig.setCacheIndexAndFilterBlocks(true);
+ options.setWriteBufferManager(writeBufferManager);
+
+ // These options are recommended to be set when bounding the total memory
+ // See #2 below
+ tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+ tableConfig.setPinTopLevelIndexAndFilter(true);
+
+ // See #3 below
+ tableConfig.setBlockSize(BLOCK_SIZE);
+ options.setMaxWriteBufferNumber(N_MEMTABLES);
+ options.setWriteBufferSize(MEMTABLE_SIZE);
+
+ // Enable compression (optional). Compression can decrease the required storage
+ // and increase the CPU usage of the machine. For CompressionType values, see
+ // https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
+ options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+ options.setTableFormatConfig(tableConfig);
+ log.info("Rocksdb set table config: {}, options: {}", tableConfig, options);
+ }
+ @Override
+ public void close(final String storeName, final Options options) {
+ // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
+ }
+
+ private static long getEnvLong(String name, long defaultValue) {
+ String strValue = getEnvString(name);
+ if (strValue == null) return defaultValue;
+
+ try {
+ return Long.parseLong(strValue);
+ } catch (NumberFormatException nfe) {
+ log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
+ return defaultValue;
+ }
+ }
+
+ private static int getEnvInt(String name, int defaultValue) {
+ String strValue = getEnvString(name);
+ if (strValue == null) return defaultValue;
+ try {
+ return Integer.parseInt(strValue);
+ } catch (NumberFormatException nfe) {
+ log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
+ return defaultValue;
+ }
+ }
+
+ private static double getEnvDouble(String name, double defaultValue) {
+ String strValue = getEnvString(name);
+ if (strValue == null) return defaultValue;
+ try {
+ return Double.parseDouble(strValue);
+ } catch (NumberFormatException nfe) {
+ log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
+ return defaultValue;
+ }
+ }
+
+ private static String getEnvString(String name) {
+ String strValue = System.getenv(name);
+ if (strValue == null) {
+ log.warn("Env variable {} is not set", name);
+ }
+ return strValue;
+ }
+}
\ No newline at end of file
diff --git a/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java b/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
index b6c7466..6868cf0 100644
--- a/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
+++ b/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
@@ -28,7 +28,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.List;
public class ProcessedMapDeduplicatorTopologyTest {
diff --git a/sample.env b/sample.env
index cf0a681..912cfb5 100644
--- a/sample.env
+++ b/sample.env
@@ -47,4 +47,16 @@ ENABLE_ODE_MAP_DEDUPLICATION=true
ENABLE_ODE_TIM_DEDUPLICATION=true
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION=true
ENABLE_PROCESSED_SPAT_DEDUPLICATION=true
-ENABLE_ODE_BSM_DEDUPLICATION=true
\ No newline at end of file
+ENABLE_ODE_BSM_DEDUPLICATION=true
+
+
+# RocksDB Bounded Memory Config Properties
+# 128 MB = 134217728
+# 64 MB = 67108864
+# 16 MB = 16777216
+DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728
+DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1
+DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864
+DEDUPLICATOR_ROCKSDB_BLOCK_SIZE=4096
+DEDUPLICATOR_ROCKSDB_N_MEMTABLES=2
+DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE=16777216
\ No newline at end of file
From 7bce34eda34ffd199fe5cdcf7dc3fafe85da8585 Mon Sep 17 00:00:00 2001
From: john-wiens
Date: Tue, 4 Feb 2025 11:18:02 -0700
Subject: [PATCH 2/4] Tuned Java Memory Allocation
---
Dockerfile | 6 +++---
.../deduplicator/BoundedMemoryRocksDBConfig.java | 2 +-
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 452d143..66e83d0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -40,8 +40,8 @@ ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"
ENTRYPOINT ["java", \
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
"-Dlogback.configurationFile=/home/logback.xml", \
- "-Xmx128M", \
- "-Xms16M", \
+ "-Xmx1024M", \
+ "-Xms128M", \
"-XX:+UseG1GC", \
"-XX:MaxGCPauseMillis=20", \
"-XX:InitiatingHeapOccupancyPercent=35", \
@@ -50,7 +50,7 @@ ENTRYPOINT ["java", \
"-XX:MaxMetaspaceFreeRatio=80", \
"-XX:+ExplicitGCInvokesConcurrent", \
"-XX:InitialRAMPercentage=5.0", \
- "-XX:MaxRAMPercentage=50.0", \
+ # "-XX:MaxRAMPercentage=50.0", \
"-jar", \
"/home/jpo-deduplicator.jar"]
diff --git a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
index c0d5b17..2c31ce7 100644
--- a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
+++ b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
@@ -54,7 +54,7 @@ public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
- MEMTABLE_SIZE = getEnvLong("ROCKSDB_MEMTABLE_SIZE", 16 * MB);
+ MEMTABLE_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE", 16 * MB);
log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
" TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
From e08ceb43de7954326c459dbf0db52ef9de1030ec Mon Sep 17 00:00:00 2001
From: john-wiens
Date: Tue, 4 Feb 2025 11:45:46 -0700
Subject: [PATCH 3/4] Removing max ram percentage
---
Dockerfile | 1 -
1 file changed, 1 deletion(-)
diff --git a/Dockerfile b/Dockerfile
index 66e83d0..49e558c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -50,7 +50,6 @@ ENTRYPOINT ["java", \
"-XX:MaxMetaspaceFreeRatio=80", \
"-XX:+ExplicitGCInvokesConcurrent", \
"-XX:InitialRAMPercentage=5.0", \
- # "-XX:MaxRAMPercentage=50.0", \
"-jar", \
"/home/jpo-deduplicator.jar"]
From cb2e497469f8f43750325c02a6faee1934d81022 Mon Sep 17 00:00:00 2001
From: john-wiens
Date: Tue, 4 Feb 2025 11:56:01 -0700
Subject: [PATCH 4/4] Removing Errors from docker-compose variables
---
docker-compose.yml | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/docker-compose.yml b/docker-compose.yml
index d65ca5a..bee6284 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -28,12 +28,12 @@ services:
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION}
- DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
- DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
- DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
- DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE:?error}
- DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES:?error}
- DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE:?error}
+ DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY}
+ DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO}
+ DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY}
+ DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE}
+ DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES}
+ DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE}
KAFKA_TYPE: ${KAFKA_TYPE}
CONFLUENT_KEY: ${CONFLUENT_KEY}
CONFLUENT_SECRET: ${CONFLUENT_SECRET}