Memory update #3

Merged: 4 commits, Feb 4, 2025
21 changes: 15 additions & 6 deletions Dockerfile
@@ -33,15 +33,24 @@ COPY --from=builder /home/jpo-deduplicator/target/jpo-deduplicator.jar /home
#COPY cert.crt /home/cert.crt
#RUN keytool -import -trustcacerts -keystore /usr/local/openjdk-11/lib/security/cacerts -storepass changeit -noprompt -alias mycert -file cert.crt

RUN amazon-linux-extras install -y epel && \
yum install -y jemalloc-devel
ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"

ENTRYPOINT ["java", \
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
"-Dcom.sun.management.jmxremote.port=9090", \
"-Dcom.sun.management.jmxremote.rmi.port=9090", \
"-Dcom.sun.management.jmxremote", \
"-Dcom.sun.management.jmxremote.local.only=true", \
"-Dcom.sun.management.jmxremote.authenticate=false", \
"-Dcom.sun.management.jmxremote.ssl=false", \
"-Dlogback.configurationFile=/home/logback.xml", \
"-Xmx1024M", \
Contributor: Will this make it so this image's JVM won't be able to use more than 1 GB of memory?

Contributor Author: Yes. This sets the container's memory limit to 1 GB. Currently one deduplicator can handle about 27 intersections for CDOT; any more intersections and we should probably scale horizontally instead of vertically.

"-Xms128M", \
"-XX:+UseG1GC", \
"-XX:MaxGCPauseMillis=20", \
"-XX:InitiatingHeapOccupancyPercent=35", \
"-XX:MetaspaceSize=96m", \
"-XX:MinMetaspaceFreeRatio=50", \
"-XX:MaxMetaspaceFreeRatio=80", \
"-XX:+ExplicitGCInvokesConcurrent", \
"-XX:InitialRAMPercentage=5.0", \
# "-XX:MaxRAMPercentage=50.0", \
Contributor: Should the commented-out "-XX:MaxRAMPercentage=50.0", \ line still be here?

Contributor Author: This parameter has been removed in favor of using the allocated memory limits instead of percentage-based limits.

"-jar", \
"/home/jpo-deduplicator.jar"]

11 changes: 10 additions & 1 deletion docker-compose.yml
@@ -13,7 +13,10 @@ services:
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN}
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG}
image: jpo-deduplicator:latest
privileged: false # Set true to allow writing to /proc/sys/vm/drop_caches
restart: ${RESTART_POLICY}
ports:
- "10091:10090" # JMX
environment:
DOCKER_HOST_IP: ${DOCKER_HOST_IP}
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
@@ -25,6 +28,12 @@ services:
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION}
DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
Contributor: Do we need to error out if it isn't set? Seems like defaults are provided in the source code.

DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE:?error}
DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES:?error}
DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE:?error}
KAFKA_TYPE: ${KAFKA_TYPE}
CONFLUENT_KEY: ${CONFLUENT_KEY}
CONFLUENT_SECRET: ${CONFLUENT_SECRET}
@@ -40,7 +49,7 @@ services:
deploy:
resources:
limits:
memory: 3G
memory: 1G
depends_on:
kafka:
condition: service_healthy
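
A side note on the ":?error" question above: Docker Compose also supports the "${VAR:-default}" substitution form, which falls back to a default value instead of failing when the variable is unset, should hard-failing on missing configuration prove too strict.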
@@ -41,6 +41,7 @@
import lombok.Setter;
import lombok.AccessLevel;
import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler;
import us.dot.its.jpo.deduplicator.deduplicator.BoundedMemoryRocksDBConfig;
import us.dot.its.jpo.ode.eventlog.EventLogger;
import us.dot.its.jpo.ode.util.CommonUtils;

@@ -239,6 +240,9 @@ public Properties createStreamProperties(String name) {

// Reduce cache buffering per topology to 1MB
streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1 * 1024 * 1024L);
// Optionally, to disable caching:
//streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);


// Decrease default commit interval. Default for 'at least once' mode of 30000ms
// is too slow.
@@ -280,8 +284,10 @@ public Properties createStreamProperties(String name) {
else {
logger.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
}
}

// Configure RocksDB memory usage
streamProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);

return streamProps;
}
144 changes: 144 additions & 0 deletions BoundedMemoryRocksDBConfig.java
@@ -0,0 +1,144 @@
package us.dot.its.jpo.deduplicator.deduplicator;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import java.util.Map;

/**
* Bounded memory configuration for RocksDB for all topologies.
* <p>References:
* <ul>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb">Confluent: RocksDB Memory Management</a></li>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#rocksdb-config-setter">Confluent: RocksDB Config Setter</a></li>
* </ul></p>
* <p>Configured using environment variables:
* <dl>
* <dt>DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt><dd>Total block cache size</dd>
* <dt>DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt><dd>Fraction of the block cache to use for high priority blocks (index
* and filter blocks).</dd>
* <dt>DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt><dd>Write buffer size, included in the block cache</dd>
* <dt>DEDUPLICATOR_ROCKSDB_BLOCK_SIZE</dt><dd>{@link org.rocksdb.BlockBasedTableConfig#blockSize()}, Default 4KB</dd>
* <dt>DEDUPLICATOR_ROCKSDB_N_MEMTABLES</dt><dd>{@link org.rocksdb.Options#maxWriteBufferNumber()}, Default 2</dd>
* <dt>DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE</dt><dd>{@link org.rocksdb.Options#writeBufferSize()}, RocksDB default 64MB; this config defaults to 16MB</dd>
* </dl>
* </p>
*/

@Slf4j
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

private final static long TOTAL_OFF_HEAP_MEMORY;
private final static double INDEX_FILTER_BLOCK_RATIO;
private final static long TOTAL_MEMTABLE_MEMORY;

// Block size: Default 4K
private final static long BLOCK_SIZE;

// MaxWriteBufferNumber: Default 2
private final static int N_MEMTABLES;

// WriteBufferSize: RocksDB default is 64MB; this config defaults to 16MB
private final static long MEMTABLE_SIZE;
private final static long KB = 1024L;
private final static long MB = KB * KB;

static {
RocksDB.loadLibrary();
// Initialize properties from env variables
TOTAL_OFF_HEAP_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
INDEX_FILTER_BLOCK_RATIO = getEnvDouble("DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
MEMTABLE_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE", 16 * MB);

log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
" TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
MEMTABLE_SIZE);
}

// Cache and write buffer manager are shared across all store instances
private static final org.rocksdb.Cache cache
= new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
private static final org.rocksdb.WriteBufferManager writeBufferManager
= new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
log.info("Setting RocksDB config for store {}", storeName);
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

// These three options in combination will limit the memory used by RocksDB to the size passed to the block
// cache (TOTAL_OFF_HEAP_MEMORY)
tableConfig.setBlockCache(cache);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setWriteBufferManager(writeBufferManager);

// These options are recommended when bounding the total memory
// (see the Confluent memory management docs linked in the Javadoc)
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);

// Block size and memtable tuning (see the Confluent docs linked in the Javadoc)
tableConfig.setBlockSize(BLOCK_SIZE);
options.setMaxWriteBufferNumber(N_MEMTABLES);
options.setWriteBufferSize(MEMTABLE_SIZE);

// Enable compression (optional). Compression can decrease the required storage
// and increase the CPU usage of the machine. For CompressionType values, see
// https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setTableFormatConfig(tableConfig);
log.info("Rocksdb set table config: {}, options: {}", tableConfig, options);
}
@Override
public void close(final String storeName, final Options options) {
// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
}

private static long getEnvLong(String name, long defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;

try {
return Long.parseLong(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static int getEnvInt(String name, int defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Integer.parseInt(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static double getEnvDouble(String name, double defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Double.parseDouble(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static String getEnvString(String name) {
String strValue = System.getenv(name);
if (strValue == null) {
log.warn("Env variable {} is not set", name);
}
return strValue;
}
}
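
Because the WriteBufferManager above is constructed with the shared LRU cache, memtable memory is charged against the block cache rather than on top of it, so total RocksDB off-heap usage stays bounded by TOTAL_OFF_HEAP_MEMORY alone. A minimal sketch of the budget implied by the defaults (illustrative only, not project code):

    // Illustrative: memtable memory competes for space inside the block cache,
    // so the hard off-heap cap is TOTAL_OFF_HEAP_MEMORY alone.
    public class RocksDbMemoryBudget {
        public static void main(String[] args) {
            long MB = 1024L * 1024L;
            long totalOffHeap = 128 * MB;   // TOTAL_OFF_HEAP_MEMORY default
            long totalMemtable = 64 * MB;   // TOTAL_MEMTABLE_MEMORY default
            System.out.printf("Hard off-heap cap: %d MB%n", totalOffHeap / MB);
            System.out.printf("Memtables may use up to %d MB of that cap%n", totalMemtable / MB);
        }
    }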
ProcessedMapDeduplicatorTopologyTest.java
@@ -28,7 +28,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ProcessedMapDeduplicatorTopologyTest {

14 changes: 13 additions & 1 deletion sample.env
@@ -47,4 +47,16 @@ ENABLE_ODE_MAP_DEDUPLICATION=true
ENABLE_ODE_TIM_DEDUPLICATION=true
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION=true
ENABLE_PROCESSED_SPAT_DEDUPLICATION=true
ENABLE_ODE_BSM_DEDUPLICATION=true
ENABLE_ODE_BSM_DEDUPLICATION=true


# RocksDB Bounded Memory Config Properties
# 128 MB = 134217728
# 64 MB = 67108864
# 16 MB = 16777216
DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728
DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1
DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864
DEDUPLICATOR_ROCKSDB_BLOCK_SIZE=4096
DEDUPLICATOR_ROCKSDB_N_MEMTABLES=2
DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE=16777216
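
The byte values above can be double-checked with a quick sketch (convenience only, not project code):

    // Convenience: derive the byte values used in sample.env.
    public class EnvByteValues {
        public static void main(String[] args) {
            long KB = 1024L, MB = KB * KB;
            System.out.println("128 MB = " + (128 * MB)); // 134217728
            System.out.println("64 MB  = " + (64 * MB));  // 67108864
            System.out.println("16 MB  = " + (16 * MB));  // 16777216
            System.out.println("4 KB   = " + (4 * KB));   // 4096
        }
    }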