diff --git a/Dockerfile b/Dockerfile
index 2631f79..49e558c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -33,15 +33,23 @@ COPY --from=builder /home/jpo-deduplicator/target/jpo-deduplicator.jar /home
 #COPY cert.crt /home/cert.crt
 #RUN keytool -import -trustcacerts -keystore /usr/local/openjdk-11/lib/security/cacerts -storepass changeit -noprompt -alias mycert -file cert.crt
 
+RUN amazon-linux-extras install -y epel && \
+    yum install -y jemalloc-devel
+ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"
+
 ENTRYPOINT ["java", \
     "-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
-    "-Dcom.sun.management.jmxremote.port=9090", \
-    "-Dcom.sun.management.jmxremote.rmi.port=9090", \
-    "-Dcom.sun.management.jmxremote", \
-    "-Dcom.sun.management.jmxremote.local.only=true", \
-    "-Dcom.sun.management.jmxremote.authenticate=false", \
-    "-Dcom.sun.management.jmxremote.ssl=false", \
     "-Dlogback.configurationFile=/home/logback.xml", \
+    "-Xmx1024M", \
+    "-Xms128M", \
+    "-XX:+UseG1GC", \
+    "-XX:MaxGCPauseMillis=20", \
+    "-XX:InitiatingHeapOccupancyPercent=35", \
+    "-XX:MetaspaceSize=96m", \
+    "-XX:MinMetaspaceFreeRatio=50", \
+    "-XX:MaxMetaspaceFreeRatio=80", \
+    "-XX:+ExplicitGCInvokesConcurrent", \
+    "-XX:InitialRAMPercentage=5.0", \
    "-jar", \
    "/home/jpo-deduplicator.jar"]
diff --git a/docker-compose.yml b/docker-compose.yml
index a81f59d..bee6284 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -13,7 +13,10 @@ services:
       MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN}
       MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG}
     image: jpo-deduplicator:latest
+    privileged: false # Set true to allow writing to /proc/sys/vm/drop_caches
     restart: ${RESTART_POLICY}
+    ports:
+      - "10091:10090" # JMX
     environment:
       DOCKER_HOST_IP: ${DOCKER_HOST_IP}
       KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
@@ -25,6 +28,12 @@ services:
       ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
       ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
       ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION}
+      DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY}
+      DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO}
+      DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY}
+      DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE}
+      DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES}
+      DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE}
       KAFKA_TYPE: ${KAFKA_TYPE}
       CONFLUENT_KEY: ${CONFLUENT_KEY}
       CONFLUENT_SECRET: ${CONFLUENT_SECRET}
@@ -40,7 +49,7 @@ services:
     deploy:
       resources:
         limits:
-          memory: 3G
+          memory: 1G
     depends_on:
       kafka:
         condition: service_healthy
diff --git a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
index c5ca126..8f90068 100644
--- a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
+++ b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
@@ -41,6 +41,7 @@ import lombok.Setter;
 import lombok.AccessLevel;
 
 import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler;
+import us.dot.its.jpo.deduplicator.deduplicator.BoundedMemoryRocksDBConfig;
 import us.dot.its.jpo.ode.eventlog.EventLogger;
 import us.dot.its.jpo.ode.util.CommonUtils;
@@ -239,6 +240,9 @@ public Properties createStreamProperties(String name) {
         // Reduce cache buffering per topology to 1MB
         streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1 * 1024 * 1024L);
 
+        // Optionally, to disable caching:
+        //streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+
         // Decrease default commit interval. Default for 'at least once' mode of 30000ms
         // is too slow.
@@ -280,8 +284,10 @@ public Properties createStreamProperties(String name) {
         else {
             logger.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
         }
-    }
+    }
+
+    // Configure RocksDB memory usage
+    streamProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
     return streamProps;
 }
