Skip to content

Commit b536325

Browse files
tfx-copybaratensorflow-extended-team
authored and
tensorflow-extended-team
committed
Deprecate airflow_runner.AirflowDAGRunner and runner.KubeflowRunner after 0.14 release
PiperOrigin-RevId: 266229885
1 parent f665ff8 commit b536325

File tree

7 files changed

+29
-16
lines changed

7 files changed

+29
-16
lines changed

tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import os
2121
import tensorflow as tf
2222
from tfx.examples.chicago_taxi_pipeline import taxi_pipeline_kubeflow
23-
from tfx.orchestration.kubeflow.runner import KubeflowRunner
23+
from tfx.orchestration.kubeflow.kubeflow_dag_runner import KubeflowDagRunner
2424

2525

2626
class TaxiPipelineKubeflowTest(tf.test.TestCase):
@@ -50,7 +50,7 @@ def testTaxiPipelineConstructionAndDefinitionFileExists(self):
5050
._ai_platform_serving_args)
5151
self.assertEqual(9, len(logical_pipeline.components))
5252

53-
KubeflowRunner().run(logical_pipeline)
53+
KubeflowDagRunner().run(logical_pipeline)
5454
file_path = os.path.join(self._tmp_dir,
5555
'chicago_taxi_pipeline_kubeflow.tar.gz')
5656
self.assertTrue(tf.gfile.Exists(file_path))

tfx/examples/chicago_taxi_pipeline/taxi_pipeline_mysql.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from tfx.components.transform.component import Transform
3333
from tfx.orchestration import metadata
3434
from tfx.orchestration import pipeline
35-
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
35+
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
3636
from tfx.proto import evaluator_pb2
3737
from tfx.proto import pusher_pb2
3838
from tfx.proto import trainer_pb2
@@ -143,7 +143,8 @@ def _create_pipeline(
143143
metadata_connection_config=metadata_connection_config)
144144

145145

146-
airflow_pipeline = AirflowDAGRunner(_airflow_config).run(
146+
# 'DAG' below need to be kept for Airflow to detect dag.
147+
DAG = AirflowDagRunner(_airflow_config).run(
147148
_create_pipeline(
148149
pipeline_name=_pipeline_name,
149150
pipeline_root=_pipeline_root,

tfx/examples/chicago_taxi_pipeline/taxi_pipeline_portable_beam.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from tfx.components.transform.component import Transform
3232
from tfx.orchestration import metadata
3333
from tfx.orchestration import pipeline
34-
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
34+
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
3535
from tfx.proto import evaluator_pb2
3636
from tfx.proto import pusher_pb2
3737
from tfx.proto import trainer_pb2
@@ -159,12 +159,14 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
159159
# TODO(FLINK-10672): Obviate setting BATCH_FORCED.
160160
'--execution_mode_for_batch=BATCH_FORCED',
161161
],
162-
# LINT.ThenChange(tfx/examples/chicago_taxi/setup_beam_on_portable_beam.sh)
162+
# LINT.ThenChange(../chicago_taxi/setup_beam_on_spark.sh)
163+
# LINT.ThenChange(../chicago_taxi/setup_beam_on_flink.sh)
163164
})
164165

165166

166167
# TODO(jyzhao): consider using beam orchestrator after b/137294896.
167-
airflow_pipeline = AirflowDAGRunner(_airflow_config).run(
168+
# 'DAG' below need to be kept for Airflow to detect dag.
169+
DAG = AirflowDagRunner(_airflow_config).run(
168170
_create_pipeline(
169171
pipeline_name=_pipeline_name,
170172
pipeline_root=_pipeline_root,

tfx/examples/chicago_taxi_pipeline/taxi_pipeline_simple.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from tfx.components.transform.component import Transform
3232
from tfx.orchestration import metadata
3333
from tfx.orchestration import pipeline
34-
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
34+
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
3535
from tfx.proto import evaluator_pb2
3636
from tfx.proto import pusher_pb2
3737
from tfx.proto import trainer_pb2
@@ -137,7 +137,8 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
137137
metadata_path))
138138

139139

140-
airflow_pipeline = AirflowDAGRunner(_airflow_config).run(
140+
# 'DAG' below need to be kept for Airflow to detect dag.
141+
DAG = AirflowDagRunner(_airflow_config).run(
141142
_create_pipeline(
142143
pipeline_name=_pipeline_name,
143144
pipeline_root=_pipeline_root,

tfx/examples/chicago_taxi_pipeline/taxi_pipeline_simple_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import tensorflow as tf
2525

2626
from tfx.examples.chicago_taxi_pipeline import taxi_pipeline_simple
27-
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
27+
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
2828

2929

3030
class TaxiPipelineSimpleTest(tf.test.TestCase):
@@ -48,7 +48,7 @@ def testTaxiPipelineCheckDagConstruction(self):
4848
serving_model_dir=self._test_dir,
4949
metadata_path=self._test_dir)
5050
self.assertEqual(9, len(logical_pipeline.components))
51-
pipeline = AirflowDAGRunner(airflow_config).run(logical_pipeline)
51+
pipeline = AirflowDagRunner(airflow_config).run(logical_pipeline)
5252
self.assertIsInstance(pipeline, models.DAG)
5353

5454

tfx/orchestration/airflow/airflow_runner.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
from __future__ import division
1818
from __future__ import print_function
1919

20+
from tensorflow.python.util import deprecation # pylint: disable=g-direct-tensorflow-import
2021
from tfx.orchestration.airflow import airflow_dag_runner
2122

22-
# TODO(jyzhao): deprecate this and change taxi examples.
23-
AirflowDAGRunner = airflow_dag_runner.AirflowDagRunner
23+
AirflowDAGRunner = deprecation.deprecated_alias( # pylint: disable=invalid-name
24+
deprecated_name='airflow_runner.AirflowDAGRunner',
25+
name='airflow_dag_runner.AirflowDagRunner',
26+
func_or_class=airflow_dag_runner.AirflowDagRunner)

tfx/orchestration/kubeflow/runner.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717
from __future__ import division
1818
from __future__ import print_function
1919

20+
from tensorflow.python.util import deprecation # pylint: disable=g-direct-tensorflow-import
2021
from tfx.orchestration.kubeflow import kubeflow_dag_runner
2122

22-
# TODO(jyzhao): deprecate this and change taxi examples.
23-
KubeflowRunner = kubeflow_dag_runner.KubeflowDagRunner
24-
KubeflowRunnerConfig = kubeflow_dag_runner.KubeflowDagRunnerConfig
23+
KubeflowRunner = deprecation.deprecated_alias( # pylint: disable=invalid-name
24+
deprecated_name='runner.KubeflowRunner',
25+
name='kubeflow_dag_runner.KubeflowDagRunner',
26+
func_or_class=kubeflow_dag_runner.KubeflowDagRunner)
27+
KubeflowRunnerConfig = deprecation.deprecated_alias( # pylint: disable=invalid-name
28+
deprecated_name='runner.KubeflowRunnerConfig',
29+
name='kubeflow_dag_runner.KubeflowDagRunnerConfig',
30+
func_or_class=kubeflow_dag_runner.KubeflowDagRunnerConfig)

0 commit comments

Comments
 (0)