Merge pull request #2 from usdot-jpo-ode/develop
Sync the CDOT fork with USDOT
Michael7371 authored Dec 23, 2024
2 parents 8a67952 + 4d76447 commit 6e701ef
Showing 78 changed files with 5,528 additions and 327 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/docker.yml
@@ -0,0 +1,24 @@
name: Docker build

on:
  pull_request:
    types: [opened, synchronize, reopened]

jobs:
  jpo-deduplicator:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Build
        uses: docker/build-push-action@v3
        with:
          context: jpo-deduplicator
          build-args: |
            MAVEN_GITHUB_TOKEN_NAME=${{ vars.MAVEN_GITHUB_TOKEN_NAME }}
            MAVEN_GITHUB_TOKEN=${{ secrets.MAVEN_GITHUB_TOKEN }}
            MAVEN_GITHUB_ORG=${{ github.repository_owner }}
          secrets: |
            MAVEN_GITHUB_TOKEN: ${{ secrets.MAVEN_GITHUB_TOKEN }}
39 changes: 39 additions & 0 deletions .github/workflows/dockerhub.yml
@@ -0,0 +1,39 @@
name: "DockerHub Build and Push"

on:
push:
branches:
- "develop"
- "master"
- "release/*"

jobs:
dockerhub-jpo-deduplicator:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Replcae Docker tag
id: set_tag
run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV

- name: Build
uses: docker/build-push-action@v3
with:
context: jpo-deduplicator
push: true
tags: usdotjpoode/jpo-deduplicator:${{ env.TAG }}
build-args: |
MAVEN_GITHUB_TOKEN_NAME=${{ vars.MAVEN_GITHUB_TOKEN_NAME }}
MAVEN_GITHUB_TOKEN=${{ secrets.MAVEN_GITHUB_TOKEN }}
MAVEN_GITHUB_ORG=${{ github.repository_owner }}
secrets: |
MAVEN_GITHUB_TOKEN: ${{ secrets.MAVEN_GITHUB_TOKEN }}
4 changes: 3 additions & 1 deletion .gitignore
@@ -1 +1,3 @@
**/.env
**/.env

**/target
129 changes: 107 additions & 22 deletions README.md
@@ -19,7 +19,12 @@ The JPO ITS utilities repository serves as a central location for deploying open
- [Quick Run](#quick-run-1)
- [4. MongoDB Kafka Connect](#4-mongodb-kafka-connect)
- [Configuration](#configuration)
- [Configure Kafka Connector Creation](#configure-kafka-connector-creation)
- [Quick Run](#quick-run-2)
- [5. jpo-deduplicator](#5-jpo-deduplicator)
- [Deduplication Configuration](#deduplication-config)
- [Github Token Generation](#generate-a-github-token)
- [Quick Run](#quick-run-3)


<a name="base-configuration"></a>
@@ -88,7 +93,7 @@ An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be depl

### Configure Topic Creation

The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](kafka/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`).
The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](jikkou/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`).

The following environment variables can be used to configure Kafka Topic creation.

@@ -103,8 +108,7 @@ The following enviroment variables can be used to configure Kafka Topic creation
| `KAFKA_TOPIC_MIN_INSYNC_REPLICAS` | Minimum number of in-sync replicas (for use with ack=all) |
| `KAFKA_TOPIC_RETENTION_MS` | Retention time for stream topics, milliseconds |
| `KAFKA_TOPIC_DELETE_RETENTION_MS` | Tombstone retention time for compacted topics, milliseconds |
| `KAFKA_TOPIC_CONFIG_RELATIVE_PATH` | Relative path to the Kafka topic yaml configuration script, upper level directories are supported |
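
These settings are plain `.env` entries; the fragment below is only an illustrative example (the values, including the configuration path, are placeholders to adapt to your deployment):

``` shell
# Illustrative .env fragment for Kafka topic creation (example values only)
KAFKA_TOPIC_MIN_INSYNC_REPLICAS=2
KAFKA_TOPIC_RETENTION_MS=300000
KAFKA_TOPIC_DELETE_RETENTION_MS=3600000
KAFKA_TOPIC_CONFIG_RELATIVE_PATH=./jikkou/kafka-topics-values.yaml
```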

### Quick Run

@@ -121,34 +125,49 @@ The following enviroment variables can be used to configure Kafka Topic creation
<a name="mongodb-kafka-connect"></a>

## 4. MongoDB Kafka Connect
The mongo-connector service connects to specified Kafka topics (as defined in the mongo-connector/connect_start.sh script) and deposits these messages to separate collections in the MongoDB Database. The codebase that provides this functionality comes from Confluent using their community licensed [cp-kafka-connect image](https://hub.docker.com/r/confluentinc/cp-kafka-connect). Documentation for this image can be found [here](https://docs.confluent.io/platform/current/connect/index.html#what-is-kafka-connect).
The mongo-connector service connects to specified Kafka topics and deposits these messages to separate collections in the MongoDB Database. The codebase that provides this functionality comes from Confluent using their community licensed [cp-kafka-connect image](https://hub.docker.com/r/confluentinc/cp-kafka-connect). Documentation for this image can be found [here](https://docs.confluent.io/platform/current/connect/index.html#what-is-kafka-connect).

### Configuration
Provided in the mongo-connector directory is a sample configuration shell script ([connect_start.sh](./kafka-connect/connect_start.sh)) that can be used to create Kafka connectors to MongoDB. The connectors in Kafka Connect are defined in the following format:

``` shell
declare -A config_name=([name]="topic_name" [collection]="mongo_collection_name"
[convert_timestamp]=true [timefield]="timestamp" [use_key]=true [key]="key" [add_timestamp]=true)
```

The format above describes the basic configuration for a sink connector; this declaration should be placed at the beginning of the connect_start.sh file. In general, we recommend keeping the MongoDB collection name the same as the topic name to avoid confusion. Additionally, if there is a top-level time field, set `convert_timestamp` to true and then specify the time field name that appears in the message. This allows MongoDB to transform that message into a date object, which enables TTL creation and reduces message size. To override MongoDB's default message `_id` field, set `use_key` to true and then set the `key` property to "key". The "add_timestamp" field defines whether the connector will add an auto-generated timestamp to each document, which allows for the creation of Time To Live (TTL) indexes on the collections to help limit collection size growth.

After the sink connector is configured above, then make sure to call the createSink function with the config_name of the configuration like so:

``` shell
createSink config_name
```

This call must be placed after the createSink function definition. To use a different `connect_start.sh` script, pass in the relative path of the new script by overriding the `CONNECT_SCRIPT_RELATIVE_PATH` environment variable.
Kafka connectors are managed by the `kafka-connect-setup` service, which creates the connectors defined in the [kafka-connectors-values.yaml](jikkou/kafka-connectors-values.yaml) file (see [Configure Kafka Connector Creation](#configure-kafka-connector-creation) below).

Set the `COMPOSE_PROFILES` environment variable as follows (an example invocation follows the list below):

- `kafka_connect` will only spin up the `kafka-connect` service in [docker-compose-connect](docker-compose-connect.yml)
- `kafka_connect` will only spin up the `kafka-connect` and `kafka-init` services in [docker-compose-connect](docker-compose-connect.yml)
- NOTE: This implies that you will be using a separate Kafka and MongoDB cluster
- `kafka_connect_standalone` will run the following:
1. `kafka-connect` service from [docker-compose-connect](docker-compose-connect.yml)
2. `kafka` service from [docker-compose-kafka](docker-compose-kafka.yml)
3. `mongo` and `mongo-setup` services from [docker-compose-mongo](docker-compose-mongo.yml)
2. `kafka-init` service from [docker-compose-connect](docker-compose-connect.yml)
3. `kafka` service from [docker-compose-kafka](docker-compose-kafka.yml)
4. `mongo` and `mongo-setup` services from [docker-compose-mongo](docker-compose-mongo.yml)
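
For example, to run the standalone profile you could set the variable in your `.env` file or inline on the command line; the invocation below is an illustrative sketch using the profile name from the list above:

``` shell
# Illustrative example: run the standalone Kafka Connect stack
COMPOSE_PROFILES=kafka_connect_standalone docker compose up -d
```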

### Configure Kafka Connector Creation

The Kafka connectors created by the `kafka-connect-setup` service are configured in the [kafka-connectors-values.yaml](jikkou/kafka-connectors-values.yaml) file. The connectors in that file are organized by the application, and given parameters to define the Kafka -> MongoDB sync connector:

| Connector Variable | Required | Condition | Description |
|---|---|---|---|
| `topicName` | Yes | Always | The name of the Kafka topic to sync from |
| `collectionName` | Yes | Always | The name of the MongoDB collection to write to |
| `generateTimestamp` | No | Optional | Enable or disable adding a timestamp to each message (true/false) |
| `connectorName` | No | Optional | Override the connector name, which otherwise defaults to the `collectionName`, with this value |
| `useTimestamp` | No | Optional | Converts the `timestampField` field at the top level of the value to a BSON date |
| `timestampField` | No | Required if `useTimestamp` is `true` | The name of the timestamp field at the top level of the message |
| `useKey` | No | Optional | Override the document `_id` field in MongoDB to use a specified `keyField` from the message |
| `keyField` | No | Required if `useKey` is `true` | The name of the key field |
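
As a rough illustration of how these fields fit together, one connector entry might look like the sketch below. The field names come from the table above, but the surrounding structure and the example topic are assumptions; the authoritative layout is whatever [kafka-connectors-values.yaml](jikkou/kafka-connectors-values.yaml) defines:

``` yaml
# Hypothetical sketch of a single Kafka -> MongoDB sink connector entry
# (field names from the table above; structure and values are illustrative only)
- topicName: topic.OdeBsmJson        # Kafka topic to sync from
  collectionName: OdeBsmJson         # MongoDB collection to write to
  generateTimestamp: true            # add an auto-generated timestamp to each document
  useTimestamp: true                 # convert the time field below to a BSON date
  timestampField: timestamp          # top-level time field in the message
  useKey: false                      # keep MongoDB's default _id field
```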

The following environment variables can be used to configure Kafka Connectors:

| Environment Variable | Description |
|---|---|
| `CONNECT_URL` | Kafka connect API URL |
| `CONNECT_LOG_LEVEL` | Kafka connect log level (`OFF`, `ERROR`, `WARN`, `INFO`) |
| `CONNECT_TASKS_MAX` | Number of concurrent tasks to configure on kafka connectors |
| `CONNECT_CREATE_ODE` | Whether to create kafka connectors for the ODE |
| `CONNECT_CREATE_GEOJSONCONVERTER` | Whether to create kafka connectors for the GeojsonConverter |
| `CONNECT_CREATE_CONFLICTMONITOR` | Whether to create kafka connectors for the Conflict Monitor |
| `CONNECT_CREATE_DEDUPLICATOR` | Whether to create kafka connectors for the Deduplicator |
| `CONNECT_CONFIG_RELATIVE_PATH` | Relative path to the Kafka connector yaml configuration script, upper level directories are supported |
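
Putting these together, an illustrative `.env` fragment for connector creation might look like the following (all values are examples; the `CONNECT_URL` shown is an assumption based on the Kafka Connect REST port of 8083 exposed in [docker-compose-connect](docker-compose-connect.yml)):

``` shell
# Illustrative .env fragment for the kafka-connect-setup service (example values only)
CONNECT_URL=http://kafka-connect:8083
CONNECT_LOG_LEVEL=INFO
CONNECT_TASKS_MAX=1
CONNECT_CREATE_ODE=true
CONNECT_CREATE_GEOJSONCONVERTER=true
CONNECT_CREATE_CONFLICTMONITOR=false
CONNECT_CREATE_DEDUPLICATOR=false
CONNECT_CONFIG_RELATIVE_PATH=./jikkou/kafka-connectors-values.yaml
```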

### Quick Run

@@ -170,4 +189,70 @@ Set the `COMPOSE_PROFILES` environmental variable as follows:
3. Click `OdeBsmJson`, and now you should see your message!
8. Feel free to test this with other topics or by producing to these topics using the [ODE](https://github.com/usdot-jpo-ode/jpo-ode)


<a name="deduplicator"></a>

## 5. jpo-deduplicator
The JPO-Deduplicator is a Kafka Java Spring Boot application designed to reduce the number of messages stored and processed in the ODE system. It reads messages from an input topic (such as topic.ProcessedMap) and outputs a subset of those messages on a related output topic (topic.DeduplicatedProcessedMap). Functionally, this is done by dropping duplicate messages from the input topic and only passing on unique ones. In addition, each topic will pass on at least one message per hour even if that message is a duplicate, which helps ensure messages are still flowing through the system (a minimal sketch of this rule follows the topic list below). The following topics currently support deduplication:

- topic.ProcessedMap -> topic.DeduplicatedProcessedMap
- topic.ProcessedMapWKT -> topic.DeduplicatedProcessedMapWKT
- topic.OdeMapJson -> topic.DeduplicatedOdeMapJson
- topic.OdeTimJson -> topic.DeduplicatedOdeTimJson
- topic.OdeRawEncodedTIMJson -> topic.DeduplicatedOdeRawEncodedTIMJson
- topic.OdeBsmJson -> topic.DeduplicatedOdeBsmJson
- topic.ProcessedSpat -> topic.DeduplicatedProcessedSpat
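
The sketch below illustrates the deduplication rule described above (drop exact duplicates per key, but let one message through at least hourly) in plain Java. It is a conceptual illustration only, not code taken from the jpo-deduplicator itself:

``` java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of the per-key deduplication rule: forward a message only if its
// value differs from the last one seen for that key, or if at least an hour has passed
// since the last message was forwarded for that key.
public class DeduplicationSketch {

    private record LastForwarded(String value, Instant at) {}

    private static final Duration MAX_SILENCE = Duration.ofHours(1);
    private final Map<String, LastForwarded> lastByKey = new HashMap<>();

    /** Returns true if the message should be emitted on the deduplicated output topic. */
    public boolean shouldForward(String key, String value, Instant now) {
        LastForwarded previous = lastByKey.get(key);
        boolean isDuplicate = previous != null && previous.value().equals(value);
        boolean heartbeatDue = previous != null
                && Duration.between(previous.at(), now).compareTo(MAX_SILENCE) >= 0;

        if (!isDuplicate || heartbeatDue) {
            lastByKey.put(key, new LastForwarded(value, now));
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        DeduplicationSketch dedup = new DeduplicationSketch();
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(dedup.shouldForward("map-1", "{\"laneSet\":1}", t0));                   // true  (first message)
        System.out.println(dedup.shouldForward("map-1", "{\"laneSet\":1}", t0.plusSeconds(60)));   // false (duplicate)
        System.out.println(dedup.shouldForward("map-1", "{\"laneSet\":1}", t0.plusSeconds(3700))); // true  (hourly pass-through)
    }
}
```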

### Deduplication Config

When running the jpo-deduplicator as a submodule in jpo-utils, the deduplicator will automatically turn on deduplication for a topic when that topic is created. For example, if the `KAFKA_TOPIC_CREATE_GEOJSONCONVERTER` environment variable is set to `true`, the deduplicator will start performing deduplication for ProcessedMap, ProcessedMapWKT, and ProcessedSpat data.

To manually configure deduplication for a topic, the following environment variables can also be used.

| Environment Variable | Description |
|---|---|
| `ENABLE_PROCESSED_MAP_DEDUPLICATION` | `true` / `false` - Enable ProcessedMap message Deduplication |
| `ENABLE_PROCESSED_MAP_WKT_DEDUPLICATION` | `true` / `false` - Enable ProcessedMap WKT message Deduplication |
| `ENABLE_ODE_MAP_DEDUPLICATION` | `true` / `false` - Enable ODE MAP message Deduplication |
| `ENABLE_ODE_TIM_DEDUPLICATION` | `true` / `false` - Enable ODE TIM message Deduplication |
| `ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION` | `true` / `false` - Enable ODE Raw Encoded TIM Deduplication |
| `ENABLE_PROCESSED_SPAT_DEDUPLICATION` | `true` / `false` - Enable ProcessedSpat Deduplication |
| `ENABLE_ODE_BSM_DEDUPLICATION` | `true` / `false` - Enable ODE BSM Deduplication |

### Generate a Github Token

A GitHub token is required to pull artifacts from GitHub repositories. This is required to obtain the jpo-deduplicator jars and must be done before attempting to build this repository.

1. Log into GitHub.
2. Navigate to Settings -> Developer settings -> Personal access tokens.
3. Click "New personal access token (classic)".
1. As of now, GitHub does not support `Fine-grained tokens` for obtaining packages.
4. Provide a name and expiration for the token.
5. Select the `read:packages` scope.
6. Click "Generate token" and copy the token.
7. Copy the token name and token value into your `.env` file.

For local development, the following steps are also required:
8. Create a copy of [settings.xml](jpo-deduplicator/jpo-deduplicator/settings.xml) and save it to `~/.m2/settings.xml`
9. Update the variables in your `~/.m2/settings.xml` with the token value and target jpo-ode organization.
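
The [settings.xml](jpo-deduplicator/jpo-deduplicator/settings.xml) shipped with the repository is the authoritative template; purely as a hypothetical illustration, a GitHub Packages server entry in `~/.m2/settings.xml` generally looks something like this:

``` xml
<!-- Hypothetical example only; defer to the settings.xml provided in the repository. -->
<settings>
  <servers>
    <server>
      <id>github</id>
      <!-- Your GitHub username and the personal access token generated above -->
      <username>YOUR_GITHUB_USERNAME</username>
      <password>YOUR_MAVEN_GITHUB_TOKEN</password>
    </server>
  </servers>
</settings>
```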

### Quick Run
1. Create a copy of `sample.env` and rename it to `.env`.
2. Update the variable `MAVEN_GITHUB_TOKEN` to a GitHub token used for downloading jar file dependencies. For full instructions on how to generate a token, see [Generate a Github Token](#generate-a-github-token) above.
3. Set the password for `MONGO_ADMIN_DB_PASS` and `MONGO_READ_WRITE_PASS` environmental variables to a secure password.
4. Set the `COMPOSE_PROFILES` variable to: `kafka,kafka_ui,kafka_setup,jpo-deduplicator`
5. Navigate back to the root directory and run the following command: `docker compose up -d`
6. Produce a sample message to one of the input topics listed above using `kafka_ui`:
1. Go to `localhost:8001`
2. Click local -> Topics
3. Select `topic.OdeMapJson`
4. Select `Produce Message`
5. Copy in sample JSON for a Map Message
6. Click `Produce Message` multiple times
7. View the synced message in `kafka_ui` by:
1. Go to `localhost:8001`
2. Click local -> Topics
3. Select `topic.DeduplicatedOdeMapJson`
4. You should now see only one copy of the map message sent.

[Back to top](#toc)
56 changes: 46 additions & 10 deletions docker-compose-connect.yml
@@ -16,32 +16,68 @@ services:
          memory: 4G
    ports:
      - "8083:8083"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8083/connectors"]
      interval: 30s
      timeout: 10s
      retries: 4
    depends_on:
      mongo:
        condition: service_healthy
      kafka:
        condition: service_healthy
    environment:
      MONGO_URI: ${MONGO_URI}
      MONGO_DB_NAME: ${MONGO_DB_NAME}
      CONNECT_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS}
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: topic.kafka-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_CLEANUP_POLICY: compact
      # Topics are created with jikkou in the kafka-setup service
      CONNECT_CONFIG_STORAGE_TOPIC: topic.KafkaConnectConfigs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: topic.kafka-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: topic.KafkaConnectOffsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_OFFSET_STORAGE_CLEANUP_POLICY: compact
      CONNECT_STATUS_STORAGE_TOPIC: topic.kafka-connect-status
      CONNECT_STATUS_STORAGE_TOPIC: topic.KafkaConnectStatus
      CONNECT_STATUS_STORAGE_CLEANUP_POLICY: compact
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: ${CONNECT_LOG_LEVEL}
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=${CONNECT_LOG_LEVEL},org.reflections=${CONNECT_LOG_LEVEL},com.mongodb.kafka=${CONNECT_LOG_LEVEL}"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components

  kafka-connect-setup:
    profiles:
      - all
      - kafka_connect
      - kafka_connect_standalone
    image: jpo-jikkou
    build:
      context: jikkou
      dockerfile: Dockerfile.jikkou
    entrypoint: ./kafka_connector_init.sh
    restart: on-failure
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 1G
    depends_on:
      kafka-connect:
        condition: service_healthy
    environment:
      CONNECT_URL: ${CONNECT_URL}
      CONNECT_TASKS_MAX: ${CONNECT_TASKS_MAX}
      CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE}
      CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER}
      CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR}
      CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR}
      MONGO_CONNECTOR_USERNAME: ${MONGO_ADMIN_DB_USER}
      MONGO_CONNECTOR_PASSWORD: ${MONGO_ADMIN_DB_PASS:?}
      MONGO_DB_IP: ${MONGO_IP}
      MONGO_DB_NAME: ${MONGO_DB_NAME}
    volumes:
      - ${CONNECT_SCRIPT_RELATIVE_PATH}:/scripts/connect_start.sh
      - ${CONNECT_CONFIG_RELATIVE_PATH-./jikkou/kafka-connectors-values.yaml}:/app/kafka-connectors-values.yaml
44 changes: 44 additions & 0 deletions docker-compose-deduplicator.yml
@@ -0,0 +1,44 @@
services:
  deduplicator:
    profiles:
      - all
      - deduplicator
    build:
      context: jpo-deduplicator
      dockerfile: Dockerfile
      args:
        MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
        MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
    image: jpo-deduplicator:latest
    restart: ${RESTART_POLICY}
    environment:
      DOCKER_HOST_IP: ${DOCKER_HOST_IP}
      KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
      spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
      enableProcessedMapDeduplication: ${ENABLE_PROCESSED_MAP_DEDUPLICATION}
      enableProcessedMapWktDeduplication: ${ENABLE_PROCESSED_MAP_WKT_DEDUPLICATION}
      enableOdeMapDeduplication: ${ENABLE_ODE_MAP_DEDUPLICATION}
      enableOdeTimDeduplication: ${ENABLE_ODE_TIM_DEDUPLICATION}
      enableOdeRawEncodedTimDeduplication: ${ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION}
      enableProcessedSpatDeduplication: ${ENABLE_PROCESSED_SPAT_DEDUPLICATION}
      enableOdeBsmDeduplication: ${ENABLE_ODE_BSM_DEDUPLICATION}
    healthcheck:
      test: ["CMD", "java", "-version"]
      interval: 10s
      timeout: 10s
      retries: 20
    logging:
      options:
        max-size: "10m"
        max-file: "5"
    deploy:
      resources:
        limits:
          memory: 3G
    depends_on:
      kafka:
        condition: service_healthy
        required: false