27
27
28
28
import tensorflow_transform as transform
29
29
30
- from tensorflow_transform .beam import impl as beam_impl
31
- from tensorflow_transform .beam .tft_beam_io import transform_fn_io
32
30
from tensorflow_transform .coders import example_proto_coder
31
+ import tensorflow_transform .beam as tft_beam
33
32
from tensorflow_transform .tf_metadata import dataset_metadata
34
33
from tensorflow_transform .tf_metadata import dataset_schema
35
34
@@ -126,7 +125,7 @@ def preprocessing_fn(inputs):
126
125
raw_data_metadata = dataset_metadata .DatasetMetadata (raw_schema )
127
126
128
127
with beam .Pipeline (argv = pipeline_args ) as pipeline :
129
- with beam_impl .Context (temp_dir = working_dir ):
128
+ with tft_beam .Context (temp_dir = working_dir ):
130
129
if input_handle .lower ().endswith ('csv' ):
131
130
csv_coder = taxi .make_csv_coder (schema )
132
131
raw_data = (
@@ -146,22 +145,22 @@ def preprocessing_fn(inputs):
146
145
if transform_dir is None :
147
146
transform_fn = (
148
147
(raw_data , raw_data_metadata )
149
- | ('Analyze' >> beam_impl .AnalyzeDataset (preprocessing_fn )))
148
+ | ('Analyze' >> tft_beam .AnalyzeDataset (preprocessing_fn )))
150
149
151
150
_ = (
152
151
transform_fn
153
152
| ('WriteTransformFn' >>
154
- transform_fn_io .WriteTransformFn (working_dir )))
153
+ tft_beam .WriteTransformFn (working_dir )))
155
154
else :
156
- transform_fn = pipeline | transform_fn_io .ReadTransformFn (transform_dir )
155
+ transform_fn = pipeline | tft_beam .ReadTransformFn (transform_dir )
157
156
158
157
# Shuffling the data before materialization will improve Training
159
158
# effectiveness downstream.
160
159
shuffled_data = raw_data | 'RandomizeData' >> beam .transforms .Reshuffle ()
161
160
162
161
(transformed_data , transformed_metadata ) = (
163
162
((shuffled_data , raw_data_metadata ), transform_fn )
164
- | 'Transform' >> beam_impl .TransformDataset ())
163
+ | 'Transform' >> tft_beam .TransformDataset ())
165
164
166
165
coder = example_proto_coder .ExampleProtoCoder (transformed_metadata .schema )
167
166
_ = (
0 commit comments