26
26
from trainer import taxi
27
27
28
28
import tensorflow_transform as transform
29
+ import tensorflow_transform .beam as tft_beam
29
30
30
- from tensorflow_transform .beam import impl as beam_impl
31
- from tensorflow_transform .beam .tft_beam_io import transform_fn_io
32
31
from tensorflow_transform .coders import example_proto_coder
33
32
from tensorflow_transform .tf_metadata import dataset_metadata
34
33
from tensorflow_transform .tf_metadata import dataset_schema
@@ -127,7 +126,7 @@ def preprocessing_fn(inputs):
127
126
raw_data_metadata = dataset_metadata .DatasetMetadata (raw_schema )
128
127
129
128
with beam .Pipeline (argv = pipeline_args ) as pipeline :
130
- with beam_impl .Context (temp_dir = working_dir ):
129
+ with tft_beam .Context (temp_dir = working_dir ):
131
130
if input_handle .lower ().endswith ('csv' ):
132
131
csv_coder = taxi .make_csv_coder (schema )
133
132
raw_data = (
@@ -147,22 +146,22 @@ def preprocessing_fn(inputs):
147
146
if transform_dir is None :
148
147
transform_fn = (
149
148
(raw_data , raw_data_metadata )
150
- | ('Analyze' >> beam_impl .AnalyzeDataset (preprocessing_fn )))
149
+ | ('Analyze' >> tft_beam .AnalyzeDataset (preprocessing_fn )))
151
150
152
151
_ = (
153
152
transform_fn
154
153
| ('WriteTransformFn' >>
155
- transform_fn_io .WriteTransformFn (working_dir )))
154
+ tft_beam .WriteTransformFn (working_dir )))
156
155
else :
157
- transform_fn = pipeline | transform_fn_io .ReadTransformFn (transform_dir )
156
+ transform_fn = pipeline | tft_beam .ReadTransformFn (transform_dir )
158
157
159
158
# Shuffling the data before materialization will improve Training
160
159
# effectiveness downstream.
161
160
shuffled_data = raw_data | 'RandomizeData' >> beam .transforms .Reshuffle ()
162
161
163
162
(transformed_data , transformed_metadata ) = (
164
163
((shuffled_data , raw_data_metadata ), transform_fn )
165
- | 'Transform' >> beam_impl .TransformDataset ())
164
+ | 'Transform' >> tft_beam .TransformDataset ())
166
165
167
166
coder = example_proto_coder .ExampleProtoCoder (transformed_metadata .schema )
168
167
_ = (
0 commit comments