-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Memory update #3
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,15 +33,24 @@ COPY --from=builder /home/jpo-deduplicator/target/jpo-deduplicator.jar /home | |
#COPY cert.crt /home/cert.crt | ||
#RUN keytool -import -trustcacerts -keystore /usr/local/openjdk-11/lib/security/cacerts -storepass changeit -noprompt -alias mycert -file cert.crt | ||
|
||
RUN amazon-linux-extras install -y epel && \ | ||
yum install -y jemalloc-devel | ||
ENV LD_PRELOAD="/usr/lib64/libjemalloc.so" | ||
|
||
ENTRYPOINT ["java", \ | ||
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \ | ||
"-Dcom.sun.management.jmxremote.port=9090", \ | ||
"-Dcom.sun.management.jmxremote.rmi.port=9090", \ | ||
"-Dcom.sun.management.jmxremote", \ | ||
"-Dcom.sun.management.jmxremote.local.only=true", \ | ||
"-Dcom.sun.management.jmxremote.authenticate=false", \ | ||
"-Dcom.sun.management.jmxremote.ssl=false", \ | ||
"-Dlogback.configurationFile=/home/logback.xml", \ | ||
"-Xmx1024M", \ | ||
"-Xms128M", \ | ||
"-XX:+UseG1GC", \ | ||
"-XX:MaxGCPauseMillis=20", \ | ||
"-XX:InitiatingHeapOccupancyPercent=35", \ | ||
"-XX:MetaspaceSize=96m", \ | ||
"-XX:MinMetaspaceFreeRatio=50", \ | ||
"-XX:MaxMetaspaceFreeRatio=80", \ | ||
"-XX:+ExplicitGCInvokesConcurrent", \ | ||
"-XX:InitialRAMPercentage=5.0", \ | ||
# "-XX:MaxRAMPercentage=50.0", \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This parameter has been removed. in favor of using the allocated memory limits instead of percentage based limits. |
||
"-jar", \ | ||
"/home/jpo-deduplicator.jar"] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,10 @@ services: | |
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN} | ||
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG} | ||
image: jpo-deduplicator:latest | ||
privileged: false # Set true to allow writing to /proc/sys/vm/drop_caches | ||
restart: ${RESTART_POLICY} | ||
ports: | ||
- "10091:10090" # JMX | ||
environment: | ||
DOCKER_HOST_IP: ${DOCKER_HOST_IP} | ||
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS} | ||
|
@@ -25,6 +28,12 @@ services: | |
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION} | ||
ENABLE_PROCESSED_SPAT_DEDUPLICATION: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION} | ||
ENABLE_ODE_BSM_DEDUPLICATION: ${ENABLE_ODE_BSM_DEDUPLICATION} | ||
DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to error out if it isn't set? Seems like defaults are provided in the source code. |
||
DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error} | ||
DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error} | ||
DEDUPLICATOR_ROCKSDB_BLOCK_SIZE: ${DEDUPLICATOR_ROCKSDB_BLOCK_SIZE:?error} | ||
DEDUPLICATOR_ROCKSDB_N_MEMTABLES: ${DEDUPLICATOR_ROCKSDB_N_MEMTABLES:?error} | ||
DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE: ${DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE:?error} | ||
KAFKA_TYPE: ${KAFKA_TYPE} | ||
CONFLUENT_KEY: ${CONFLUENT_KEY} | ||
CONFLUENT_SECRET: ${CONFLUENT_SECRET} | ||
|
@@ -40,7 +49,7 @@ services: | |
deploy: | ||
resources: | ||
limits: | ||
memory: 3G | ||
memory: 1G | ||
depends_on: | ||
kafka: | ||
condition: service_healthy | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package us.dot.its.jpo.deduplicator.deduplicator;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

import java.util.Map;

/**
 * Bounded memory configuration for RocksDB shared by every topology's state stores.
 *
 * <p>References:
 * <ul>
 *   <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb">Confluent: RocksDB Memory Management</a></li>
 *   <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#rocksdb-config-setter">Confluent: RocksDB Config Setter</a></li>
 * </ul>
 *
 * <p>Configured using environment variables (each falls back to its default when unset or
 * unparseable):
 * <dl>
 *   <dt>DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt><dd>Total block cache size in bytes.
 *       Default 128 MB.</dd>
 *   <dt>DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt><dd>Fraction of the block cache to use
 *       for high-priority blocks (index and filter blocks). Default 0.1.</dd>
 *   <dt>DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt><dd>Write buffer memory, counted against
 *       the block cache. Default 64 MB.</dd>
 *   <dt>DEDUPLICATOR_ROCKSDB_BLOCK_SIZE</dt><dd>{@link BlockBasedTableConfig#blockSize()}.
 *       Default 4 KB.</dd>
 *   <dt>DEDUPLICATOR_ROCKSDB_N_MEMTABLES</dt><dd>{@link Options#maxWriteBufferNumber()}.
 *       Default 2.</dd>
 *   <dt>DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE</dt><dd>{@link Options#writeBufferSize()}.
 *       Default 16 MB (was documented as 64 MB, but the code default is {@code 16 * MB}).</dd>
 * </dl>
 */
