[Bug]: Python TypeError when converting Avro logicalType timestamp-millis to Beam Schema #31656

Open
1 of 16 tasks
gergely-g opened this issue Jun 20, 2024 · 4 comments · May be fixed by #34603

@gergely-g

What happened?

When importing Avro files that have schemas with the field type:

{"type": "long", "logicalType": "timestamp-millis"}

converting the collection to Beam Schemas (beam.Row) fails with a TypeError in RowCoderImpl of the form:

apache_beam/coders/coder_impl.py:209: in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:248: in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
    ???
apache_beam/coders/coder_impl.py:1824: in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
    ???
E   TypeError: an integer is required [while running 'ReadFromAvro/Map(<lambda at avroio.py:633>)']

Avro files with the timestamp-millis logical type are created, for example, when data from a BigQuery table with a TIMESTAMP column is exported to .avro files.

Unit test to repro the issue:

import datetime
import tempfile
import unittest

import apache_beam as beam
import fastavro


class TestBeamSchemaConversions(unittest.TestCase):
    def test_convert_timestamp_millis(self):
        """Demonstrate bug: Avro-to-Beam schema conversion cannot handle timestamp-millis logical type."""
        avro_schema = {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            ],
        }

        # Write test records into a temp file using fastavro.
        with tempfile.NamedTemporaryFile(delete=True) as input_avro_file:
            fastavro.writer(
                input_avro_file,
                avro_schema,
                [
                    {"name": "Alice", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                    {"name": "Bob", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                ],
                validator=True,
            )
            input_avro_file.flush()

            with self.assertRaises(TypeError) as context:
                with beam.Pipeline() as p:
                    avro_records = p | beam.io.ReadFromAvro(input_avro_file.name, as_rows=True)
                    avro_records | beam.LogElements()

            self.assertIn("an integer is required", str(context.exception))

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@gergely-g
Author

Part of the solution could be in avro_value_to_beam_value().

It could check whether the beam_type matches this FieldType:

    logical_type {
        urn: "beam:logical_type:millis_instant:v1"
        representation {
            atomic_type: INT64
        }
    }

For that type, the converter could return a Timestamp (e.g. via Timestamp.of) built from the datetime.datetime value that the Avro reader produces.
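
A minimal sketch of the conversion step this would need, using only the standard library. The helper name datetime_to_epoch_millis is hypothetical and not part of Beam, and treating naive datetimes as UTC is an assumption. It only shows how a decoded datetime.datetime maps back to the INT64 milliseconds that the millis_instant representation expects. An actual fix inside avro_value_to_beam_value() would presumably wrap the result in Beam's own Timestamp type rather than return a bare int.

```python
import datetime

# Fixed reference point: the Unix epoch as a timezone-aware UTC datetime.
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def datetime_to_epoch_millis(value: datetime.datetime) -> int:
    """Hypothetical helper: datetime -> integer milliseconds since the Unix epoch."""
    if value.tzinfo is None:
        # Assumption: a naive datetime is interpreted as UTC.
        value = value.replace(tzinfo=datetime.timezone.utc)
    delta = value - _EPOCH
    # Pure integer arithmetic avoids float rounding; sub-millisecond
    # precision is dropped by the floor division.
    return (
        delta.days * 86_400_000
        + delta.seconds * 1_000
        + delta.microseconds // 1_000
    )


if __name__ == "__main__":
    # Same timestamp as the records in the repro test.
    ts = datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)
    print(datetime_to_epoch_millis(ts))
```

Until a fix lands, the same kind of mapping could in principle be applied to the decoded records before they are turned into rows, though that workaround is untested here.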

@mls3odp
Contributor

mls3odp commented Jul 1, 2024

.take-issue

@mls3odp mls3odp removed their assignment Jul 1, 2024
@mls3odp
Contributor

mls3odp commented Jul 1, 2024

.take-issue

@mls3odp mls3odp removed their assignment Jul 5, 2024
@DKER2
Contributor

DKER2 commented Apr 9, 2025

.take-issue

@DKER2 DKER2 linked a pull request Apr 10, 2025 that will close this issue