|
| 1 | +import importlib |
| 2 | +import inspect |
| 3 | +from typing import Annotated, Callable, Literal, Optional |
| 4 | + |
| 5 | +import dagster as dg |
| 6 | +from dagster.components import ( |
| 7 | + Component, |
| 8 | + ComponentLoadContext, |
| 9 | + Model, |
| 10 | + ResolutionContext, |
| 11 | + Resolvable, |
| 12 | + ResolvedAssetSpec, |
| 13 | + Resolver, |
| 14 | +) |
| 15 | +from dagster_shared import check |
| 16 | +from typing_extensions import TypeAlias |
| 17 | + |
| 18 | + |
| 19 | +class DailyPartitionDefinitionModel(Resolvable, Model): |
| 20 | + type: Literal["daily"] = "daily" |
| 21 | + start_date: str |
| 22 | + end_offset: int = 0 |
| 23 | + |
| 24 | + |
| 25 | +def resolve_partition_definition( |
| 26 | + context: ResolutionContext, model: DailyPartitionDefinitionModel |
| 27 | +) -> dg.PartitionsDefinition: |
| 28 | + return dg.DailyPartitionsDefinition( |
| 29 | + start_date=model.start_date, |
| 30 | + end_offset=model.end_offset, |
| 31 | + ) |
| 32 | + |
| 33 | + |
| 34 | +ResolvedPartitionDefinition: TypeAlias = Annotated[ |
| 35 | + dg.DailyPartitionsDefinition, |
| 36 | + Resolver( |
| 37 | + resolve_partition_definition, |
| 38 | + model_field_type=DailyPartitionDefinitionModel, |
| 39 | + can_inject=True, |
| 40 | + ), |
| 41 | +] |
| 42 | + |
| 43 | + |
| 44 | +def resolve_callable(context: ResolutionContext, model: str) -> Callable: |
| 45 | + module_path, fn_name = model.rsplit(".", 1) |
| 46 | + module = importlib.import_module(module_path) |
| 47 | + return getattr(module, fn_name) |
| 48 | + |
| 49 | + |
| 50 | +ResolvableCallable: TypeAlias = Annotated[ |
| 51 | + Callable, Resolver(resolve_callable, model_field_type=str) |
| 52 | +] |
| 53 | + |
| 54 | + |
| 55 | +def get_resources_from_callable(func: Callable) -> list[str]: |
| 56 | + sig = inspect.signature(func) |
| 57 | + return [param.name for param in sig.parameters.values() if param.name != "context"] |
| 58 | + |
| 59 | + |
| 60 | +class ExecutableComponent(Component, Resolvable, Model): |
| 61 | + """Executable Component represents an executable node in the asset graph. |
| 62 | +
|
| 63 | + It is comprised of an execute_fn, which is can be specified as a fully |
| 64 | + resolved symbol reference in yaml. This makes it a plain ole' Python function |
| 65 | + that does the execution within the asset graph. |
| 66 | +
|
| 67 | + You can pass an arbitrary number of assets or asset checks to the component. |
| 68 | +
|
| 69 | + With this structure this component replaces @asset, @multi_asset, @asset_check, and @multi_asset_check. |
| 70 | + which can all be expressed as a single ExecutableComponent. |
| 71 | + """ |
| 72 | + |
| 73 | + # inferred from the function name if not provided |
| 74 | + name: Optional[str] = None |
| 75 | + partitions_def: Optional[ResolvedPartitionDefinition] = None |
| 76 | + assets: Optional[list[ResolvedAssetSpec]] = None |
| 77 | + execute_fn: ResolvableCallable |
| 78 | + |
| 79 | + def get_resource_keys(self) -> set[str]: |
| 80 | + return set(get_resources_from_callable(self.execute_fn)) |
| 81 | + |
| 82 | + def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: |
| 83 | + required_resource_keys = self.get_resource_keys() |
| 84 | + |
| 85 | + check.invariant(len(self.assets or []) > 0, "assets is required for now") |
| 86 | + |
| 87 | + @dg.multi_asset( |
| 88 | + name=self.name or self.execute_fn.__name__, |
| 89 | + specs=self.assets, |
| 90 | + partitions_def=self.partitions_def, |
| 91 | + required_resource_keys=required_resource_keys, |
| 92 | + ) |
| 93 | + def _assets_def(context: dg.AssetExecutionContext, **kwargs): |
| 94 | + rd = context.resources.original_resource_dict |
| 95 | + to_pass = {k: v for k, v in rd.items() if k in required_resource_keys} |
| 96 | + check.invariant(set(to_pass.keys()) == required_resource_keys, "Resource keys mismatch") |
| 97 | + return self.execute_fn(context, **to_pass) |
| 98 | + |
| 99 | + return dg.Definitions(assets=[_assets_def]) |
0 commit comments