@Slf4j
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    private static final long KB = 1024L;
    private static final long MB = KB * KB;

    // Blank finals populated from environment variables in the static initializer below.
    private static final long TOTAL_OFF_HEAP_MEMORY;
    private static final double INDEX_FILTER_BLOCK_RATIO;
    private static final long TOTAL_MEMTABLE_MEMORY;

    // Block size: default 4 KB
    private static final long BLOCK_SIZE;

    // MaxWriteBufferNumber: default 2
    private static final int N_MEMTABLES;

    // WriteBufferSize: default 16 MB
    private static final long MEMTABLE_SIZE;

    static {
        // The native library must be loaded before any RocksDB objects (the Cache and
        // WriteBufferManager declared below) are constructed.
        RocksDB.loadLibrary();

        // Initialize properties from env variables.
        TOTAL_OFF_HEAP_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
        INDEX_FILTER_BLOCK_RATIO = getEnvDouble("DEDUPLICATOR_ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
        TOTAL_MEMTABLE_MEMORY = getEnvLong("DEDUPLICATOR_ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
        BLOCK_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_BLOCK_SIZE", 4 * KB);
        N_MEMTABLES = getEnvInt("DEDUPLICATOR_ROCKSDB_N_MEMTABLES", 2);
        MEMTABLE_SIZE = getEnvLong("DEDUPLICATOR_ROCKSDB_MEMTABLE_SIZE", 16 * MB);

        log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
                " TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
                TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
                MEMTABLE_SIZE);
    }

    // Shared by every store instance so the memory bound applies process-wide.
    // NOTE: these declarations must stay AFTER the static block above — Java runs static
    // initializers in textual order, so the env-derived sizes are populated first.
    private static final org.rocksdb.Cache cache =
            new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    private static final org.rocksdb.WriteBufferManager writeBufferManager =
            new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

    /**
     * Applies the bounded-memory settings to one state store's RocksDB options.
     *
     * @param storeName name of the state store being configured (logged only)
     * @param options   RocksDB {@link Options} to mutate
     * @param configs   Kafka Streams configuration map (unused)
     */
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        log.info("Setting RocksDB config for store {}", storeName);
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

        // These three options in combination will limit the memory used by RocksDB to the size
        // passed to the shared block cache (TOTAL_OFF_HEAP_MEMORY).
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(writeBufferManager);

        // These options are recommended when bounding the total memory: keep index/filter blocks
        // in the high-priority region of the cache and pin the top-level index/filter blocks.
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);

        // Size data blocks and write buffers from the env-derived settings.
        tableConfig.setBlockSize(BLOCK_SIZE);
        options.setMaxWriteBufferNumber(N_MEMTABLES);
        options.setWriteBufferSize(MEMTABLE_SIZE);

        // Enable compression (optional). Compression can decrease the required storage
        // and increase the CPU usage of the machine. For CompressionType values, see
        // https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        options.setTableFormatConfig(tableConfig);
        log.info("Rocksdb set table config: {}, options: {}", tableConfig, options);
    }

    /** No-op: the shared Cache and WriteBufferManager must not be closed per-store. */
    @Override
    public void close(final String storeName, final Options options) {
        // Cache and WriteBufferManager should not be closed here, as the same objects are shared
        // by every store instance.
    }

    /** Reads a long-valued env variable; returns {@code defaultValue} when unset or unparseable. */
    private static long getEnvLong(String name, long defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Long.parseLong(strValue);
        } catch (NumberFormatException nfe) {
            log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    /** Reads an int-valued env variable; returns {@code defaultValue} when unset or unparseable. */
    private static int getEnvInt(String name, int defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Integer.parseInt(strValue);
        } catch (NumberFormatException nfe) {
            // Fixed: this previously logged "to long" even though the variable is an int.
            log.error("Error parsing env variable to int {}, {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    /** Reads a double-valued env variable; returns {@code defaultValue} when unset or unparseable. */
    private static double getEnvDouble(String name, double defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Double.parseDouble(strValue);
        } catch (NumberFormatException nfe) {
            // Fixed: this previously logged "to long" even though the variable is a double.
            log.error("Error parsing env variable to double {}, {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    /** Returns the env variable's value, or {@code null} (with a warning logged) when unset. */
    private static String getEnvString(String name) {
        String strValue = System.getenv(name);
        if (strValue == null) {
            log.warn("Env variable {} is not set", name);
        }
        return strValue;
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this make it so this image's JVM won't be able to use more than 1 GB of memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This sets the container's memory limit to 1 GB. Currently one deduplicator is able to handle about 27 intersections for CDOT. With any more intersections, we should probably scale horizontally instead of vertically.