Adding in RocksDB Memory Management
John-Wiens committed Feb 3, 2025
1 parent 4236cb7 commit 3d8584c
Showing 6 changed files with 189 additions and 10 deletions.
21 changes: 15 additions & 6 deletions Dockerfile
@@ -33,15 +33,24 @@ COPY --from=builder /home/jpo-deduplicator/target/jpo-deduplicator.jar /home
#COPY cert.crt /home/cert.crt
#RUN keytool -import -trustcacerts -keystore /usr/local/openjdk-11/lib/security/cacerts -storepass changeit -noprompt -alias mycert -file cert.crt

RUN amazon-linux-extras install -y epel && \
yum install -y jemalloc-devel
ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"

ENTRYPOINT ["java", \
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
"-Dcom.sun.management.jmxremote.port=9090", \
"-Dcom.sun.management.jmxremote.rmi.port=9090", \
"-Dcom.sun.management.jmxremote", \
"-Dcom.sun.management.jmxremote.local.only=true", \
"-Dcom.sun.management.jmxremote.authenticate=false", \
"-Dcom.sun.management.jmxremote.ssl=false", \
"-Dlogback.configurationFile=/home/logback.xml", \
"-Xmx128M", \
"-Xms16M", \
"-XX:+UseG1GC", \
"-XX:MaxGCPauseMillis=20", \
"-XX:InitiatingHeapOccupancyPercent=35", \
"-XX:MetaspaceSize=96m", \
"-XX:MinMetaspaceFreeRatio=50", \
"-XX:MaxMetaspaceFreeRatio=80", \
"-XX:+ExplicitGCInvokesConcurrent", \
"-XX:InitialRAMPercentage=5.0", \
"-XX:MaxRAMPercentage=50.0", \
"-jar", \
"/home/jpo-deduplicator.jar"]

11 changes: 10 additions & 1 deletion docker-compose.yml
@@ -13,7 +13,10 @@ services:
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN}
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG}
image: jpo-deduplicator:latest
privileged: false # Set true to allow writing to /proc/sys/vm/drop_caches
restart: ${RESTART_POLICY}
ports:
- "10091:10090" # JMX
environment:
DOCKER_HOST_IP: ${DOCKER_HOST_IP}
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
@@ -25,6 +28,12 @@ services:
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION}
DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE:?error}
DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES:?error}
DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE:?error}
KAFKA_TYPE: ${KAFKA_TYPE}
CONFLUENT_KEY: ${CONFLUENT_KEY}
CONFLUENT_SECRET: ${CONFLUENT_SECRET}
@@ -40,7 +49,7 @@ services:
deploy:
resources:
limits:
memory: 3G
memory: 1G
depends_on:
kafka:
condition: service_healthy
@@ -41,6 +41,7 @@
import lombok.Setter;
import lombok.AccessLevel;
import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler;
import us.dot.its.jpo.deduplicator.deduplicator.BoundedMemoryRocksDBConfig;
import us.dot.its.jpo.ode.eventlog.EventLogger;
import us.dot.its.jpo.ode.util.CommonUtils;

@@ -239,6 +240,9 @@ public Properties createStreamProperties(String name) {

// Reduce cache buffering per topology to 1MB
streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1 * 1024 * 1024L);
// Optionally, to disable caching:
//streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);


// Decrease the default commit interval. The 30000ms default for 'at least once' mode
// is too slow.
@@ -280,8 +284,10 @@ public Properties createStreamProperties(String name) {
else {
logger.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
}
}

// Configure RocksDB memory usage
streamProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);

return streamProps;
}
@@ -0,0 +1,144 @@
package us.dot.its.jpo.deduplicator.deduplicator;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import java.util.Map;

/**
* Bounded memory configuration for RocksDB for all topologies.
* <p>References:
* <ul>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb">Confluent: RocksDB Memory Management</a></li>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#rocksdb-config-setter">Confluent: RocksDB Config Setter</a></li>
* </ul></p>
* <p>Configured using environment variables:
* <dl>
* <dt>DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt><dd>Total block cache size</dd>
* <dt>DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt><dd>Fraction of the block cache to use for high priority blocks (index
* and filter blocks).</dd>
* <dt>DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt><dd>Total write buffer (memtable) memory, counted against the block cache</dd>
* <dt>DEDUPLICATOR_ROCKSDB_BLOCK_SIZE</dt><dd>{@link org.rocksdb.BlockBasedTableConfig#blockSize()}, Default 4KB</dd>
* <dt>DEDUPLICATOR_ROCKSDB_N_MEMTABLES</dt><dd>{@link org.rocksdb.Options#maxWriteBufferNumber()}, Default 2</dd>
* <dt>DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE</dt><dd>{@link org.rocksdb.Options#writeBufferSize()}, Default 64MB</dd>
* </dl>
* </p>
*/

@Slf4j
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

private final static long TOTAL_OFF_HEAP_MEMORY;
private final static double INDEX_FILTER_BLOCK_RATIO;
private final static long TOTAL_MEMTABLE_MEMORY;

// Block size: Default 4K
private final static long BLOCK_SIZE;

// MaxWriteBufferNumber: Default 2
private final static int N_MEMTABLES;

// WriteBufferSize: Default 64MB
private final static long MEMTABLE_SIZE;
private final static long KB = 1024L;
private final static long MB = KB * KB;

static {
RocksDB.loadLibrary();
// Initialize properties from env variables
TOTAL_OFF_HEAP_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
INDEX_FILTER_BLOCK_RATIO = getEnvDouble("DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
MEMTABLE_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE", 16 * MB);

log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
" TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
MEMTABLE_SIZE);
}

// A single Cache and WriteBufferManager are shared by all RocksDB store instances, so the configured limits apply process-wide
private static final org.rocksdb.Cache cache
= new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
private static final org.rocksdb.WriteBufferManager writeBufferManager
= new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
log.info("Setting RocksDB config for store {}", storeName);
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

// These three options in combination will limit the memory used by RocksDB to the size passed to the block
// cache (TOTAL_OFF_HEAP_MEMORY)
tableConfig.setBlockCache(cache);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setWriteBufferManager(writeBufferManager);

// These options are recommended when bounding the total memory
// (see the Confluent RocksDB memory management guide referenced in the Javadoc above)
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);

// Tune block size and the number/size of memtables within the bounded memory
tableConfig.setBlockSize(BLOCK_SIZE);
options.setMaxWriteBufferNumber(N_MEMTABLES);
options.setWriteBufferSize(MEMTABLE_SIZE);

// Enable compression (optional). Compression can decrease the required storage
// and increase the CPU usage of the machine. For CompressionType values, see
// https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setTableFormatConfig(tableConfig);
log.info("Rocksdb set table config: {}, options: {}", tableConfig, options);
}
@Override
public void close(final String storeName, final Options options) {
// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
}

private static long getEnvLong(String name, long defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;

try {
return Long.parseLong(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static int getEnvInt(String name, int defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Integer.parseInt(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static double getEnvDouble(String name, double defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Double.parseDouble(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static String getEnvString(String name) {
String strValue = System.getenv(name);
if (strValue == null) {
log.warn("Env variable {} is not set", name);
}
return strValue;
}
}
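
For context, this is how a RocksDBConfigSetter takes effect: Kafka Streams instantiates the class named by ROCKSDB_CONFIG_SETTER_CLASS_CONFIG and calls setConfig for every RocksDB-backed state store, which is what the createStreamProperties change above wires up. The sketch below is illustrative only; the application id, bootstrap servers, and topology are placeholders, not part of this commit.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import us.dot.its.jpo.deduplicator.deduplicator.BoundedMemoryRocksDBConfig;

public class BoundedRocksDbWiringExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-example");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Every RocksDB-backed state store built by this application is passed
        // through BoundedMemoryRocksDBConfig.setConfig(...), sharing one cache
        // and one write buffer manager across all stores.
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);

        StreamsBuilder builder = new StreamsBuilder();
        // ... topology definition omitted ...
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
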
@@ -28,7 +28,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ProcessedMapDeduplicatorTopologyTest {

14 changes: 13 additions & 1 deletion sample.env
@@ -47,4 +47,16 @@ ENABLE_ODE_MAP_DEDUPLICATION=true
ENABLE_ODE_TIM_DEDUPLICATION=true
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION=true
ENABLE_PROCESSED_SPAT_DEDUPLICATION=true
ENABLE_ODE_BSM_DEDUPLICATION=true
ENABLE_ODE_BSM_DEDUPLICATION=true


# RocksDB Bounded Memory Config Properties
# 128 MB = 134217728
# 64 MB = 67108864
# 16 MB = 16777216
DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728
DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1
DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864
DEDUPLICATOR_ROCKSDB_BLOCK_SIZE=4096
DEDUPLICATOR_ROCKSDB_N_MEMTABLES=2
DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE=16777216