diff --git a/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
new file mode 100644
index 0000000..2c31ce7
--- /dev/null
+++ b/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/BoundedMemoryRocksDBConfig.java
@@ -0,0 +1,144 @@
+package us.dot.its.jpo.deduplicator.deduplicator;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import java.util.Map;
+
+/**
+ * Bounded memory configuration for RocksDB for all topologies.
+ * <p>
+ * References:
+ * <ul>
+ *   <li>Kafka Streams developer guide, "Memory management" (RocksDB section)</li>
+ *   <li>RocksDB wiki: "Memory usage in RocksDB" and "Write Buffer Manager"</li>
+ * </ul>
+ * (NOTE(review): the original reference links were lost in formatting — confirm against upstream.)
+ * <p>
+ * Configured using environment variables:
+ * <dl>
+ *   <dt>DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt>
+ *   <dd>Total block cache size</dd>
+ *   <dt>DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt>
+ *   <dd>Fraction of the block cache to use for high priority blocks (index
+ *       and filter blocks).</dd>
+ *   <dt>DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt>
+ *   <dd>Write buffer size, included in the block cache</dd>
+ *   <dt>DEDUPLICATOR_ROCKSDB_BLOCK_SIZE</dt>
+ *   <dd>{@link org.rocksdb.BlockBasedTableConfig#blockSize()}, Default 4KB</dd>
+ *   <dt>DEDUPLICATOR_ROCKSDB_N_MEMTABLES</dt>
+ *   <dd>{@link org.rocksdb.Options#maxWriteBufferNumber()}, Default 2</dd>
+ *   <dt>DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE</dt>
+ *   <dd>{@link org.rocksdb.Options#writeBufferSize()}, Default here 16MB
+ *       (RocksDB's own default is 64MB)</dd>
+ * </dl>
+ */
+@Slf4j
+public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
+
+    private final static long TOTAL_OFF_HEAP_MEMORY;
+    private final static double INDEX_FILTER_BLOCK_RATIO;
+    private final static long TOTAL_MEMTABLE_MEMORY;
+
+    // Block size: Default 4K
+    private final static long BLOCK_SIZE;
+
+    // MaxWriteBufferNumber: Default 2
+    private final static int N_MEMTABLES;
+
+    // WriteBufferSize: default here is 16MB (RocksDB's own default is 64MB)
+    private final static long MEMTABLE_SIZE;
+
+    private final static long KB = 1024L;
+    private final static long MB = KB * KB;
+
+    static {
+        RocksDB.loadLibrary();
+        // Initialize properties from env variables
+        TOTAL_OFF_HEAP_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
+        INDEX_FILTER_BLOCK_RATIO = getEnvDouble("DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
+        TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
+        BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
+        N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
+        MEMTABLE_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE", 16 * MB);
+
+        log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {},"
+                + " TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
+                TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
+                MEMTABLE_SIZE);
+    }
+
+    // Cache and write-buffer manager are shared by every store instance (see #1 below).
+    // Static init order matters: these must come after the static block that assigns the sizes.
+    private static final org.rocksdb.Cache cache
+            = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
+    private static final org.rocksdb.WriteBufferManager writeBufferManager
+            = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);
+
+    @Override
+    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+        log.info("Setting RocksDB config for store {}", storeName);
+        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
+
+        // These three options in combination will limit the memory used by RocksDB to the size passed to the block
+        // cache (TOTAL_OFF_HEAP_MEMORY)
+        tableConfig.setBlockCache(cache);
+        tableConfig.setCacheIndexAndFilterBlocks(true);
+        options.setWriteBufferManager(writeBufferManager);
+
+        // These options are recommended to be set when bounding the total memory
+        // See #2 below
+        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+        tableConfig.setPinTopLevelIndexAndFilter(true);
+
+        // See #3 below
+        tableConfig.setBlockSize(BLOCK_SIZE);
+        options.setMaxWriteBufferNumber(N_MEMTABLES);
+        options.setWriteBufferSize(MEMTABLE_SIZE);
+
+        // Enable compression (optional). Compression can decrease the required storage
+        // and increase the CPU usage of the machine. For CompressionType values, see
+        // https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
+        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+        options.setTableFormatConfig(tableConfig);
+        log.info("Rocksdb set table config: {}, options: {}", tableConfig, options);
+    }
+
+    @Override
+    public void close(final String storeName, final Options options) {
+        // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
+    }
+
+    /** Reads a long from the named env variable, falling back to defaultValue when unset or unparsable. */
+    private static long getEnvLong(String name, long defaultValue) {
+        String strValue = getEnvString(name);
+        if (strValue == null) return defaultValue;
+        try {
+            return Long.parseLong(strValue);
+        } catch (NumberFormatException nfe) {
+            log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
+            return defaultValue;
+        }
+    }
+
+    /** Reads an int from the named env variable, falling back to defaultValue when unset or unparsable. */
+    private static int getEnvInt(String name, int defaultValue) {
+        String strValue = getEnvString(name);
+        if (strValue == null) return defaultValue;
+        try {
+            return Integer.parseInt(strValue);
+        } catch (NumberFormatException nfe) {
+            // Fixed copy-paste bug: message previously said "to long"
+            log.error("Error parsing env variable to int {}, {}", name, strValue, nfe);
+            return defaultValue;
+        }
+    }
+
+    /** Reads a double from the named env variable, falling back to defaultValue when unset or unparsable. */
+    private static double getEnvDouble(String name, double defaultValue) {
+        String strValue = getEnvString(name);
+        if (strValue == null) return defaultValue;
+        try {
+            return Double.parseDouble(strValue);
+        } catch (NumberFormatException nfe) {
+            // Fixed copy-paste bug: message previously said "to long"
+            log.error("Error parsing env variable to double {}, {}", name, strValue, nfe);
+            return defaultValue;
+        }
+    }
+
+    /** Reads the named env variable; logs a warning and returns null when unset. */
+    private static String getEnvString(String name) {
+        String strValue = System.getenv(name);
+        if (strValue == null) {
+            log.warn("Env variable {} is not set", name);
+        }
+        return strValue;
+    }
+}
\ No newline at end of file
diff --git a/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java b/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
index b6c7466..6868cf0 100644
--- a/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
+++ b/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.List;
 
 public class ProcessedMapDeduplicatorTopologyTest {
diff --git a/sample.env b/sample.env
index cf0a681..912cfb5 100644
--- a/sample.env
+++ b/sample.env
@@ -47,4 +47,16 @@ ENABLE_ODE_MAP_DEDUPLICATION=true
 ENABLE_ODE_TIM_DEDUPLICATION=true
 ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION=true
 ENABLE_PROCESSED_SPAT_DEDUPLICATION=true
-ENABLE_ODE_BSM_DEDUPLICATION=true
\ No newline at end of file
+ENABLE_ODE_BSM_DEDUPLICATION=true
+
+
+# RocksDB Bounded Memory Config Properties
+# 128 MB = 134217728
+# 64 MB = 67108864
+# 16 MB = 16777216
+DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728
+DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1
+DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864
+DEDUPLICATOR_ROCKSDB_BLOCK_SIZE=4096
+DEDUPLICATOR_ROCKSDB_N_MEMTABLES=2
+DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE=16777216
\ No newline at end of file