Skip to content

Commit 750ee61

Browse files
authored
Week 4 Challenge (#60)
* Week 4 Challenge
1 parent e62d4ae commit 750ee61

10 files changed

+156
-1
lines changed

week_4/project/dbt_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DBT_PROJECT_PATH = "/opt/dagster/dagster_home/project/dbt_test_project"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
name: 'test_dbt'
2+
version: '1.0.0'
3+
config-version: 2
4+
5+
profile: 'test_dbt'
6+
7+
source-paths: ["models"]
8+
analysis-paths: ["analysis"]
9+
test-paths: ["tests"]
10+
data-paths: ["data"]
11+
macro-paths: ["macros"]
12+
snapshot-paths: ["snapshots"]
13+
14+
target-path: "target"
15+
clean-targets:
16+
- "target"
17+
- "dbt_modules"
18+
19+
models:
20+
test_dbt:
21+
example:
22+
+materialized: view
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{{ config(materialized='table') }}
2+
3+
4+
SELECT *
5+
FROM {{ source('postgresql', 'dbt_table') }}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{{ config(materialized='table') }}
2+
3+
4+
SELECT column_2 AS my_column
5+
FROM {{ ref('my_first_dbt_model') }}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
version: 2
2+
3+
models:
4+
- name: my_first_dbt_model
5+
description: "A starter dbt model"
6+
columns:
7+
- name: column_1
8+
tests:
9+
- not_null
10+
- name: column_2
11+
tests:
12+
- not_null
13+
- name: column_3
14+
tests:
15+
- not_null
16+
17+
- name: my_second_dbt_model
18+
description: "A starter dbt model"
19+
columns:
20+
- name: my_column
21+
tests:
22+
- not_null
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
version: 2
2+
3+
sources:
4+
- name: postgresql
5+
schema: analytics
6+
tables:
7+
- name: dbt_table
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
test_dbt:
2+
target: test
3+
outputs:
4+
test:
5+
type: postgres
6+
host: postgresql
7+
user: postgres_user
8+
password: postgres_password
9+
port: 5432
10+
dbname: postgres_db
11+
schema: analytics
12+
threads: 4

week_4/project/repo.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1-
from dagster import repository
1+
from dagster import repository, with_resources
2+
from dagster_dbt import dbt_cli_resource
3+
from project.dbt_config import DBT_PROJECT_PATH
4+
from project.resources import postgres_resource
25
from project.week_4 import (
36
get_s3_data_docker,
47
process_data_docker,
58
put_redis_data_docker,
69
)
10+
from project.week_4_challenge import create_dbt_table, insert_dbt_data
711

812

913
@repository
1014
def repo():
1115
return [get_s3_data_docker, process_data_docker, put_redis_data_docker]
16+
17+
18+
@repository
19+
def assets_dbt():
20+
pass

week_4/project/resources.py

+36
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,26 @@
44

55
import boto3
66
import redis
7+
import sqlalchemy
78
from dagster import Field, Int, String, resource
89

910

11+
class Postgres:
12+
def __init__(self, host: str, user: str, password: str, database: str):
13+
self.host = host
14+
self.user = user
15+
self.password = password
16+
self.database = database
17+
self._engine = sqlalchemy.create_engine(self.uri)
18+
19+
@property
20+
def uri(self):
21+
return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}"
22+
23+
def execute_query(self, query: str):
24+
self._engine.execute(query)
25+
26+
1027
class S3:
1128
def __init__(self, bucket: str, access_key: str, secret_key: str, endpoint_url: str = None):
1229
self.bucket = bucket
@@ -39,6 +56,25 @@ def put_data(self, name: str, value: str):
3956
self.client.set(name, value)
4057

4158

59+
@resource(
60+
config_schema={
61+
"host": Field(String),
62+
"user": Field(String),
63+
"password": Field(String),
64+
"database": Field(String),
65+
},
66+
description="A resource that can run Postgres",
67+
)
68+
def postgres_resource(context) -> Postgres:
69+
"""This resource defines a Postgres client"""
70+
return Postgres(
71+
host=context.resource_config["host"],
72+
user=context.resource_config["user"],
73+
password=context.resource_config["password"],
74+
database=context.resource_config["database"],
75+
)
76+
77+
4278
@resource
4379
def mock_s3_resource(context):
4480
stocks = [

week_4/project/week_4_challenge.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from random import randint
2+
3+
from dagster import AssetIn, asset
4+
from dagster_dbt import load_assets_from_dbt_project
5+
from project.dbt_config import DBT_PROJECT_PATH
6+
7+
8+
@asset(
9+
required_resource_keys={"database"},
10+
op_tags={"kind": "postgres"},
11+
)
12+
def create_dbt_table(context):
13+
sql = "CREATE SCHEMA IF NOT EXISTS analytics;"
14+
context.resources.database.execute_query(sql)
15+
sql = "CREATE TABLE IF NOT EXISTS analytics.dbt_table (column_1 VARCHAR(100), column_2 VARCHAR(100), column_3 VARCHAR(100));"
16+
context.resources.database.execute_query(sql)
17+
18+
19+
@asset(
20+
required_resource_keys={"database"},
21+
op_tags={"kind": "postgres"},
22+
)
23+
def insert_dbt_data(context, create_dbt_table):
24+
sql = "INSERT INTO analytics.dbt_table (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"
25+
26+
number_of_rows = randint(1, 10)
27+
for _ in range(number_of_rows):
28+
context.resources.database.execute_query(sql)
29+
context.log.info("Inserted a row")
30+
31+
context.log.info("Batch inserted")
32+
33+
34+
@asset
35+
def final(context):
36+
context.log.info("Week 4 Challenge completed")

0 commit comments

Comments
 (0)