diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 0000000..7660b5e --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,35 @@ +name: Docker build + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + jpo-jikkou: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Build + uses: docker/build-push-action@v3 + with: + context: jikkou + file: jikkou/Dockerfile.jikkou + cache-from: type=gha + cache-to: type=gha,mode=max + + jpo-kafka-connect: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Build + uses: docker/build-push-action@v3 + with: + context: kafka-connect + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/dockerhub.yml b/.github/workflows/dockerhub.yml new file mode 100644 index 0000000..a967f05 --- /dev/null +++ b/.github/workflows/dockerhub.yml @@ -0,0 +1,62 @@ +name: "DockerHub Build and Push" + +on: + push: + branches: + - "develop" + - "master" + - "release/*" + +jobs: + dockerhub-jpo-jikkou: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Replace Docker tag + id: set_tag + run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV + + - name: Build + uses: docker/build-push-action@v3 + with: + context: jikkou + file: jikkou/Dockerfile.jikkou + push: true + tags: usdotjpoode/jpo-jikkou:${{ env.TAG }} + cache-from: type=gha + cache-to: type=gha,mode=max + + dockerhub-jpo-kafka-connect: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Replace Docker tag + id: set_tag + run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV + + - name: Build + uses: docker/build-push-action@v3 + with: + context: kafka-connect + push: true + tags: usdotjpoode/jpo-kafka-connect:${{ env.TAG }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.gitignore b/.gitignore index 845959d..8d161e5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -**/.env \ No newline at end of file +**/.env + +**/target \ No newline at end of file diff --git a/README.md b/README.md index c1c0580..4014f8c 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,9 @@ The JPO ITS utilities repository serves as a central location for deploying open - [Quick Run](#quick-run-1) - [4. MongoDB Kafka Connect](#4-mongodb-kafka-connect) - [Configuration](#configuration) + - [Configure Kafka Connector Creation](#configure-kafka-connector-creation) - [Quick Run](#quick-run-2) + - [Security Notice](#security-notice) @@ -88,7 +90,7 @@ An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be depl ### Configure Topic Creation -The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](kafka/kafka-topics-values.yaml) file. 
The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`).
+The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](jikkou/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (those with `cleanup.policy` = `compact`).
 
 The following environment variables can be used to configure Kafka Topic creation.
 
@@ -103,8 +105,7 @@ The following environment variables can be used to configure Kafka Topic creation
 | `KAFKA_TOPIC_MIN_INSYNC_REPLICAS` | Minimum number of in-sync replicas (for use with acks=all) |
 | `KAFKA_TOPIC_RETENTION_MS` | Retention time for stream topics, milliseconds |
 | `KAFKA_TOPIC_DELETE_RETENTION_MS` | Tombstone retention time for compacted topics, milliseconds |
-
-
+| `KAFKA_TOPIC_CONFIG_RELATIVE_PATH` | Relative path to the Kafka topic yaml configuration file; upper-level directories are supported |
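+
+For example, these settings can be pinned on the `kafka-setup` service in a compose override file (a sketch only; the values shown are illustrative, not recommended defaults):
+
+``` yaml
+services:
+  kafka-setup:
+    environment:
+      KAFKA_TOPIC_PARTITIONS: 1
+      KAFKA_TOPIC_REPLICAS: 1
+      KAFKA_TOPIC_MIN_INSYNC_REPLICAS: 1
+      KAFKA_TOPIC_RETENTION_MS: 300000
+      KAFKA_TOPIC_DELETE_RETENTION_MS: 3600000
+```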
 
 ### Quick Run
 
@@ -121,34 +122,49 @@ The following environment variables can be used to configure Kafka Topic creation
 
 ## 4. MongoDB Kafka Connect
 
-The mongo-connector service connects to specified Kafka topics (as defined in the mongo-connector/connect_start.sh script) and deposits these messages to separate collections in the MongoDB Database. The codebase that provides this functionality comes from Confluent using their community licensed [cp-kafka-connect image](https://hub.docker.com/r/confluentinc/cp-kafka-connect). Documentation for this image can be found [here](https://docs.confluent.io/platform/current/connect/index.html#what-is-kafka-connect).
+The mongo-connector service connects to specified Kafka topics and deposits these messages into separate collections in the MongoDB database. The codebase that provides this functionality comes from Confluent using their community-licensed [cp-kafka-connect image](https://hub.docker.com/r/confluentinc/cp-kafka-connect). Documentation for this image can be found [here](https://docs.confluent.io/platform/current/connect/index.html#what-is-kafka-connect).
 
 ### Configuration
 
-Provided in the mongo-connector directory is a sample configuration shell script ([connect_start.sh](./kafka-connect/connect_start.sh)) that can be used to create kafka connectors to MongoDB. The connectors in kafka connect are defined in the format that follows:
-
-``` shell
-declare -A config_name=([name]="topic_name" [collection]="mongo_collection_name"
-    [convert_timestamp]=true [timefield]="timestamp" [use_key]=true [key]="key" [add_timestamp]=true)
-```
-
-The format above describes the basic configuration for configuring a sink connector, this should be placed at the beginning of the connect_start.sh file. In general we recommend to keep the MongoDB collection name the same as the topic name to avoid confusion. Additionally, if there is a top level timefield set `convert_timestamp` to true and then specify the time field name that appears in the message. This will allow MongoDB to transform that message into a date object to allow for TTL creation and reduce message size. To override MongoDB's default message `_id` field, set `use_key` to true and then set the `key` property to "key". The "add_timestamp" field defines whether the connector will add a auto generated timestamp to each document. This allows for creation of Time To Live (TTL) indexes on the collections to help limit collection size growth.
-After the sink connector is configured above, then make sure to call the createSink function with the config_name of the configuration like so:
-
-``` shell
-createSink config_name
-```
-
-This needs to be put after the createSink function definition. To use a different `connect_start.sh` script, pass in the relative path of the new script by overriding the `CONNECT_SCRIPT_RELATIVE_PATH` environmental variable.
+Kafka connectors are managed by the `kafka-connect-setup` service (see [Configure Kafka Connector Creation](#configure-kafka-connector-creation) below).
 
 Set the `COMPOSE_PROFILES` environmental variable as follows:
 
-- `kafka_connect` will only spin up the `kafka-connect` service in [docker-compose-connect](docker-compose-connect.yml)
+- `kafka_connect` will only spin up the `kafka-connect` and `kafka-connect-setup` services in [docker-compose-connect](docker-compose-connect.yml)
   - NOTE: This implies that you will be using a separate Kafka and MongoDB cluster
 - `kafka_connect_standalone` will run the following:
-  1. `kafka-connect` service from [docker-compose-connect](docker-compose-connect.yml)
-  2. `kafka` service from [docker-compose-kafka](docker-compose-kafka.yml)
-  3. `mongo` and `mongo-setup` services from [docker-compose-mongo](docker-compose-mongo.yml)
+  1. `kafka-connect` service from [docker-compose-connect](docker-compose-connect.yml)
+  2. `kafka-connect-setup` service from [docker-compose-connect](docker-compose-connect.yml)
+  3. `kafka` service from [docker-compose-kafka](docker-compose-kafka.yml)
+  4. `mongo` and `mongo-setup` services from [docker-compose-mongo](docker-compose-mongo.yml)
+
+### Configure Kafka Connector Creation
+
+The Kafka connectors created by the `kafka-connect-setup` service are configured in the [kafka-connectors-values.yaml](jikkou/kafka-connectors-values.yaml) file. The connectors in that file are organized by application, and given parameters that define the Kafka -> MongoDB sync connector:
+
+| Connector Variable | Required | Condition | Description |
+|---|---|---|---|
+| `topicName` | Yes | Always | The name of the Kafka topic to sync from |
+| `collectionName` | Yes | Always | The name of the MongoDB collection to write to |
+| `generateTimestamp` | No | Optional | Enable or disable adding a timestamp to each message (true/false) |
+| `connectorName` | No | Optional | Override the connector name, which otherwise defaults to the `collectionName` |
+| `useTimestamp` | No | Optional | Converts the `timestampField` field at the top level of the value to a BSON date |
+| `timestampField` | No | Required if `useTimestamp` is `true` | The name of the timestamp field at the top level of the message |
+| `useKey` | No | Optional | Override the document `_id` field in MongoDB to use a specified `keyField` from the message |
+| `keyField` | No | Required if `useKey` is `true` | The name of the key field |
+
+The following environment variables can be used to configure Kafka Connectors:
+
+| Environment Variable | Description |
+|---|---|
+| `CONNECT_URL` | Kafka connect API URL |
+| `CONNECT_LOG_LEVEL` | Kafka connect log level (`OFF`, `ERROR`, `WARN`, `INFO`) |
+| `CONNECT_TASKS_MAX` | Number of concurrent tasks to configure on kafka connectors |
+| `CONNECT_CREATE_ODE` | Whether to create kafka connectors for the ODE |
+| `CONNECT_CREATE_GEOJSONCONVERTER` | Whether to create kafka connectors for the GeojsonConverter |
+| `CONNECT_CREATE_CONFLICTMONITOR` | Whether to create kafka connectors for the Conflict Monitor |
+| `CONNECT_CREATE_DEDUPLICATOR` | Whether to create kafka connectors for the Deduplicator |
+| `CONNECT_CONFIG_RELATIVE_PATH` | Relative path to the Kafka connector yaml configuration file; upper-level directories are supported |
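+
+For example, the following entry (taken from [kafka-connectors-values.yaml](jikkou/kafka-connectors-values.yaml)) combines the parameters above to sync `topic.CmNotification` into the `CmNotification` collection, converting the top-level `notificationGeneratedAt` field to a BSON date and using the message `key` field as the document `_id`:
+
+``` yaml
+apps:
+  conflictmonitor:
+    name: conflictmonitor
+    connectors:
+      - topicName: topic.CmNotification
+        collectionName: CmNotification
+        useTimestamp: true
+        timestampField: notificationGeneratedAt
+        useKey: true
+        keyField: key
+```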
 
 ### Quick Run
 
@@ -171,3 +187,12 @@ Set the `COMPOSE_PROFILES` environmental variable as follows:
 8. Feel free to test this with other topics or by producing to these topics using the [ODE](https://github.com/usdot-jpo-ode/jpo-ode)
 
 [Back to top](#toc)
+
+## Security Notice
+
+While default passwords are provided for development convenience, it is **strongly recommended** to:
+
+1. Change all passwords before deploying to any environment
+2. Never use default passwords in production
+3. Use secure password generation and management practices
+4. Consider using Docker secrets or environment management tools for production deployments
diff --git a/docker-compose-connect.yml b/docker-compose-connect.yml
index c80d7be..bac2384 100644
--- a/docker-compose-connect.yml
+++ b/docker-compose-connect.yml
@@ -16,26 +16,33 @@ services:
           memory: 4G
     ports:
       - "8083:8083"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8083/connectors"]
+      interval: 30s
+      timeout: 10s
+      retries: 4
     depends_on:
       mongo:
         condition: service_healthy
+        required: false
+      kafka:
+        condition: service_healthy
+        required: false
     environment:
-      MONGO_URI: ${MONGO_URI}
-      MONGO_DB_NAME: ${MONGO_DB_NAME}
       CONNECT_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
       CONNECT_REST_ADVERTISED_HOST_NAME: connect
       CONNECT_REST_PORT: 8083
       CONNECT_GROUP_ID: kafka-connect-group
-      CONNECT_CONFIG_STORAGE_TOPIC: topic.kafka-connect-configs
-      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_CONFIG_STORAGE_CLEANUP_POLICY: compact
+      # Topics are created with jikkou in the kafka-setup service
+      CONNECT_CONFIG_STORAGE_TOPIC: topic.KafkaConnectConfigs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: -1
       CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
-      CONNECT_OFFSET_STORAGE_TOPIC: topic.kafka-connect-offsets
-      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_TOPIC: topic.KafkaConnectOffsets
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: -1
       CONNECT_OFFSET_STORAGE_CLEANUP_POLICY: compact
-      CONNECT_STATUS_STORAGE_TOPIC: topic.kafka-connect-status
+      CONNECT_STATUS_STORAGE_TOPIC: topic.KafkaConnectStatus
       CONNECT_STATUS_STORAGE_CLEANUP_POLICY: compact
-      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: -1
       CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
       CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
       CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
@@ -43,5 +50,39 @@ services:
       CONNECT_LOG4J_ROOT_LOGLEVEL: ${CONNECT_LOG_LEVEL}
       CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=${CONNECT_LOG_LEVEL},org.reflections=${CONNECT_LOG_LEVEL},com.mongodb.kafka=${CONNECT_LOG_LEVEL}"
       CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
+
+  kafka-connect-setup:
+    profiles:
+      - all
+      - kafka_connect
+      - kafka_connect_standalone
+      - kafka_connect_setup
+    image: jpo-jikkou
+    build:
+      context: jikkou
+      dockerfile: Dockerfile.jikkou
+    entrypoint: ./kafka_connector_init.sh
+    restart: on-failure
+    deploy:
+      resources:
+        limits:
+          cpus: '0.5'
+          memory: 1G
+    depends_on:
+      kafka-connect:
+        condition: service_healthy
+        required: false
+    environment:
+      CONNECT_URL: ${CONNECT_URL}
+      CONNECT_TASKS_MAX: ${CONNECT_TASKS_MAX}
+      CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE}
+      CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER}
+      CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR}
+      CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR}
+      
CONNECT_CREATE_MECDEPOSIT: ${CONNECT_CREATE_MECDEPOSIT} + MONGO_CONNECTOR_USERNAME: ${MONGO_ADMIN_DB_USER} + MONGO_CONNECTOR_PASSWORD: ${MONGO_ADMIN_DB_PASS:?} + MONGO_DB_IP: ${MONGO_IP} + MONGO_DB_NAME: ${MONGO_DB_NAME} volumes: - - ${CONNECT_SCRIPT_RELATIVE_PATH}:/scripts/connect_start.sh \ No newline at end of file + - ${CONNECT_CONFIG_RELATIVE_PATH-./jikkou/kafka-connectors-values.yaml}:/app/kafka-connectors-values.yaml \ No newline at end of file diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 73aab7c..345e840 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -19,7 +19,7 @@ services: deploy: resources: limits: - cpus: '1' + cpus: "1" memory: 4G volumes: - kafka:/bitnami @@ -37,31 +37,34 @@ services: KAFKA_CFG_DELETE_TOPIC_ENABLE: "true" KAFKA_CFG_LOG_RETENTION_HOURS: ${KAFKA_LOG_RETENTION_HOURS} KAFKA_CFG_LOG_RETENTION_BYTES: ${KAFKA_LOG_RETENTION_BYTES} + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" logging: options: - max-size: "10m" + max-size: "10m" max-file: "5" - kafka-setup: profiles: - all - kafka_full - kafka_setup - image: kafka-setup + image: jpo-jikkou build: - context: kafka + context: jikkou dockerfile: Dockerfile.jikkou - restart: on-failure:3 # try up to 3 times + entrypoint: ./kafka_init.sh + restart: on-failure deploy: resources: limits: - cpus: '0.5' + cpus: "0.5" memory: 1G depends_on: kafka: condition: service_healthy + required: false environment: + KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS} KAFKA_TOPIC_REPLICAS: ${KAFKA_TOPIC_REPLICAS} KAFKA_TOPIC_MIN_INSYNC_REPLICAS: ${KAFKA_TOPIC_MIN_INSYNC_REPLICAS} @@ -71,7 +74,13 @@ services: KAFKA_TOPIC_CREATE_GEOJSONCONVERTER: ${KAFKA_TOPIC_CREATE_GEOJSONCONVERTER} KAFKA_TOPIC_CREATE_CONFLICTMONITOR: ${KAFKA_TOPIC_CREATE_CONFLICTMONITOR} KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR} - + KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT} + volumes: + - ${KAFKA_TOPIC_CONFIG_RELATIVE_PATH:-./jikkou/kafka-topics-values.yaml}:/app/kafka-topics-values.yaml + logging: + options: + max-size: "10m" + max-file: "5" kafka-schema-registry: profiles: @@ -83,11 +92,12 @@ services: deploy: resources: limits: - cpus: '0.5' + cpus: "0.5" memory: 1G depends_on: kafka: condition: service_healthy + required: false ports: - "8081:8081" environment: @@ -100,6 +110,10 @@ services: interval: 30s timeout: 10s retries: 4 + logging: + options: + max-size: "10m" + max-file: "5" kafka-ui: profiles: @@ -107,22 +121,29 @@ services: - kafka_full - kafka_ui hostname: kafka-ui - image: ghcr.io/kafbat/kafka-ui:v1.0.0 + image: ghcr.io/kafbat/kafka-ui:v1.1.0 restart: ${RESTART_POLICY} deploy: resources: limits: - cpus: '0.5' + cpus: "0.5" memory: 1G ports: - 8001:8080 depends_on: kafka: condition: service_healthy + required: false environment: - DYNAMIC_CONFIG_ENABLED: 'true' + DYNAMIC_CONFIG_ENABLED: "true" KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: ${KAFKA_BOOTSTRAP_SERVERS} + KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: kafka-connect + KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: ${CONNECT_URL} + logging: + options: + max-size: "10m" + max-file: "5" volumes: - kafka: \ No newline at end of file + kafka: diff --git a/docker-compose-mongo.yml b/docker-compose-mongo.yml index aefc4f9..4a460bc 100644 --- a/docker-compose-mongo.yml +++ b/docker-compose-mongo.yml @@ -5,7 +5,7 @@ services: - kafka_connect_standalone - mongo_full - mongo - image: mongo:7 + image: mongo:8 hostname: mongo restart: ${RESTART_POLICY} 
deploy: @@ -17,24 +17,46 @@ services: - "27017:27017" environment: MONGO_INITDB_ROOT_USERNAME: ${MONGO_ADMIN_DB_USER} - MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ADMIN_DB_PASS} - MONGO_INITDB_DATABASE: admin + MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ADMIN_DB_PASS:?} + MONGO_INITDB_DATABASE: admin + MONGO_DATABASE_STORAGE_COLLECTION_NAME: ${MONGO_DATABASE_STORAGE_COLLECTION_NAME} + MONGO_DATABASE_SIZE_GB: ${MONGO_DATABASE_SIZE_GB} + MONGO_DATABASE_SIZE_TARGET_PERCENT: ${MONGO_DATABASE_SIZE_TARGET_PERCENT} + MONGO_DATABASE_DELETE_THRESHOLD_PERCENT: ${MONGO_DATABASE_DELETE_THRESHOLD_PERCENT} + MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS: ${MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS} + MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS: ${MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS} + MONGO_ENABLE_STORAGE_RECORD: ${MONGO_ENABLE_STORAGE_RECORD} + MONGO_ENABLE_DYNAMIC_TTL: ${MONGO_ENABLE_DYNAMIC_TTL} + MONGO_DB_NAME: ${MONGO_DB_NAME} + MONGO_DB_KEYFILE_STRING: ${MONGO_DB_KEYFILE_STRING:?} entrypoint: - bash - -c - | - openssl rand -base64 741 > /mongo_keyfile - chmod 400 /mongo_keyfile - chown 999:999 /mongo_keyfile - exec docker-entrypoint.sh $$@ - command: "mongod --bind_ip_all --replSet rs0 --keyFile /mongo_keyfile" + apt update + apt install -y cron gettext systemctl dos2unix + systemctl start cron + systemctl enable cron + envsubst < /data/manage-volume-cron > /etc/cron.d/manage-volume-cron + dos2unix /etc/cron.d/manage-volume-cron + chmod 644 /etc/cron.d/manage-volume-cron + systemctl restart cron + echo "$MONGO_DB_KEYFILE_STRING" > /data/keyfile.txt + chmod 400 /data/keyfile.txt + chown 999:999 /data/keyfile.txt + + exec docker-entrypoint.sh $$@ + command: ["mongod", "--replSet", "rs0", "--bind_ip_all", "--keyFile", "/data/keyfile.txt"] volumes: - mongo_data:/data/db + - ./mongo/manage-volume-cron:/data/manage-volume-cron + - ./mongo/manage_volume.js:/data/manage_volume.js healthcheck: - test: | - echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet + # Removal of replica set status check as the mongo-setup container is what actually configures the replica set + test: mongosh --quiet --username ${MONGO_ADMIN_DB_USER} --password ${MONGO_ADMIN_DB_PASS} --authenticationDatabase admin --eval "db.adminCommand('ping').ok" interval: 10s - start_period: 30s + timeout: 10s + retries: 10 mongo-setup: profiles: @@ -42,10 +64,12 @@ services: - kafka_connect_standalone - mongo_full - mongo - image: mongo:7 + image: mongo:8 hostname: mongo_setup depends_on: - - mongo + mongo: + condition: service_healthy + required: false restart: on-failure deploy: resources: @@ -54,15 +78,25 @@ services: memory: 1G environment: MONGO_ADMIN_DB_USER: ${MONGO_ADMIN_DB_USER} - MONGO_ADMIN_DB_PASS: ${MONGO_ADMIN_DB_PASS} + MONGO_ADMIN_DB_PASS: ${MONGO_ADMIN_DB_PASS:?} MONGO_DB_NAME: ${MONGO_DB_NAME} MONGO_READ_WRITE_USER: ${MONGO_READ_WRITE_USER} - MONGO_READ_WRITE_PASS: ${MONGO_READ_WRITE_PASS} - MONGO_COLLECTION_TTL: ${MONGO_COLLECTION_TTL} + MONGO_READ_WRITE_PASS: ${MONGO_READ_WRITE_PASS:?} + MONGO_READ_USER: ${MONGO_READ_USER} + MONGO_READ_PASS: ${MONGO_READ_PASS:?} + MONGO_EXPORTER_USERNAME: ${MONGO_EXPORTER_USERNAME} + MONGO_EXPORTER_PASSWORD: ${MONGO_EXPORTER_PASSWORD:?} + MONGO_DATA_RETENTION_SECONDS: ${MONGO_DATA_RETENTION_SECONDS} + MONGO_ASN_RETENTION_SECONDS: ${MONGO_ASN_RETENTION_SECONDS} + CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER} + CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR} + CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR} + CONNECT_CREATE_MECDEPOSIT: 
${CONNECT_CREATE_MECDEPOSIT}
     entrypoint: ["/bin/bash", "setup_mongo.sh"]
     volumes:
       - ${MONGO_SETUP_SCRIPT_RELATIVE_PATH}:/setup_mongo.sh
       - ${MONGO_CREATE_INDEXES_SCRIPT_RELATIVE_PATH}:/create_indexes.js
+      - ${MONGO_INIT_REPLICAS_SCRIPT_RELATIVE_PATH}:/init_replicas.js
 
   mongo-express:
     profiles:
@@ -80,14 +114,16 @@ services:
     ports:
       - "8002:8081"
     depends_on:
-      - mongo
+      mongo:
+        condition: service_healthy
+        required: false
     environment:
-      ME_CONFIG_MONGODB_SERVER: "mongo"
       ME_CONFIG_MONGODB_ENABLE_ADMIN: "true"
       ME_CONFIG_BASICAUTH_USERNAME: ${MONGO_EXPRESS_USER}
-      ME_CONFIG_BASICAUTH_PASSWORD: ${MONGO_EXPRESS_PASS}
+      ME_CONFIG_BASICAUTH_PASSWORD: ${MONGO_EXPRESS_PASS:?}
       ME_CONFIG_MONGODB_ADMINUSERNAME: ${MONGO_ADMIN_DB_USER}
-      ME_CONFIG_MONGODB_ADMINPASSWORD: ${MONGO_ADMIN_DB_PASS}
+      ME_CONFIG_MONGODB_ADMINPASSWORD: ${MONGO_ADMIN_DB_PASS:?}
+      ME_CONFIG_MONGODB_URL: mongodb://${MONGO_ADMIN_DB_USER}:${MONGO_ADMIN_DB_PASS}@${MONGO_IP}:27017/?authSource=admin&directConnection=true
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:8081"]
       interval: 30s
diff --git a/docs/Release_notes.md b/docs/Release_notes.md
index e2914e7..30f76fe 100644
--- a/docs/Release_notes.md
+++ b/docs/Release_notes.md
@@ -1 +1,45 @@
-JPO-UTILS Release Notes
\ No newline at end of file
+# JPO-UTILS Release Notes
+
+## Version 2.0.0
+
+### **Summary**
+The first release of the jpo-utils package. This package is focused on hosting utility applications that other parts of the Conflict Monitor depend on. Many of these components are provided by third-party groups such as Kafka and MongoDB. This first official release formalizes the migration of multiple components from other repositories to the jpo-utils repository. The migrated components are as follows:
+- MongoDB - Transferred from the jpo-conflictmonitor repository
+- Kafka - Transferred from the jpo-ode repository
+- Deduplicator - Transferred from the jpo-conflictmonitor repository
+- Kafka Connect - Transferred from the jpo-conflictmonitor repository
+
+This release also makes additional changes to the submodule components, as follows:
+- Refactored docker-compose files to use compose profiles for a modular component approach
+- Added Jikkou for managing Kafka topic creation and Kafka Connect behaviors
+- Updated MongoDB indexes and the collection list to support new collections
+- Updated jpo-deduplicator for compatibility with jpo-ode version 4.0.0
+
+Enhancements in this release:
+USDOT PR 2: Kafka Connect Module Addition
+USDOT PR 3: Kafka topics management
+USDOT PR 5: Add TMC filtered TIM topic
+USDOT PR 6: Merge Release/2024 q3 to master branch
+USDOT PR 7: Sync Develop with Master branch for Q3 release 2024
+USDOT PR 8: Kafka Connect Jikkou Refactor
+USDOT PR 9: Environmental Variable Fixes
+USDOT PR 10: Kafka topic addition
+USDOT PR 11: updates to mongo and kafka connect
+USDOT PR 12: update keyfile generation to use env vars
+USDOT PR 13: Added Spat and ASN Tim Deduplication
+USDOT PR 14: Adding DB entries for Progression Events
+USDOT PR 15: Cimms event aggregation topics
+USDOT PR 16: Updating Conflict Monitor Mongo Connectors
+USDOT PR 17: chore: make kafka_init.sh executable
+USDOT PR 18: Adding JPO-Deduplicator to jpo-utils
+CDOT PR 3: Updated Deduplicator for Compatibility with ode 4.0
+CDOT PR 4: Enable BSM deduplication by default
+USDOT PR 21: Adding missing cm topic
+USDOT PR 22: Adding Default Credentials
+USDOT PR 23: Adding missing ode topics
+USDOT PR 24: Index updates
+CDOT PR 6: Adding MEC Deposit Resources
+USDOT PR 25: Updating version for kafka ui to latest release
+CDOT PR 7: Tim compatibility and CI updates
+CDOT PR 8: Jpo deduplicator removal
+USDOT PR 27: Updates to jikkou image build (for dockerhub)
\ No newline at end of file
diff --git a/kafka/Dockerfile.jikkou b/jikkou/Dockerfile.jikkou
similarity index 53%
rename from kafka/Dockerfile.jikkou
rename to jikkou/Dockerfile.jikkou
index 139b4a1..17bb27d 100644
--- a/kafka/Dockerfile.jikkou
+++ b/jikkou/Dockerfile.jikkou
@@ -1,16 +1,26 @@
-FROM streamthoughts/jikkou:0.35.2
+FROM streamthoughts/jikkou:0.35.3
 
 # Root user is required to run the 'jikkou apply' command in the kafka_init.sh script
 USER root
 
+# Install yq for the kafka connect status check
+RUN apk add yq
+
 COPY ./application.conf /app/application.conf
 COPY ./jikkouconfig /etc/jikkou/config
+
+# Kafka topic creation files
 COPY ./kafka-topics-template.jinja /app/kafka-topics-template.jinja
-COPY ./kafka-topics-values.yaml /app/kafka-topics-values.yaml
 COPY ./kafka_init.sh /app/kafka_init.sh
 
+# Kafka connect creation files
+COPY ./kafka-connectors-template.jinja /app/kafka-connectors-template.jinja
+COPY ./kafka_connector_init.sh /app/kafka_connector_init.sh
+
 # Create/update topics then exit container
-ENTRYPOINT ./kafka_init.sh
+# Disabled by default; the entrypoint is set in the docker-compose files instead
+# ENTRYPOINT ./kafka_init.sh
+# ENTRYPOINT ./kafka_connector_init.sh
 
 ## For dev & testing, uncomment to keep the container running to be able
 ## to use the Jikkou command line within Docker Desktop:
diff --git a/kafka/application.conf b/jikkou/application.conf
similarity index 98%
rename from kafka/application.conf
rename to jikkou/application.conf
index 9956fce..54635df 100644
--- a/kafka/application.conf
+++ b/jikkou/application.conf
@@ -22,7 +22,7 @@ jikkou {
     # The default Kafka Client configuration
     client {
       bootstrap.servers = "kafka:9092"
-      bootstrap.servers = ${?JIKKOU_DEFAULT_KAFKA_BOOTSTRAP_SERVERS}
+      bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
       # security.protocol = "SSL"
       # ssl.keystore.location = "/tmp/client.keystore.p12"
       # ssl.keystore.password = "password"
@@ -55,6 +55,7 @@ jikkou {
       name = "kafka-connect"
       # URL of the Kafka Connect service
       url = "http://kafka-connect:8083"
+      url = ${?CONNECT_URL}
      # Method to use for authenticating on Kafka Connect. 
Available values are: [none, basicauth, ssl]
       authMethod = none
       # Use when 'authMethod' is 'basicauth' to specify the username for Authorization Basic header
diff --git a/kafka/jikkouconfig b/jikkou/jikkouconfig
similarity index 100%
rename from kafka/jikkouconfig
rename to jikkou/jikkouconfig
diff --git a/jikkou/kafka-connectors-template.jinja b/jikkou/kafka-connectors-template.jinja
new file mode 100644
index 0000000..bb97cb0
--- /dev/null
+++ b/jikkou/kafka-connectors-template.jinja
@@ -0,0 +1,76 @@
+{# ----------------- Create Kafka Connectors to MongoDB per app ----------------- #}
+{% macro create_connector(app) %}
+
+{# Connectors #}
+{% for connector in app.connectors %}
+---
+apiVersion: "kafka.jikkou.io/v1beta1" {# The api version (required) #}
+kind: "KafkaConnector" {# The resource kind (required) #}
+metadata:
+  name: sink.{{ connector.connectorName | default(connector.collectionName) }}
+  labels:
+    kafka.jikkou.io/connect-cluster: {{ values.clusterName }}
+spec:
+  connectorClass: {{ system.env.KAFKA_CONNECT_CONNECTOR_CLASS | default(values.connectorClass) }}
+  tasksMax: {{ system.env.KAFKA_TOPIC_PARTITIONS | default(values.tasksMax) }}
+  config:
+    collection: {{ connector.collectionName }}
+    connection.uri: mongodb://{{ system.env.MONGO_CONNECTOR_USERNAME }}:{{ system.env.MONGO_CONNECTOR_PASSWORD }}@{{ system.env.MONGO_DB_IP }}:27017/
+    connector.class: com.mongodb.kafka.connect.MongoSinkConnector
+    database: {{ system.env.MONGO_DB_NAME }}
+    errors.log.enable: false
+    errors.log.include.messages: false
+    errors.tolerance: all
+    group.id: connector-consumer
+    key.converter: org.apache.kafka.connect.storage.StringConverter
+    key.converter.schemas.enable: false
+    mongo.errors.tolerance: all
+    topics: {{ connector.topicName }}
+    value.converter: org.apache.kafka.connect.json.JsonConverter
+    value.converter.schemas.enable: false
+
+    {%+ if connector.generateTimestamp %}transforms: AddTimestamp,AddedTimestampConverter{% endif %}
+    {%+ if connector.generateTimestamp %}transforms.AddedTimestampConverter.field: recordGeneratedAt{% endif %}
+    {%+ if connector.generateTimestamp %}transforms.AddedTimestampConverter.target.type: Timestamp{% endif %}
+    {%+ if connector.generateTimestamp %}transforms.AddedTimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value{% endif %}
+    {%+ if connector.generateTimestamp %}transforms.AddTimestamp.timestamp.field: recordGeneratedAt{% endif %}
+    {%+ if connector.generateTimestamp %}transforms.AddTimestamp.type: org.apache.kafka.connect.transforms.InsertField$Value{% endif %}
+
+    {%+ if connector.useTimestamp %}transforms: TimestampConverter{% endif %}
+    {%+ if connector.useTimestamp %}transforms.TimestampConverter.field: {{ connector.timestampField }}{% endif %}
+    {%+ if connector.useTimestamp %}transforms.TimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value{% endif %}
+    {%+ if connector.useTimestamp %}transforms.TimestampConverter.target.type: Timestamp{% endif %}
+
+    {%+ if connector.useKey %}document.id.strategy: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy{% endif %}
+    {%+ if connector.useKey %}document.id.strategy.partial.value.projection.list: {{ connector.keyField }}{% endif %}
+    {%+ if connector.useKey %}document.id.strategy.partial.value.projection.type: AllowList{% endif %}
+    {%+ if connector.useKey %}document.id.strategy.overwrite.existing: true{% endif %}
+
+  state: "RUNNING"
+{% endfor %}
+
+{% endmacro %}
+
+{#------- Create connectors for apps with env variable = true ----------#}
+{% if system.env.CONNECT_CREATE_ODE %}
+{{ create_connector(values.apps.ode) }}
+{% endif %}
+
+{% if system.env.CONNECT_CREATE_GEOJSONCONVERTER %}
+{{ create_connector(values.apps.geojsonconverter) }}
+{% endif %}
+
+{% if system.env.CONNECT_CREATE_CONFLICTMONITOR %}
+{{ create_connector(values.apps.conflictmonitor) }}
+{% endif %}
+
+{% if system.env.CONNECT_CREATE_DEDUPLICATOR %}
+{{ create_connector(values.apps.deduplicator) }}
+{% else %}
+{{ create_connector(values.apps.ode_duplicated) }}
+{{ create_connector(values.apps.geojsonconverter_duplicated) }}
+{% endif %}
+
+{% if system.env.CONNECT_CREATE_MECDEPOSIT %}
+{{ create_connector(values.apps.mecdeposit) }}
+{% endif %}
diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml
new file mode 100644
index 0000000..8034c4b
--- /dev/null
+++ b/jikkou/kafka-connectors-values.yaml
@@ -0,0 +1,358 @@
+#======================================================================================
+# Kafka Connectors configuration settings
+#======================================================================================
+
+#--------------------------------------------------------------------------------------
+# Configuration parameters used to create kafka connectors to sync data from Kafka topics
+# to MongoDB collections.
+#
+# Ref: https://docs.confluent.io/platform/current/connect/index.html#connect-connectors
+#--------------------------------------------------------------------------------------
+
+clusterName: "kafka-connect"
+connectorClass: "com.mongodb.kafka.connect.MongoSinkConnector"
+tasksMax: 1
+
+
+#--------------------------------------------------------------------------------------
+# Kafka Connectors are grouped by application. Apps with the corresponding environment variable
+# equal to true are created or updated.
+# - ode
+# - geojsonconverter
+# - conflictmonitor
+#
+# Some Kafka topics have a non-duplicated topic and a duplicated topic. The duplicated topic will
+# be used if the deduplicator is not enabled.
+# IF env var: CONNECT_CREATE_DEDUPLICATOR = true
+# - deduplicator
+# ELSE
+# - ode_duplicated
+# - geojsonconverter_duplicated
+#
+# The Kafka Connectors have the following configuration settings:
+# Required settings:
+# - topicName: The name of the Kafka topic to read from
+# - collectionName: The name of the MongoDB collection to write to
+# Optional settings:
+# - generateTimestamp: If true, the connector will add a timestamp field to the document
+# - connectorName: The name of the connector; defaults to the collectionName
+# - useTimestamp: converts the "timestampField" field at the top level of the value to a BSON date
+# - timestampField: The name of the timestamp field
+# - useKey: If true, the connector will use the "keyField" as the document _id in MongoDB
+# - keyField: The name of the key field
+#
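+# For example, a minimal entry (illustrative only; the real entries are defined
+# under "apps" below) looks like:
+#   connectors:
+#     - topicName: topic.OdeBsmJson
+#       collectionName: OdeBsmJson
+#       generateTimestamp: true
+#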
+#--------------------------------------------------------------------------------------
+apps:
+  ode:
+    name: jpo-ode
+    connectors:
+      - topicName: topic.OdeRawEncodedBSMJson
+        collectionName: OdeRawEncodedBSMJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedMAPJson
+        collectionName: OdeRawEncodedMAPJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedSPATJson
+        collectionName: OdeRawEncodedSPATJson
+        generateTimestamp: true
+      - topicName: topic.OdeSpatJson
+        collectionName: OdeSpatJson
+        generateTimestamp: true
+      - topicName: topic.OdeTimJsonTMCFiltered
+        collectionName: OdeTimJsonTMCFiltered
+        generateTimestamp: true
+      - topicName: topic.OdeTimBroadcastJson
+        collectionName: OdeTimBroadcastJson
+        generateTimestamp: true
+      - topicName: topic.OdeTIMCertExpirationTimeJson
+        collectionName: OdeTIMCertExpirationTimeJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedPSMJson
+        collectionName: OdeRawEncodedPSMJson
+        generateTimestamp: true
+      - topicName: topic.OdePsmJson
+        collectionName: OdePsmJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedSRMJson
+        collectionName: OdeRawEncodedSRMJson
+        generateTimestamp: true
+      - topicName: topic.OdeSrmJson
+        collectionName: OdeSrmJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedSSMJson
+        collectionName: OdeRawEncodedSSMJson
+        generateTimestamp: true
+      - topicName: topic.OdeSsmJson
+        collectionName: OdeSsmJson
+        generateTimestamp: true
+      - topicName: topic.OdeDriverAlertJson
+        collectionName: OdeDriverAlertJson
+        generateTimestamp: true
+  ode_duplicated:
+    name: ode-duplicated
+    connectors:
+      - topicName: topic.OdeMapJson
+        collectionName: OdeMapJson
+        generateTimestamp: true
+      - topicName: topic.OdeTimJson
+        collectionName: OdeTimJson
+        generateTimestamp: true
+      - topicName: topic.OdeBsmJson
+        collectionName: OdeBsmJson
+        generateTimestamp: true
+      - topicName: topic.OdeRawEncodedTIMJson
+        collectionName: OdeRawEncodedTIMJson
+        generateTimestamp: true
+  geojsonconverter:
+    name: geojsonconverter
+    connectors:
+      - topicName: topic.ProcessedBsm
+        collectionName: ProcessedBsm
+        generateTimestamp: true
+  geojsonconverter_duplicated:
+    name: geojsonconverter-duplicated
+    connectors:
+      - topicName: topic.ProcessedMap
+        collectionName: ProcessedMap
+        generateTimestamp: true
+      - topicName: topic.ProcessedSpat
+        collectionName: ProcessedSpat
+        generateTimestamp: true
+  deduplicator:
+    name: deduplicator
+    connectors:
+      - topicName: topic.DeduplicatedProcessedMap
+        collectionName: ProcessedMap
+        generateTimestamp: true
+        connectorName: DeduplicatedProcessedMap
+      - topicName: topic.DeduplicatedOdeMapJson
+        collectionName: OdeMapJson
+        generateTimestamp: true
+        connectorName: DeduplicatedOdeMapJson
+      - topicName: 
topic.DeduplicatedOdeTimJson + collectionName: OdeTimJson + generateTimestamp: true + connectorName: DeduplicatedOdeTimJson + - topicName: topic.DeduplicatedOdeRawEncodedTIMJson + collectionName: OdeRawEncodedTIMJson + generateTimestamp: true + connectorName: DeduplicatedOdeRawEncodedTIMJson + - topicName: topic.DeduplicatedOdeBsmJson + collectionName: OdeBsmJson + generateTimestamp: true + connectorName: DeduplicatedOdeBsmJson + - topicName: topic.DeduplicatedProcessedSpat + collectionName: ProcessedSpat + generateTimestamp: true + connectorName: DeduplicatedProcessedSpat + conflictmonitor: + name: conflictmonitor + connectors: + # Record Events + - topicName: topic.CmStopLinePassageEvent + collectionName: CmStopLinePassageEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmStopLineStopEvent + collectionName: CmStopLineStopEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSignalStateConflictEvents + collectionName: CmSignalStateConflictEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmIntersectionReferenceAlignmentEvents + collectionName: CmIntersectionReferenceAlignmentEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSignalGroupAlignmentEvents + collectionName: CmSignalGroupAlignmentEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmConnectionOfTravelEvent + collectionName: CmConnectionOfTravelEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmLaneDirectionOfTravelEvent + collectionName: CmLaneDirectionOfTravelEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatTimeChangeDetailsEvent + collectionName: CmSpatTimeChangeDetailsEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatMinimumDataEvents + collectionName: CmSpatMinimumDataEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmMapBroadcastRateEvents + collectionName: CmMapBroadcastRateEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmMapMinimumDataEvents + collectionName: CmMapMinimumDataEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatBroadcastRateEvents + collectionName: CmSpatBroadcastRateEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmTimestampDeltaEvent + collectionName: CmTimestampDeltaEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatMessageCountProgressionEvents + collectionName: CmSpatMessageCountProgressionEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmMapMessageCountProgressionEvents + collectionName: CmMapMessageCountProgressionEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmBsmMessageCountProgressionEvents + collectionName: CmBsmMessageCountProgressionEvents + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatMinimumDataEventAggregation + collectionName: CmSpatMinimumDataEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmMapMinimumDataEventAggregation + collectionName: CmMapMinimumDataEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmIntersectionReferenceAlignmentEventAggregation + collectionName: CmIntersectionReferenceAlignmentEventAggregation + 
generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSignalGroupAlignmentEventAggregation + collectionName: CmSignalGroupAlignmentEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSignalStateConflictEventAggregation + collectionName: CmSignalStateConflictEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatTimeChangeDetailsEventAggregation + collectionName: CmSpatTimeChangeDetailsEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmEventStateProgressionEventAggregation + collectionName: CmEventStateProgressionEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmBsmMessageCountProgressionEventAggregation + collectionName: CmBsmMessageCountProgressionEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmMapMessageCountProgressionEventAggregation + collectionName: CmMapMessageCountProgressionEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmSpatMessageCountProgressionEventAggregation + collectionName: CmSpatMessageCountProgressionEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + + # Record BSM events: + - topicName: topic.CmBsmEvents + collectionName: CmBsmEvents + generateTimestamp: true + + # Record Assessments: + - topicName: topic.CmLaneDirectionOfTravelAssessment + collectionName: CmLaneDirectionOfTravelAssessment + useTimestamp: true + timestampField: assessmentGeneratedAt + - topicName: topic.CmConnectionOfTravelAssessment + collectionName: CmConnectionOfTravelAssessment + useTimestamp: true + timestampField: assessmentGeneratedAt + - topicName: topic.CmSignalStateEventAssessment + collectionName: CmSignalStateEventAssessment + useTimestamp: true + timestampField: assessmentGeneratedAt + - topicName: topic.CmStopLineStopAssessment + collectionName: CmStopLineStopAssessment + useTimestamp: true + timestampField: assessmentGeneratedAt + + # Record Notifications + - topicName: topic.CmSpatTimeChangeDetailsNotification + collectionName: CmSpatTimeChangeDetailsNotification + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmLaneDirectionOfTravelNotification + collectionName: CmLaneDirectionOfTravelNotification + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmConnectionOfTravelNotification + collectionName: CmConnectionOfTravelNotification + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmAppHealthNotifications + collectionName: CmAppHealthNotifications + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmSignalStateConflictNotification + collectionName: CmSignalStateConflictNotification + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmSignalGroupAlignmentNotification + collectionName: CmSignalGroupAlignmentNotification + useTimestamp: true + timestampField: notificationGeneratedAt + - topicName: topic.CmNotification + collectionName: CmNotification + useTimestamp: true + timestampField: notificationGeneratedAt + useKey: true + keyField: key + - topicName: topic.CmStopLineStopNotification + collectionName: CmStopLineStopNotification + useTimestamp: true + timestampField: notificationGeneratedAt + useKey: true + keyField: key + - topicName: 
topic.CmStopLinePassageNotification
+        collectionName: CmStopLinePassageNotification
+        useTimestamp: true
+        timestampField: notificationGeneratedAt
+        useKey: true
+        keyField: key
+      - topicName: topic.CmTimestampDeltaNotification
+        collectionName: CmTimestampDeltaNotification
+        useTimestamp: true
+        timestampField: notificationGeneratedAt
+        useKey: true
+        keyField: key
+      - topicName: topic.CmEventStateProgressionNotification
+        collectionName: CmEventStateProgressionNotification
+        generateTimestamp: true
+        timestampField: eventGeneratedAt
+      - topicName: topic.CmIntersectionReferenceAlignmentNotificationAggregation
+        collectionName: CmIntersectionReferenceAlignmentNotificationAggregation
+        generateTimestamp: true
+        timestampField: notificationGeneratedAt
+      - topicName: topic.CmSignalGroupAlignmentNotificationAggregation
+        collectionName: CmSignalGroupAlignmentNotificationAggregation
+        generateTimestamp: true
+        timestampField: notificationGeneratedAt
+      - topicName: topic.CmSignalStateConflictNotificationAggregation
+        collectionName: CmSignalStateConflictNotificationAggregation
+        generateTimestamp: true
+        timestampField: notificationGeneratedAt
+      - topicName: topic.CmSpatTimeChangeDetailsNotificationAggregation
+        collectionName: CmSpatTimeChangeDetailsNotificationAggregation
+        generateTimestamp: true
+        timestampField: notificationGeneratedAt
+      - topicName: topic.CmEventStateProgressionNotificationAggregation
+        collectionName: CmEventStateProgressionNotificationAggregation
+        generateTimestamp: true
+        timestampField: notificationGeneratedAt
+  mecdeposit:
+    name: mecdeposit
+    connectors:
+      - topicName: topic.MecDepositMetrics
+        collectionName: MecDepositMetrics
+        generateTimestamp: true
diff --git a/kafka/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja
similarity index 62%
rename from kafka/kafka-topics-template.jinja
rename to jikkou/kafka-topics-template.jinja
index 27f348e..1db6726 100644
--- a/kafka/kafka-topics-template.jinja
+++ b/jikkou/kafka-topics-template.jinja
@@ -2,7 +2,7 @@
 {% macro create_topics(app) %}
 
 {# Stream Topics #}
-{% for topicName in app.streamTopics %}
+{% for topicName in app.streamTopics | default([]) %}
 ---
 apiVersion: "kafka.jikkou.io/v1beta2"
 kind: KafkaTopic
@@ -20,7 +20,7 @@ spec:
 {% endfor %}
 
 {# Table Topics #}
-{% for topicName in app.tableTopics %}
+{% for topicName in app.tableTopics | default([]) %}
 ---
 apiVersion: "kafka.jikkou.io/v1beta2"
 kind: KafkaTopic
@@ -37,8 +37,28 @@ spec:
     delete.retention.ms: {{ system.env.KAFKA_TOPIC_DELETE_RETENTION_MS | default(values.deleteRetentionMs) }}
 {% endfor %}
 
+{# Custom Topics: allows topics that need their own partition count or cleanup policy #}
+{% for topic in app.customTopics | default([]) %}
+---
+apiVersion: "kafka.jikkou.io/v1beta2"
+kind: KafkaTopic
+metadata:
+  name: "{{ topic.topicName }}"
+  labels:
+    app: "{{ app.name }}"
+spec:
+  partitions: {{ topic.partitions | default(values.partitions) }}
+  replicas: {{ system.env.KAFKA_TOPIC_REPLICAS | default(values.replicas) }}
+  configs:
+    cleanup.policy: {{ topic.cleanUpPolicy | default('delete') }}
+    min.insync.replicas: {{ system.env.KAFKA_TOPIC_MIN_INSYNC_REPLICAS | default(values.minInsyncReplicas) }}
+    delete.retention.ms: {{ system.env.KAFKA_TOPIC_DELETE_RETENTION_MS | default(values.deleteRetentionMs) }}
+{% endfor %}
+
 {% endmacro %}
 
+{{ create_topics(values.apps.kafkaconnect) }}
+
 {#------- Create topics for apps with env variable = true ----------#}
 {% if system.env.KAFKA_TOPIC_CREATE_ODE %}
 {{ create_topics(values.apps.ode) }}
@@ -56,5 +76,6 @@ spec:
{{ create_topics(values.apps.deduplicator) }} {% endif %} - - +{% if system.env.KAFKA_TOPIC_CREATE_MECDEPOSIT %} +{{ create_topics(values.apps.mecdeposit) }} +{% endif %} diff --git a/kafka/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml similarity index 76% rename from kafka/kafka-topics-values.yaml rename to jikkou/kafka-topics-values.yaml index f60bdf1..3a4cc48 100644 --- a/kafka/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -59,6 +59,7 @@ apps: - topic.OdeTimJsonTMCFiltered - topic.OdeTimBroadcastJson - topic.J2735TimBroadcastJson + - topic.FilteredOdeTimJson - topic.OdeDriverAlertJson - topic.Asn1DecoderInput - topic.Asn1DecoderOutput @@ -81,7 +82,9 @@ apps: - topic.OdeRawEncodedPSMJson - topic.OdePsmTxPojo - topic.OdePsmJson + - topic.OdeTimRxJson tableTopics: {} + customTopics: {} geojsonconverter: name: jpo-geojsonconverter streamTopics: @@ -90,6 +93,7 @@ apps: - topic.ProcessedMapWKT - topic.ProcessedBsm tableTopics: {} + customTopics: {} conflictmonitor: name: jpo-conflictmonitor streamTopics: @@ -111,6 +115,16 @@ apps: - topic.CmBsmIntersection - topic.CmKafkaStateChangeEvents - topic.CmTimestampDeltaEvent + - topic.CmSpatMinimumDataEventAggregation + - topic.CmMapMinimumDataEventAggregation + - topic.CmIntersectionReferenceAlignmentEventAggregation + - topic.CmSignalGroupAlignmentEventAggregation + - topic.CmSignalStateConflictEventAggregation + - topic.CmSpatTimeChangeDetailsEventAggregation + - topic.CmEventStateProgressionEventAggregation + - topic.CmBsmMessageCountProgressionEventAggregation + - topic.CmMapMessageCountProgressionEventAggregation + - topic.CmSpatMessageCountProgressionEventAggregation tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification @@ -137,6 +151,13 @@ apps: - topic.CmSpatRevisionCounterEvents - topic.CmBsmRevisionCounterEvents - topic.CmTimestampDeltaNotification + - topic.CmIntersectionReferenceAlignmentNotificationAggregation + - topic.CmSignalGroupAlignmentNotificationAggregation + - topic.CmSignalStateConflictNotificationAggregation + - topic.CmSpatTimeChangeDetailsNotificationAggregation + - topic.CmEventStateProgressionNotificationAggregation + - topic.CmEventStateProgressionNotification + customTopics: {} deduplicator: name: jpo-deduplicator streamTopics: @@ -146,4 +167,25 @@ apps: - topic.DeduplicatedOdeTimJson - topic.DeduplicatedOdeRawEncodedTIMJson - topic.DeduplicatedOdeBsmJson - tableTopics: {} \ No newline at end of file + tableTopics: {} + customTopics: {} + kafkaconnect: + name: jpo-kafka-connect + streamTopics: {} + tableTopics: {} + customTopics: + - topicName: topic.KafkaConnectConfigs + partitions: 1 + cleanUpPolicy: compact + - topicName: topic.KafkaConnectOffsets + partitions: 20 + cleanUpPolicy: compact + - topicName: topic.KafkaConnectStatus + partitions: 10 + cleanUpPolicy: compact + mecdeposit: + name: jpo-mecdeposit + streamTopics: + - topic.MecDepositMetrics + tableTopics: {} + customTopics: {} diff --git a/jikkou/kafka_connector_init.sh b/jikkou/kafka_connector_init.sh new file mode 100755 index 0000000..c9e13eb --- /dev/null +++ b/jikkou/kafka_connector_init.sh @@ -0,0 +1,25 @@ +#!/bin/sh + +echo "CONNECT_CREATE_ODE=$CONNECT_CREATE_ODE" +echo "CONNECT_CREATE_GEOJSONCONVERTER=$CONNECT_CREATE_GEOJSONCONVERTER" +echo "CONNECT_CREATE_CONFLICTMONITOR=$CONNECT_CREATE_CONFLICTMONITOR" +echo "CONNECT_CREATE_DEDUPLICATOR=$CONNECT_CREATE_DEDUPLICATOR" + +# Set the maximum number of retries +MAX_RETRIES=5 +RETRY_COUNT=0 + +# Retry the health check until it is 
ready or the retry limit is reached
+until ./jikkou health get kafkaconnect | yq -e '.status.name == "UP"' > /dev/null || [ "$RETRY_COUNT" -ge "$MAX_RETRIES" ]; do
+  echo "Waiting 10 sec for Kafka Connect to be ready (Attempt: $((RETRY_COUNT+1))/$MAX_RETRIES)"
+  RETRY_COUNT=$((RETRY_COUNT+1))
+  sleep 10
+done
+
+./jikkou validate \
+  --files kafka-connectors-template.jinja \
+  --values-files kafka-connectors-values.yaml
+
+./jikkou apply \
+  --files kafka-connectors-template.jinja \
+  --values-files kafka-connectors-values.yaml
\ No newline at end of file
diff --git a/kafka/kafka_init.sh b/jikkou/kafka_init.sh
old mode 100644
new mode 100755
similarity index 84%
rename from kafka/kafka_init.sh
rename to jikkou/kafka_init.sh
index aa4669d..ed21935
--- a/kafka/kafka_init.sh
+++ b/jikkou/kafka_init.sh
@@ -12,8 +12,5 @@ echo "KAFKA_TOPIC_CREATE_DEDUPLICATOR=$KAFKA_TOPIC_CREATE_DEDUPLICATOR"
 
 # Create or update topics
 ./jikkou apply \
-    --files kafka-topics-template.jinja \
-    --values-files kafka-topics-values.yaml
-
-
-
+  --files kafka-topics-template.jinja \
+  --values-files kafka-topics-values.yaml
\ No newline at end of file
diff --git a/kafka-connect/Dockerfile b/kafka-connect/Dockerfile
index 0be3886..0ba8d69 100644
--- a/kafka-connect/Dockerfile
+++ b/kafka-connect/Dockerfile
@@ -1,10 +1,6 @@
 FROM confluentinc/cp-kafka-connect:7.7.0
 
-COPY connect_wait.sh /scripts/connect_wait.sh
-
 # Docs: https://www.mongodb.com/docs/kafka-connector/current/
 RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.13.0
 
 # Docs: https://docs.confluent.io/platform/current/connect/transforms/overview.html
-RUN confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.7
-
-CMD ["bash", "-c", "/scripts/connect_wait.sh"]
\ No newline at end of file
+RUN confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.7
\ No newline at end of file
diff --git a/kafka-connect/connect_start.sh b/kafka-connect/connect_start.sh
deleted file mode 100644
index 2a487ec..0000000
--- a/kafka-connect/connect_start.sh
+++ /dev/null
@@ -1,105 +0,0 @@
-# bin/bash
-echo "------------------------------------------"
-echo "Kafka connector creation started."
-echo "------------------------------------------" - - -declare -A OdeBsmJson=([name]="topic.OdeBsmJson" [collection]="OdeBsmJson" - [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) - -declare -A OdeMapJson=([name]="topic.OdeMapJson" [collection]="OdeMapJson" - [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) - -declare -A OdeSpatJson=([name]="topic.OdeSpatJson" [collection]="OdeSpatJson" - [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) - -declare -A OdeTimJson=([name]="topic.OdeTimJson" [collection]="OdeTimJson" - [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) - -declare -A OdePsmJson=([name]="topic.OdePsmJson" [collection]="OdePsmJson" - [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) - -function createSink() { - local -n topic=$1 - local name=${topic[name]} - local collection=${topic[collection]} - local timefield=${topic[timefield]} - local convert_timestamp=${topic[convert_timestamp]} - local use_key=${topic[use_key]} - local key=${topic[key]} - local add_timestamp=${topic[add_timestamp]} - - echo "Creating sink connector with parameters:" - echo "name=$name" - echo "collection=$collection" - echo "timefield=$timefield" - echo "convert_timestamp=$convert_timestamp" - echo "use_key=$use_key" - echo "key=$key" - echo "add_timestamp=$add_timestamp" - - local connectConfig=' { - "group.id":"connector-consumer", - "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", - "tasks.max":3, - "topics":"'$name'", - "connection.uri":"'$MONGO_URI'", - "database":"'$MONGO_DB_NAME'", - "collection":"'$collection'", - "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "key.converter.schemas.enable":false, - "value.converter":"org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable":false, - "errors.tolerance": "all", - "mongo.errors.tolerance": "all", - "errors.deadletterqueue.topic.name": "", - "errors.log.enable": false, - "errors.log.include.messages": false, - "errors.deadletterqueue.topic.replication.factor": 0' - - - if [ "$convert_timestamp" == true ] - then - local connectConfig=''$connectConfig', - "transforms": "TimestampConverter", - "transforms.TimestampConverter.field": "'$timefield'", - "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", - "transforms.TimestampConverter.target.type": "Timestamp"' - fi - - if [ "$add_timestamp" == true ] - then - local connectConfig=''$connectConfig', - "transforms": "AddTimestamp,AddedTimestampConverter", - "transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", - "transforms.AddTimestamp.timestamp.field": "recordGeneratedAt", - "transforms.AddedTimestampConverter.field": "recordGeneratedAt", - "transforms.AddedTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", - "transforms.AddedTimestampConverter.target.type": "Timestamp"' - fi - - if [ "$use_key" == true ] - then - local connectConfig=''$connectConfig', - "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", - "document.id.strategy.partial.value.projection.list": "'$key'", - "document.id.strategy.partial.value.projection.type": "AllowList", - "document.id.strategy.overwrite.existing": true' - fi - - local connectConfig=''$connectConfig' }' - - echo " Creating connector with Config : 
$connectConfig" - - curl -X PUT http://localhost:8083/connectors/MongoSink.${name}/config -H "Content-Type: application/json" -d "$connectConfig" -} - -createSink OdeBsmJson -createSink OdeMapJson -createSink OdeSpatJson -createSink OdeTimJson -createSink OdePsmJson - -echo "----------------------------------" -echo "ODE Kafka connector creation complete!" -echo "----------------------------------" \ No newline at end of file diff --git a/kafka-connect/connect_wait.sh b/kafka-connect/connect_wait.sh deleted file mode 100644 index 421faef..0000000 --- a/kafka-connect/connect_wait.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash - -/etc/confluent/docker/run & -echo "Waiting for Kafka Connect to start listening on kafka-connect" -while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -eq 000 ] ; do - echo -e $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)" - sleep 5 -done -sleep 10 -echo -e "\n--\n+> Creating Kafka Connect MongoDB sink" - -# Check if connect_start.sh exists -if [ ! -f /scripts/connect_start.sh ]; then - echo "Error: connect_start.sh does not exist, starting without any connectors." -else - echo "Connect_start.sh exists, starting with connectors." - bash /scripts/connect_start.sh -fi - -sleep infinity \ No newline at end of file diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index 4457a5b..db0090d 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -1,150 +1,259 @@ // Create indexes on all collections /* -This script is responsible for initializing the replica set, creating collections, adding indexes and TTLs +This is the second script responsible for configuring mongoDB automatically on startup. 
+This script is responsible for creating users, creating collections, adding indexes, and configuring TTLs
+For more information see the header in a_init_replicas.js
 */
+
+console.log("");
 console.log("Running create_indexes.js");
-const ode_db = process.env.MONGO_DB_NAME;
-const rw_user = process.env.MONGO_READ_WRITE_USER;
-const rw_pass = process.env.MONGO_READ_WRITE_PASS;
-const ttlInDays = process.env.MONGO_COLLECTION_TTL; // TTL in days
-const expire_seconds = ttlInDays * 24 * 60 * 60;
-const retry_milliseconds = 5000;
-console.log("DB Name: " + ode_db);
+// Setup Username and Password Definitions
+const MONGO_ROOT_USERNAME = process.env['MONGO_ADMIN_DB_USER'];
+const MONGO_ROOT_PASSWORD = process.env['MONGO_ADMIN_DB_PASS'];
-try {
-    console.log("Initializing replica set...");
+const MONGO_READ_WRITE_USER = process.env['MONGO_READ_WRITE_USER'];
+const MONGO_READ_WRITE_PASS = process.env['MONGO_READ_WRITE_PASS'];
+
+const MONGO_READ_USER = process.env['MONGO_READ_USER'];
+const MONGO_READ_PASS = process.env['MONGO_READ_PASS'];
+
+// Prometheus Exporter User
+const MONGO_EXPORTER_USERNAME = process.env['MONGO_EXPORTER_USERNAME'];
+const MONGO_EXPORTER_PASSWORD = process.env['MONGO_EXPORTER_PASSWORD'];
+
+const MONGO_DB_NAME = process.env['MONGO_DB_NAME'] || "CV";
+
+const expireSeconds = Number(process.env['MONGO_DATA_RETENTION_SECONDS']) || 5184000; // 2 months
+const ttlExpireSeconds = Number(process.env['MONGO_ASN_RETENTION_SECONDS']) || 86400; // 24 hours
+const retryMilliseconds = 10000;
+
+// NOTE: env values are strings, so compare against "false" explicitly; a bare `|| true` would make these flags impossible to disable
+const CONNECT_CREATE_ODE = (process.env['CONNECT_CREATE_ODE'] || "true") !== "false";
+const CONNECT_CREATE_GEOJSONCONVERTER = (process.env['CONNECT_CREATE_GEOJSONCONVERTER'] || "true") !== "false";
+const CONNECT_CREATE_CONFLICTMONITOR = (process.env['CONNECT_CREATE_CONFLICTMONITOR'] || "true") !== "false";
+const CONNECT_CREATE_DEDUPLICATOR = (process.env['CONNECT_CREATE_DEDUPLICATOR'] || "true") !== "false";
+
+
+const users = [
+    // {username: CM_MONGO_ROOT_USERNAME, password: CM_MONGO_ROOT_PASSWORD, roles: "root", database: "admin" },
+    {username: MONGO_READ_WRITE_USER, password: MONGO_READ_WRITE_PASS, permissions: [{role: "readWrite", database: MONGO_DB_NAME}]},
+    {username: MONGO_READ_USER, password: MONGO_READ_PASS, permissions: [{role: "read", database: MONGO_DB_NAME}]},
+    {username: MONGO_EXPORTER_USERNAME, password: MONGO_EXPORTER_PASSWORD, permissions: [{role: "clusterMonitor", database: "admin"}, {role: "read", database: MONGO_DB_NAME}]}
+];
+
-
 // name -> collection name
 // ttlField -> field to perform ttl on
 // timeField -> field to index for time queries
+// intersectionField -> field containing intersection id for id queries
+// rsuIP -> field containing an rsuIP if available
+// expireTime -> the number of seconds after the ttl field at which the record should be deleted
+const odeCollections = [
+    // ODE Json data
+    {name: "OdeDriverAlertJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds},
+    {name: "OdeBsmJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt",
intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeBsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeMapJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeMapJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSpatJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSpatRxJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSrmJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSrmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSsmJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeSsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeTimJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeTimJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeTimBroadcastJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeTIMCertExpirationTimeJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, -const collections = [ - {name: "OdeBsmJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"}, - {name: "OdeMapJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"}, - {name: "OdeSpatJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"}, - {name: "OdeTimJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"}, - {name: "OdePsmJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"}, + // Ode Raw ASN + {name: "OdeRawEncodedBSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, + {name: "OdeRawEncodedMAPJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, + {name: "OdeRawEncodedSPATJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, + {name: "OdeRawEncodedSRMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, + {name: "OdeRawEncodedSSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, + {name: "OdeRawEncodedTIMJson", ttlField: "recordGeneratedAt", 
"timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, ]; -// Function to check if the replica set is ready -function isReplicaSetReady() { - let status; - try { - status = rs.status(); - } catch (error) { - console.error("Error getting replica set status: " + error); - return false; - } +// GeoJson Converter Data +const geoJsonConverterCollections = [ + {name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId", expireTime: expireSeconds}, + {name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId", expireTime: expireSeconds}, + {name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "features.geometry.coordinates", expireTime: expireSeconds}, +]; - // Check if the replica set has a primary - if (!status.hasOwnProperty('myState') || status.myState !== 1) { - console.log("Replica set is not ready yet"); - return false; - } - console.log("Replica set is ready"); - return true; +const conflictMonitorCollections = [ + // Conflict Monitor Events + { name: "CmStopLineStopEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmStopLinePassageEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmIntersectionReferenceAlignmentEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalGroupAlignmentEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmConnectionOfTravelEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalStateConflictEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmLaneDirectionOfTravelEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSpatTimeChangeDetailsEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSpatMinimumDataEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmMapBroadcastRateEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmMapMinimumDataEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSpatBroadcastRateEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmBsmEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + + { name: "CmSpatMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: 
"CmMapMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmBsmMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + { name: "CmSpatMinimumDataAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmMapMinimumDataAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmIntersectionReferenceAlignmentAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalGroupAlignmentAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalStateConflictAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmTimeChangeDetailsAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmEventStateProgressionAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmBsmMessageCountProgressionAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmMapMessageCountProgressionAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSpatMessageCountProgressionAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + + // Conflict Monitor Assessments + { name: "CmLaneDirectionOfTravelAssessment", ttlField: "assessmentGeneratedAt", timeField: "assessmentGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmConnectionOfTravelAssessment", ttlField: "assessmentGeneratedAt", timeField: "assessmentGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalStateEventAssessment", ttlField: "assessmentGeneratedAt", timeField: "assessmentGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmStopLineStopAssessment", ttlField: "assessmentGeneratedAt", timeField: "assessmentGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + // Conflict Monitor Notifications + { name: "CmSpatTimeChangeDetailsNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmLaneDirectionOfTravelNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmConnectionOfTravelNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmAppHealthNotifications", ttlField: "notificationGeneratedAt", timeField: 
"notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalStateConflictNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalGroupAlignmentNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmStopLinePassageNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmStopLineStopNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + { name: "CmEventStateProgressionNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmIntersectionReferenceAlignmentNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalGroupAlignmentNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmSignalStateConflictNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + + // Reports + { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, + +]; + +let collections = []; + +if(CONNECT_CREATE_ODE){ + collections = collections.concat(odeCollections); } -try{ +if(CONNECT_CREATE_GEOJSONCONVERTER){ + collections = collections.concat(geoJsonConverterCollections); +} - // Wait for the replica set to be ready - while (!isReplicaSetReady()) { - sleep(retry_milliseconds); +if(CONNECT_CREATE_CONFLICTMONITOR){ + collections = collections.concat(conflictMonitorCollections); +} + + +try{ + + db.getMongo().setReadPref("primaryPreferred"); + db = db.getSiblingDB("admin"); + db.runCommand({autoCompact: true}); + + // Create Users in Database + for(user of users){ + createUser(user); } - sleep(retry_milliseconds); - // creates another user - console.log("Creating Read Write user..."); - admin = db.getSiblingDB("admin"); - // Check if user already exists - var user = admin.getUser(rw_user); - if (user == null) { - admin.createUser( - { - user: rw_user, - pwd: rw_pass, - roles: [ - { role: "readWrite", db: ode_db }, - ] - } - ); + + db = db.getSiblingDB(MONGO_DB_NAME); + db.getMongo().setReadPref("primaryPreferred"); + var isMaster = db.isMaster(); + if (isMaster.primary) { + console.log("Connected to the primary replica set member."); } else { - console.log("User \"" + rw_user + "\" already exists."); + console.log("Not connected to the primary replica set member. 
Current node: " + isMaster.host); } - -} catch (error) { - print("Error connecting to the MongoDB instance: " + error); +} +catch(err){ + console.log("Could not switch DB to Sibling DB"); + console.log(err); } + // Wait for the collections to exist in mongo before trying to create indexes on them let missing_collection_count; -const db = db.getSiblingDB(ode_db); do { try { missing_collection_count = 0; - const collection_names = db.getCollectionNames(); + const collectionNames = db.getCollectionNames(); for (collection of collections) { - console.log("Creating Indexes for Collection" + collection["name"]); - // Create Collection if It doesn't exist + // Create Collection if it doesn't exist let created = false; - if(!collection_names.includes(collection.name)){ + if(!collectionNames.includes(collection['name'])){ created = createCollection(collection); - // created = true; }else{ created = true; } if(created){ - if (collection.hasOwnProperty('ttlField') && collection.ttlField !== 'none') { - createTTLIndex(collection); - } - - + createTTLIndex(collection); + createTimeIntersectionIndex(collection); + createTimeRsuIpIndex(collection); + createGeoSpatialIndex(collection); }else{ missing_collection_count++; - console.log("Collection " + collection.name + " does not exist yet"); + console.log("Collection " + collection['name'] + " does not exist yet"); } } if (missing_collection_count > 0) { - print("Waiting on " + missing_collection_count + " collections to be created...will try again in " + retry_milliseconds + " ms"); - sleep(retry_milliseconds); + console.log("Waiting on " + missing_collection_count + " collections to be created...will try again in " + retryMilliseconds + " ms"); + sleep(retryMilliseconds); } } catch (err) { console.log("Error while setting up TTL indexes in collections"); console.log(rs.status()); console.error(err); - sleep(retry_milliseconds); + sleep(retryMilliseconds); } } while (missing_collection_count > 0); console.log("Finished Creating All TTL indexes"); +function createUser(user){ + try{ + console.log("Creating User: " + user['username'] + " with Permissions: " + JSON.stringify(user['permissions'])); + db.createUser( + { + user: user['username'], + pwd: user['password'], + roles: user['permissions'].map(permission => ({ + role: permission['role'], + db: permission['database'] + })) + }); + } catch (error) { + console.error("Error creating user: ", error); + } +} function createCollection(collection){ try { - db.createCollection(collection.name); + db.createCollection(collection['name']); return true; } catch (err) { console.log("Unable to Create Collection: " + collection.name); @@ -155,43 +264,181 @@ function createCollection(collection){ // Create TTL Indexes function createTTLIndex(collection) { - if (ttlIndexExists(collection)) { - console.log("TTL index already exists for " + collection.name); + if(collection.hasOwnProperty("ttlField") && collection['ttlField'] != null){ + const ttlField = collection['ttlField']; + const collectionName = collection['name']; + const duration = collection['expireTime']; + + let indexJson = {}; + indexJson[ttlField] = 1; + + + console.log(collectionName, duration, ttlField); + try{ + if (ttlIndexExists(collection)) { + db.runCommand({ + "collMod": collectionName, + "index": { + keyPattern: indexJson, + expireAfterSeconds: duration + } + }); + console.log("Updated TTL index for " + collectionName + " using the field: " + ttlField + " as the timestamp"); + }else{ + db.getCollection(collectionName).createIndex(indexJson, + 
+                {expireAfterSeconds: duration}
+            );
+            console.log("Created TTL index for " + collectionName + " using the field: " + ttlField + " as the timestamp");
+        }
+    } catch(err){
+        console.log("Failed to Create or Update index for " + collectionName + " using the field: " + ttlField + " as the timestamp");
+        console.log(err);
+    }
+    }
+}
+
+function createTimeIndex(collection){
+    if(timeIndexExists(collection)){
+        // Skip if Index already Exists
         return;
     }
-    const collection_name = collection.name;
-    const timeField = collection.ttlField;
+    if(collection.hasOwnProperty("timeField") && collection['timeField'] != null){
+        const collectionName = collection['name'];
+        const timeField = collection['timeField'];
+        console.log("Creating Time Index for " + collectionName);
 
-    console.log(
-        "Creating TTL index for " + collection_name + " to remove documents after " +
-        expire_seconds +
-        " seconds"
-    );
+        var indexJson = {};
+        indexJson[timeField] = -1;
 
-    try {
-        var index_json = {};
-        index_json[timeField] = 1;
-        db[collection_name].createIndex(index_json,
-            {expireAfterSeconds: expire_seconds}
-        );
-        console.log("Created TTL index for " + collection_name + " using the field: " + timeField + " as the timestamp");
-    } catch (err) {
-        var pattern_json = {};
-        pattern_json[timeField] = 1;
-        db.runCommand({
-            "collMod": collection_name,
-            "index": {
-                keyPattern: pattern_json,
-                expireAfterSeconds: expire_seconds
-            }
-        });
-        console.log("Updated TTL index for " + collection_name + " using the field: " + timeField + " as the timestamp");
+        try {
+            db[collectionName].createIndex(indexJson);
+            console.log("Created Time index for " + collectionName + " using the field: " + timeField + " as the timestamp");
+        } catch (err) {
+            db.runCommand({
+                "collMod": collectionName,
+                "index": {
+                    keyPattern: indexJson
+                }
+            });
+            console.log("Updated Time index for " + collectionName + " using the field: " + timeField + " as the timestamp");
+        }
+    }
+}
+
+function createTimeRsuIpIndex(collection){
+    if(timeRsuIpIndexExists(collection)){
+        // Skip if Index already Exists
+        return;
+    }
+
+    if(collection.hasOwnProperty("timeField") && collection.timeField != null && collection.hasOwnProperty("rsuIP") && collection.rsuIP != null){
+        const collectionName = collection['name'];
+        const timeField = collection['timeField'];
+        const rsuIP = collection['rsuIP'];
+        console.log("Creating Time rsuIP Index for " + collectionName);
+
+        var indexJson = {};
+        indexJson[rsuIP] = -1;
+        indexJson[timeField] = -1;
+
+        try {
+            db[collectionName].createIndex(indexJson);
+            console.log("Created Time rsuIP index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + rsuIP + " as the rsuIP");
+        } catch (err) {
+            db.runCommand({
+                "collMod": collectionName,
+                "index": {
+                    keyPattern: indexJson
+                }
+            });
+            console.log("Updated Time rsuIP index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + rsuIP + " as the rsuIP");
+        }
+    }
+}
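+
+// Illustrative note (not part of the original logic): the *IndexExists helpers at the bottom of this
+// file match on MongoDB's default index names, which join each field with its sort direction. For
+// example, assuming a collection entry with rsuIP "metadata.originIp" and timeField "metadata.odeReceivedAt",
+//   createIndex({"metadata.originIp": -1, "metadata.odeReceivedAt": -1})
+// yields an index named "metadata.originIp_-1_metadata.odeReceivedAt_-1", which is exactly the
+// name timeRsuIpIndexExists checks for.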
+
+function createTimeIntersectionIndex(collection){
+    if(timeIntersectionIndexExists(collection)){
+        // Skip if Index already Exists
+        return;
+    }
+    if(collection.hasOwnProperty("timeField") && collection.timeField != null && collection.hasOwnProperty("intersectionField") && collection.intersectionField != null){
+        const collectionName = collection['name'];
+        const timeField = collection['timeField'];
+        const intersectionField = collection['intersectionField'];
+        console.log("Creating time intersection index for " + collectionName);
+
+        var indexJson = {};
+        indexJson[intersectionField] = -1;
+        indexJson[timeField] = -1;
+
+        try {
+            db[collectionName].createIndex(indexJson);
+            console.log("Created time intersection index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + intersectionField + " as the intersection id");
+        } catch (err) {
+            db.runCommand({
+                "collMod": collectionName,
+                "index": {
+                    keyPattern: indexJson
+                }
+            });
+            console.log("Updated time intersection index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + intersectionField + " as the intersection id");
+        }
+    }
 }
+
+function createGeoSpatialIndex(collection){
+    if(geoSpatialIndexExists(collection)){
+        return;
+    }
+
+    if(collection.hasOwnProperty("timeField") && collection['timeField'] != null && collection.hasOwnProperty("geoSpatialField") && collection['geoSpatialField'] != null){
+        const collectionName = collection['name'];
+        const timeField = collection['timeField'];
+        const geoSpatialField = collection['geoSpatialField'];
+        console.log("Creating GeoSpatial index for " + collectionName);
+
+        var indexJson = {};
+        indexJson[geoSpatialField] = "2dsphere";
+        indexJson[timeField] = -1;
+
+        try {
+            db[collectionName].createIndex(indexJson);
+            console.log("Created time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + geoSpatialField + " as the GeoSpatial Field");
+        } catch (err) {
+            db.runCommand({
+                "collMod": collectionName,
+                "index": {
+                    keyPattern: indexJson
+                }
+            });
+            console.log("Updated time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and " + geoSpatialField + " as the GeoSpatial Field");
+        }
+    }
+
+}
 function ttlIndexExists(collection) {
-    return db[collection.name].getIndexes().find((idx) => idx.hasOwnProperty('expireAfterSeconds')) !== undefined;
-}
\ No newline at end of file
+    return db[collection['name']].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined;
+}
+
+function timeIntersectionIndexExists(collection){
+    return db[collection['name']].getIndexes().find((idx) => idx.name == collection['intersectionField'] + "_-1_" + collection['timeField'] + "_-1") !== undefined;
+}
+
+function timeRsuIpIndexExists(collection){
+    return db[collection['name']].getIndexes().find((idx) => idx.name == collection['rsuIP'] + "_-1_" + collection['timeField'] + "_-1") !== undefined;
+}
+
+function timeIndexExists(collection){
+    return db[collection['name']].getIndexes().find((idx) => idx.name == collection['timeField'] + "_-1") !== undefined;
+}
+
+function geoSpatialIndexExists(collection){
+    return db[collection['name']].getIndexes().find((idx) => idx.name == collection['geoSpatialField'] + "_2dsphere_" + collection['timeField'] + "_-1") !== undefined;
+}
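+
+// Worked example (illustrative only, not executed): a hypothetical entry such as
+//   {name: "OdeBsmJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt",
+//    rsuIP: "metadata.originIp", expireTime: 5184000}
+// causes the setup loop above to issue roughly:
+//   db.OdeBsmJson.createIndex({recordGeneratedAt: 1}, {expireAfterSeconds: 5184000})
+//   db.OdeBsmJson.createIndex({"metadata.originIp": -1, "metadata.odeReceivedAt": -1})
+// i.e. one TTL index plus one descending rsuIP/time query index.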
diff --git a/mongo/init_replicas.js b/mongo/init_replicas.js
new file mode 100644
index 0000000..9578377
--- /dev/null
+++ b/mongo/init_replicas.js
@@ -0,0 +1,33 @@
+
+/*
+
+This script is the first of two scripts responsible for setting up mongoDB on initial boot.
+These scripts should be copied to the /docker-entrypoint-initdb.d/ directory within the docker image;
+the docker image will then execute these scripts automatically when the database is first created.
+This script and its partner are prefixed with the letters a and b respectively to ensure they are run
+in the proper order when copied into the mongoDB docker image.
+
+Since the conflict monitor uses a replica set in its mongoDB configuration, initializing the replica set
+and configuring the collections are separated from one another. The purpose of this is to force a reconnect
+to the database. This in turn allows the new connection to be connected to the primary replica, which is
+required for creating indexes on all the collections. This script is only responsible for creating the
+replica set. Almost all other configuration should go in b_create_indexes.js
+
+Documentation on how the mongoDB docker image runs startup scripts can be found here:
+https://hub.docker.com/_/mongo/
+
+*/
+
+
+
+console.log("Initializing Replicas");
+try{
+    db_status = rs.status();
+} catch(err){
+    console.log("Initializing New DB");
+    try{
+        rs.initiate({ _id: 'rs0', members: [{ _id: 0, host: 'localhost:27017' }] }).ok
+    }catch(err){
+        console.log("Unable to Initialize DB");
+    }
+}
diff --git a/mongo/manage-volume-cron b/mongo/manage-volume-cron
new file mode 100644
index 0000000..57ca0fc
--- /dev/null
+++ b/mongo/manage-volume-cron
@@ -0,0 +1,12 @@
+MONGO_DATABASE_NAME=${MONGO_DB_NAME}
+MONGO_DATABASE_STORAGE_COLLECTION_NAME=${MONGO_DATABASE_STORAGE_COLLECTION_NAME}
+MONGO_DATABASE_SIZE_GB=${MONGO_DATABASE_SIZE_GB}
+MONGO_DATABASE_SIZE_TARGET_PERCENT=${MONGO_DATABASE_SIZE_TARGET_PERCENT}
+MONGO_DATABASE_DELETE_THRESHOLD_PERCENT=${MONGO_DATABASE_DELETE_THRESHOLD_PERCENT}
+MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS=${MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS}
+MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS=${MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS}
+MONGO_INITDB_ROOT_USERNAME=${MONGO_INITDB_ROOT_USERNAME}
+MONGO_INITDB_ROOT_PASSWORD=${MONGO_INITDB_ROOT_PASSWORD}
+
+* * * * * root mongosh /data/manage_volume.js > /var/log/cron.log 2>&1
+# An empty line is required at the end of this file for a valid cron file.
\ No newline at end of file
diff --git a/mongo/manage_volume.js b/mongo/manage_volume.js
new file mode 100644
index 0000000..899caa8
--- /dev/null
+++ b/mongo/manage_volume.js
@@ -0,0 +1,250 @@
+
+// Mongo Data Management Script
+
+// Features
+// Automatically Logs Collection Sizes
+// Automatically Updates Data Retention Periods to prevent overflow
+// Performs Emergency Record Deletion when Collections get too large
+
+// Database to Perform operation on.
+const MONGO_DATABASE_NAME = process.env.MONGO_DATABASE_NAME || "CV";
+
+// The name of the collection to store the data as.
+const MONGO_DATABASE_STORAGE_COLLECTION_NAME = process.env.MONGO_DATABASE_STORAGE_COLLECTION_NAME || "MongoStorage";
+
+// Total Size of the database disk in GB. This script will work to ensure all data fits within this store.
+const MONGO_DATABASE_SIZE_GB = process.env.MONGO_DATABASE_SIZE_GB || 1000;
+
+// Specified as a percent of total database size. This is the storage target the database will try to hit.
+const MONGO_DATABASE_SIZE_TARGET_PERCENT = process.env.MONGO_DATABASE_SIZE_TARGET_PERCENT || 0.8;
+
+// Specified as a percent of total database size.
+const MONGO_DATABASE_DELETE_THRESHOLD_PERCENT = process.env.MONGO_DATABASE_DELETE_THRESHOLD_PERCENT || 0.9;
+
+// The maximum amount of time data should be retained. Measured in Seconds.
+const MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS = process.env.MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS || 5184000; // 60 Days
+
+// The minimum amount of time data should be retained. Measured in Seconds. This only affects TTLs set on the data. It will not prevent the database from manual data deletion.
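+// (Illustrative example: with the 60 day maximum above and the 7 day minimum below, a computed
+// retention of 30 days is applied as-is, while 90 days is clamped to 60 and 3 days is clamped to 7.)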
+const MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS = process.env.MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS || 604800; // 7 Days
+
+
+const MONGO_ROOT_USERNAME = process.env.MONGO_INITDB_ROOT_USERNAME || "root";
+const MONGO_ROOT_PASSWORD = process.env.MONGO_INITDB_ROOT_PASSWORD || "root";
+
+// NOTE: env values are strings; compare against "false" explicitly so these toggles can actually be disabled
+const MONGO_ENABLE_STORAGE_RECORD = (process.env.MONGO_ENABLE_STORAGE_RECORD || "true") !== "false";
+const MONGO_ENABLE_DYNAMIC_TTL = ((process.env.MONGO_ENABLE_DYNAMIC_TTL || "true") !== "false") && MONGO_ENABLE_STORAGE_RECORD; //Storage record must be enabled for Dynamic TTL
+
+
+const MS_PER_HOUR = 60 * 60 * 1000;
+const BYTE_TO_GB = 1024 * 1024 * 1024;
+const DB_TARGET_SIZE_BYTES = MONGO_DATABASE_SIZE_GB * MONGO_DATABASE_SIZE_TARGET_PERCENT * BYTE_TO_GB;
+const DB_DELETE_SIZE_BYTES = MONGO_DATABASE_SIZE_GB * MONGO_DATABASE_DELETE_THRESHOLD_PERCENT * BYTE_TO_GB;
+
+
+
+
+print("Managing Mongo Data Volumes");
+
+db = db.getSiblingDB("admin");
+db.auth(MONGO_ROOT_USERNAME, MONGO_ROOT_PASSWORD);
+db = db.getSiblingDB(MONGO_DATABASE_NAME);
+
+class CollectionStats{
+    constructor(name, allocatedSpace, freeSpace, indexSpace){
+        this.name = name;
+        this.allocatedSpace = allocatedSpace;
+        this.freeSpace = freeSpace;
+        this.indexSize = indexSpace;
+    }
+}
+
+
+class StorageRecord{
+    constructor(collectionStats, totalAllocatedStorage, totalFreeSpace, totalIndexSize){
+        this.collectionStats = collectionStats;
+        this.recordGeneratedAt = ISODate();
+        this.totalAllocatedStorage = totalAllocatedStorage;
+        this.totalFreeSpace = totalFreeSpace;
+        this.totalIndexSize = totalIndexSize;
+        this.totalSize = totalAllocatedStorage + totalFreeSpace + totalIndexSize;
+    }
+}
+
+function ema_deltas(records){
+    const a = 0.5;
+    let average_delta = 0;
+
+    for(let i=0; i< records.length-1; i++){
+        const delta = records[i+1] - records[i];
+        average_delta += Math.pow(a, records.length -i -1) * delta;
+    }
+
+    return average_delta;
+
+}
+
+function updateTTL(){
+
+    print("Updating TTL");
+    const ttl = getLatestTTL();
+    if(ttl == 0){
+        print("Skipping TTL Update");
+        // Do not update TTL's
+        return;
+    }
+
+
+    const newestRecords = db.getCollection(MONGO_DATABASE_STORAGE_COLLECTION_NAME).find().sort({"recordGeneratedAt":-1}).limit(10);
+
+    let sizes = [];
+    newestRecords.forEach(doc => {
+        let total = 0;
+        for(let i=0; i < doc.collectionStats.length; i++){
+            total += doc.collectionStats[i].allocatedSpace + doc.collectionStats[i].freeSpace + doc.collectionStats[i].indexSize;
+        }
+
+        sizes.push(total);
+    });
+
+
+    // Overshoot Prevention
+    const growth = ema_deltas(sizes);
+    // find() returns a cursor, so take the first document (if any) as the oldest ProcessedSpat record
+    const oldestSpat = db.getCollection("ProcessedSpat").find().sort({"recordGeneratedAt":1}).limit(1).toArray()[0];
+
+    let new_ttl = ttl;
+    let possible_ttl = ttl;
+
+    // Check if collection is still growing to capacity, or if it is in steady state
+    // (ttl is in seconds; convert to milliseconds for the date comparison)
+    if(oldestSpat != null && oldestSpat.recordGeneratedAt > new Date(Date.now() - ttl * 1000 + MS_PER_HOUR) && growth > 0){
+        possible_ttl = DB_TARGET_SIZE_BYTES / growth;
+    }else{
+        possible_ttl = 3600 * ((DB_TARGET_SIZE_BYTES - sizes[0])/BYTE_TO_GB) + ttl; // Shift the TTL by roughly 1 hour for every GB of data over or under
+    }
+
+    // Clamp TTL and assign to new TTL
+
+    if(!isNaN(possible_ttl) && possible_ttl != 0){
+        if(possible_ttl > MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS){
+            new_ttl = MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS;
+        }else if(possible_ttl < MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS){
+            new_ttl = MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS;
+        }else{
+            new_ttl = Math.round(possible_ttl);
+        }
+        new_ttl = Number(new_ttl);
+        print("Calculated New TTL for MongoDB: " + new_ttl);
+        applyNewTTL(new_ttl);
+    }else{
+        print("Not Updating TTL: New TTL is NaN");
+    }
+}
+
+function getLatestTTL(){
+    const indexes = db.getCollection("ProcessedSpat").getIndexes();
+    for (let i=0; i < indexes.length; i++){
+        if(indexes[i].hasOwnProperty("expireAfterSeconds")){
+            return indexes[i]["expireAfterSeconds"];
+        }
+    }
+    return 0;
+}
+
+function getTTLKey(collection){
+    const indexes = db.getCollection(collection).getIndexes();
+    for (let i=0; i < indexes.length; i++){
+        if(indexes[i].hasOwnProperty("expireAfterSeconds")){
+            return [indexes[i]["key"], indexes[i]["expireAfterSeconds"]];
+        }
+    }
+    return [null, null];
+}
+
+function applyNewTTL(ttl){
+    var collections = db.getCollectionNames();
+    for(let i=0; i< collections.length; i++){
+        const collection = collections[i];
+        let [key, oldTTL] = getTTLKey(collection);
+        if(oldTTL != ttl && key != null){
+            print("Updating TTL For Collection: " + collection, ttl);
+            db.runCommand({
+                "collMod": collection,
+                "index": {
+                    keyPattern: key,
+                    expireAfterSeconds: ttl
+                }});
+        }
+    }
+}
+
+
+function addNewStorageRecord(){
+    var collections = db.getCollectionNames();
+    let totalAllocatedStorage = 0;
+    let totalFreeSpace = 0;
+    let totalIndexSize = 0;
+
+    let records = [];
+
+    for (var i = 0; i < collections.length; i++) {
+        let stats = db.getCollection(collections[i]).stats();
+        let colStats = db.runCommand({"collstats": collections[i]});
+        let blockManager = colStats["wiredTiger"]["block-manager"];
+
+        let freeSpace = Number(blockManager["file bytes available for reuse"]);
+        let allocatedStorage = Number(blockManager["file size in bytes"]);
+        let indexSize = Number(stats.totalIndexSize);
+
+        records.push(new CollectionStats(collections[i], allocatedStorage, freeSpace, indexSize));
+
+        totalAllocatedStorage += allocatedStorage;
+        totalFreeSpace += freeSpace;
+        totalIndexSize += indexSize;
+
+        print(collections[i], allocatedStorage / BYTE_TO_GB, freeSpace / BYTE_TO_GB, indexSize / BYTE_TO_GB);
+    }
+
+    const storageRecord = new StorageRecord(records, totalAllocatedStorage, totalFreeSpace, totalIndexSize);
+    db.getCollection(MONGO_DATABASE_STORAGE_COLLECTION_NAME).insertOne(storageRecord);
+}
+
+function compactCollections(){
+    print("Checking Collection Compaction");
+
+    var collections = db.getCollectionNames();
+
+    let activeCompactions = [];
+    db.currentOp({ "active": true, "secs_running": { "$gt": 0 } }).inprog.forEach(op => {
+        if (op.msg && op.msg.includes("compact")) {
+            print("Found Active Compactions");
+            activeCompactions.push(op.command.compact);
+        }
+    });
+
+    for (var i = 0; i < collections.length; i++) {
+        let colStats = db.runCommand({"collstats": collections[i]});
+        let blockManager = colStats["wiredTiger"]["block-manager"];
+
+        let freeSpace = Number(blockManager["file bytes available for reuse"]);
+        let allocatedStorage = Number(blockManager["file size in bytes"]);
+
+        // If free space makes up a significant proportion of allocated storage
+        if(freeSpace > allocatedStorage * 0.5 && allocatedStorage > (1 * BYTE_TO_GB)){
+            if(!activeCompactions.includes(collections[i])){
+                print("Compacting Collection", collections[i]);
+                db.runCommand({compact: collections[i], force:true});
+            }else{
+                print("Skipping Compaction, Collection Compaction is already scheduled");
+            }
+        }
+    }
+}
+
+if(MONGO_ENABLE_STORAGE_RECORD){
+    addNewStorageRecord();
+}
+
+if(MONGO_ENABLE_DYNAMIC_TTL){
+    updateTTL();
+}
+
+
diff --git a/mongo/setup_mongo.sh b/mongo/setup_mongo.sh
old mode 100644
new mode 100755
index 13cb4e7..c6f0a77
--- a/mongo/setup_mongo.sh
+++ b/mongo/setup_mongo.sh
@@ -8,4 +8,5 @@ echo "MongoDB is up and running!"
 cd /
-mongosh -u $MONGO_ADMIN_DB_USER -p $MONGO_ADMIN_DB_PASS --authenticationDatabase admin --host mongo:27017 /create_indexes.js
+mongosh -u $MONGO_ADMIN_DB_USER -p $MONGO_ADMIN_DB_PASS --authenticationDatabase admin --host mongo:27017 /init_replicas.js
+mongosh -u $MONGO_ADMIN_DB_USER -p $MONGO_ADMIN_DB_PASS --authenticationDatabase admin --host mongo:27017 /create_indexes.js
diff --git a/sample.env b/sample.env
index 3f471bd..2237c0c
--- a/sample.env
+++ b/sample.env
@@ -3,6 +3,7 @@
 # Hint: look for "inet addr:" within "eth0" or "en0" for OSX
 DOCKER_HOST_IP=""
 
+
 # Docker compose restart policy: https://docs.docker.com/engine/containers/start-containers-automatically/
 RESTART_POLICY="on-failure:3"
 
@@ -20,12 +21,18 @@ RESTART_POLICY="on-failure:3"
 # - kafka
 # - mongo
 # - kafka_connect
+# - kafka_connect_setup
 
 # EXAMPLE: COMPOSE_PROFILES=kafka_connect_standalone,kafka_ui,mongo_express
 COMPOSE_PROFILES=all
+
+
 ### COMMON variables - END ###
+
+
 ### KAFKA variables - START ###
 KAFKA_BOOTSTRAP_SERVERS=${DOCKER_HOST_IP}:9092
 KAFKA_LOG_RETENTION_HOURS=3
@@ -40,37 +47,78 @@ KAFKA_TOPIC_DELETE_RETENTION_MS=3600000
 KAFKA_TOPIC_CREATE_ODE=true # Create topics for ODE
 KAFKA_TOPIC_CREATE_GEOJSONCONVERTER=true # Create topics for GeoJSON Converter
 KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor
-KAFKA_TOPIC_CREATE_DEDUPLICATOR=true # Create topics for Deduplicator
+KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator
+KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit
+# Relative path to the Kafka topic yaml configuration script, upper level directories are supported
+# NOTE: This script is used to create kafka topics
+KAFKA_TOPIC_CONFIG_RELATIVE_PATH="./jikkou/kafka-topics-values.yaml"
 ### KAFKA variables - END ###
 
 ### MONGODB variables - START ###
 # NOTE: Must set a password for the container to start up properly
 MONGO_IP=${DOCKER_HOST_IP}
-MONGO_DB_NAME=ode
+MONGO_DB_NAME=CV
+
+# Generate a random string for the MongoDB keyfile using the following command:
+# $ openssl rand -base64 32
+MONGO_DB_KEYFILE_STRING=replacethisstring
 
 MONGO_ADMIN_DB_USER=admin
-MONGO_ADMIN_DB_PASS=
+MONGO_ADMIN_DB_PASS=replace_me
 
 MONGO_READ_WRITE_USER=ode
-MONGO_READ_WRITE_PASS=
+MONGO_READ_WRITE_PASS=replace_me
 
-MONGO_PORT=27017
-MONGO_URI=mongodb://${MONGO_READ_WRITE_USER}:${MONGO_READ_WRITE_PASS}@${MONGO_IP}:${MONGO_PORT}/?directConnection=true
-MONGO_COLLECTION_TTL=7 # days
+MONGO_READ_USER=user
+MONGO_READ_PASS=replace_me
+
+MONGO_EXPORTER_USERNAME=export
+MONGO_EXPORTER_PASSWORD=replace_me
 
 MONGO_EXPRESS_USER=${MONGO_ADMIN_DB_USER}
 MONGO_EXPRESS_PASS=${MONGO_ADMIN_DB_PASS}
 
+MONGO_PORT=27017
+MONGO_DATA_RETENTION_SECONDS=5184000
+MONGO_ASN_RETENTION_SECONDS=86400
+
+
+MONGO_DATABASE_STORAGE_COLLECTION_NAME=MongoStorage
+MONGO_DATABASE_SIZE_GB=1000
+MONGO_DATABASE_SIZE_TARGET_PERCENT=0.8
+MONGO_DATABASE_DELETE_THRESHOLD_PERCENT=0.9
+MONGO_DATABASE_MAX_TTL_RETENTION_SECONDS=5184000
+MONGO_DATABASE_MIN_TTL_RETENTION_SECONDS=604800
+MONGO_DATABASE_COMPACTION_TRIGGER_PERCENT=0.5
+MONGO_ENABLE_STORAGE_RECORD=true
+MONGO_ENABLE_DYNAMIC_TTL=true
+
+
+
 # Relative path to the MongoDB init script, upper level directories are supported
 MONGO_SETUP_SCRIPT_RELATIVE_PATH="./mongo/setup_mongo.sh"
+MONGO_INIT_REPLICAS_SCRIPT_RELATIVE_PATH="./mongo/init_replicas.js"
+MONGO_CREATE_INDEXES_SCRIPT_RELATIVE_PATH="./mongo/create_indexes.js"
+MONGO_MANAGE_VOLUMES_SCRIPT_RELATIVE_PATH="./mongo/manage_volume.js"
+
+
+
 ### MONGODB variables - END ###
 
 ### Kafka connect variables - START ###
 # NOTE: Required variables: [MONGODB, KAFKA]
+CONNECT_URL=http://${DOCKER_HOST_IP}:8083
 # Kafka connect log level
 CONNECT_LOG_LEVEL=ERROR
-# Relative path to the Kafka init script, upper level directories are supported
+
+CONNECT_TASKS_MAX=1 # Number of concurrent tasks to configure on kafka connectors
+CONNECT_CREATE_ODE=true # Create kafka connectors to MongoDB for ODE
+CONNECT_CREATE_GEOJSONCONVERTER=true # Create kafka connectors to MongoDB for GeoJSON Converter
+CONNECT_CREATE_CONFLICTMONITOR=true # Create kafka connectors to MongoDB for Conflict Monitor
+CONNECT_CREATE_DEDUPLICATOR=false # Create kafka connectors to MongoDB for Deduplicator
+CONNECT_CREATE_MECDEPOSIT=false # Create kafka connectors to MongoDB for MecDeposit
+# Relative path to the Kafka Connector yaml configuration script, upper level directories are supported
 # NOTE: This script is used to create kafka connectors
-CONNECT_SCRIPT_RELATIVE_PATH="./kafka-connect/connect_start.sh"
+CONNECT_CONFIG_RELATIVE_PATH="./jikkou/kafka-connectors-values.yaml"
+
 ### Kafka connect variables - END ###
\ No newline at end of file