Skip to content

Commit

Permalink
fix concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
C-Loftus committed Nov 29, 2024
1 parent 702fad9 commit af49f07
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dagster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ scheduler:
class: DagsterDaemonScheduler

run_queue:
max_concurrent_runs: 1
max_concurrent_runs: 3

run_monitoring:
enabled: true
Expand Down
4 changes: 4 additions & 0 deletions userCode/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def create_service(
container_context.networks if len(container_context.networks) else None
),
restart_policy=RestartPolicy(condition="none"),
# Replicated jobs terminate after run
# TODO: There is still a potential error here. If a container fails,
# the job finishes but still appears in the swarm stack
# This might cause an issue
mode=ServiceMode("replicated-job", concurrency=1, replicas=1),
configs=[gleaner, nabu],
)
Expand Down
46 changes: 30 additions & 16 deletions userCode/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
import docker
import dagster_slack
import docker.errors
import requests
import yaml
from .lib.classes import S3
Expand Down Expand Up @@ -174,23 +175,36 @@ def docker_client_environment():
# navigating / mounting file systems for local config access
api_client = docker.APIClient()

try:
gleanerconfig = client.configs.list(filters={"name": ["gleaner"]})
nabuconfig = client.configs.list(filters={"name": ["nabu"]})
if gleanerconfig:
api_client.remove_config(gleanerconfig[0].id)
if nabuconfig:
api_client.remove_config(nabuconfig[0].id)
except IndexError as e:
get_dagster_logger().info(
f"No configs found to remove during docker client environment creation: {e}"
)
# At the start of the pipeline, remove any existing configs
# and try to regenerate a new one
# since we don't want old/stale configs to be used

s3_client = S3()
client.configs.create(name="nabu", data=s3_client.read("configs/nabuconfig.yaml"))
client.configs.create(
name="gleaner", data=s3_client.read("configs/gleanerconfig.yaml")
)
# However, if another container is using the config it will fail and throw an error
# Instead of using a mutex and trying to synchronize access,
# we just assume that a config that is in use is not stale.
configs = {
"gleaner": "configs/gleanerconfig.yaml",
"nabu": "configs/nabuconfig.yaml",
}

for config_name, config_file in configs.items():
try:
config = client.configs.list(filters={"name": [config_name]})
if config:
api_client.remove_config(config[0].id)
except docker.errors.APIError as e:
get_dagster_logger().info(
f"Skipped removing {config_name} config during docker client environment creation since it is likely in use. Underlying skipped exception was {e}"
)
except IndexError as e:
get_dagster_logger().info(f"No config found for {config_name}: {e}")

try:
client.configs.create(name=config_name, data=S3().read(config_file))
except docker.errors.APIError as e:
get_dagster_logger().info(
f"Skipped creating {config_name} config during docker client environment creation since it is likely in use. Underlying skipped exception was {e}"
)


@asset_check(asset=docker_client_environment)
Expand Down

0 comments on commit af49f07

Please sign in to comment.