feat: add dtype_backend=pyarrow to to_pandas #484

Closed · wants to merge 3 commits
22 changes: 17 additions & 5 deletions bigframes/core/blocks.py
```diff
@@ -406,11 +406,16 @@ def reorder_levels(self, ids: typing.Sequence[str]):
         level_names = [self.col_id_to_index_name[index_id] for index_id in ids]
         return Block(self.expr, ids, self.column_labels, level_names)
 
-    def _to_dataframe(self, result) -> pd.DataFrame:
+    def _to_dataframe(
+        self, result, dtype_backend: Literal["default", "pyarrow"] = "default"
+    ) -> pd.DataFrame:
         """Convert BigQuery data to pandas DataFrame with specific dtypes."""
 
         dtypes = dict(zip(self.index_columns, self.index.dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
-        return self.session._rows_to_dataframe(result, dtypes)
+        return self.session._rows_to_dataframe(
+            result, dtypes, dtype_backend=dtype_backend
+        )
 
     def to_pandas(
         self,
@@ -419,6 +424,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame."""
         if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
@@ -438,7 +444,8 @@
         df, query_job = self._materialize_local(
             materialize_options=MaterializationOptions(
                 downsampling=sampling, ordered=ordered
-            )
+            ),
+            dtype_backend=dtype_backend,
         )
         df.set_axis(self.column_labels, axis=1, copy=False)
         return df, query_job
@@ -478,7 +485,9 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
         df.columns = self.column_labels
 
     def _materialize_local(
-        self, materialize_options: MaterializationOptions = MaterializationOptions()
+        self,
+        materialize_options: MaterializationOptions = MaterializationOptions(),
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
@@ -530,7 +539,10 @@ def _materialize_local(
             )
         else:
             total_rows = results_iterator.total_rows
-            df = self._to_dataframe(results_iterator)
+            df = self._to_dataframe(
+                results_iterator,
+                dtype_backend=dtype_backend,
+            )
         self._copy_index_to_pandas(df)
 
         return df, query_job
```
7 changes: 7 additions & 0 deletions bigframes/dataframe.py
```diff
@@ -1038,6 +1038,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.DataFrame:
         """Write DataFrame to pandas DataFrame.
 
@@ -1060,6 +1061,11 @@
             ordered (bool, default True):
                 Determines whether the resulting pandas dataframe will be deterministically ordered.
                 In some cases, unordered may result in a faster-executing query.
+            dtype_backend (str, default "default"):
+                Controls the dtypes returned. Options include:
+
+                * ``"default"``: a mix of dtypes, optimizing correctness and compatibility.
+                * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
         Returns:
             pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
@@ -1073,6 +1079,7 @@
             sampling_method=sampling_method,
             random_state=random_state,
             ordered=ordered,
+            dtype_backend=dtype_backend,
         )
         self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
```
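In practice the new keyword is used like this. A minimal sketch, assuming a configured session; the public table used here is illustrative and not part of this PR:

```python
import bigframes.pandas as bpd

# Any readable table works; this public table is just an example.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Default behavior: a mix of dtypes chosen for correctness and compatibility
# (e.g. Int64, boolean, string[pyarrow]).
pd_default = df.to_pandas()

# With this PR: every column comes back as a pyarrow-backed ArrowDtype.
pd_arrow = df.to_pandas(dtype_backend="pyarrow")
print(pd_arrow.dtypes)
```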
7 changes: 7 additions & 0 deletions bigframes/series.py
```diff
@@ -295,6 +295,7 @@ def to_pandas(
         random_state: Optional[int] = None,
         *,
         ordered: bool = True,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.Series:
         """Writes Series to pandas Series.
 
@@ -317,6 +318,11 @@
             ordered (bool, default True):
                 Determines whether the resulting pandas series will be deterministically ordered.
                 In some cases, unordered may result in a faster-executing query.
+            dtype_backend (str, default "default"):
+                Controls the dtypes returned. Options include:
+
+                * ``"default"``: a mix of dtypes, optimizing correctness and compatibility.
+                * ``"pyarrow"``: pyarrow-backed ArrowDtype for all columns.
 
 
         Returns:
@@ -328,6 +334,7 @@
             sampling_method=sampling_method,
             random_state=random_state,
             ordered=ordered,
+            dtype_backend=dtype_backend,
         )
         self._set_internal_query_job(query_job)
         series = df.squeeze(axis=1)
```
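Series.to_pandas gains the same keyword. A sketch under the same assumptions as the DataFrame example above:

```python
import bigframes.pandas as bpd

names = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")["name"]

# The resulting pandas Series is pyarrow-backed (string[pyarrow] here).
pd_names = names.to_pandas(dtype_backend="pyarrow")
print(pd_names.dtype)
```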
18 changes: 16 additions & 2 deletions bigframes/session/__init__.py
```diff
@@ -1822,10 +1822,24 @@ def _get_table_size(self, destination_table):
         return table.num_bytes
 
     def _rows_to_dataframe(
-        self, row_iterator: bigquery.table.RowIterator, dtypes: Dict
+        self,
+        row_iterator: bigquery.table.RowIterator,
+        dtypes: Dict,
+        dtype_backend: Literal["default", "pyarrow"] = "default",
     ) -> pandas.DataFrame:
         arrow_table = row_iterator.to_arrow()
-        return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
+
+        if dtype_backend == "default":
+            return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
+        elif dtype_backend == "pyarrow":
+            return pandas.DataFrame(
+                {
+                    name: pandas.Series(value, dtype=pandas.ArrowDtype(value.type))
+                    for name, value in zip(arrow_table.column_names, arrow_table)
+                }
+            )
+        else:
+            raise ValueError(f"got unexpected dtype_backend={repr(dtype_backend)}")
 
     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
```
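The pyarrow branch above builds each column directly from the Arrow data. A self-contained sketch of the same conversion (requires pandas 2.x for ``pd.ArrowDtype``; the toy table stands in for ``row_iterator.to_arrow()``):

```python
import pandas as pd
import pyarrow as pa

# Stand-in for row_iterator.to_arrow().
arrow_table = pa.table(
    {
        "id": pa.array([1, 2, 3], type=pa.int64()),
        "name": pa.array(["a", "b", None], type=pa.string()),
    }
)

# Iterating a pyarrow Table yields one ChunkedArray per column, in column
# order, so zipping with column_names pairs each name with its data.
df = pd.DataFrame(
    {
        name: pd.Series(column, dtype=pd.ArrowDtype(column.type))
        for name, column in zip(arrow_table.column_names, arrow_table)
    }
)

print(df.dtypes)  # id: int64[pyarrow], name: string[pyarrow]
```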
3 changes: 2 additions & 1 deletion bigframes/session/_io/pandas.py
```diff
@@ -49,7 +49,8 @@ def _arrow_to_pandas_arrowdtype(
 
 
 def arrow_to_pandas(
-    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch], dtypes: Dict
+    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch],
+    dtypes: Dict,
 ):
     if len(dtypes) != arrow_table.num_columns:
         raise ValueError(
```
11 changes: 11 additions & 0 deletions tests/system/conftest.py
```diff
@@ -424,6 +424,17 @@ def scalars_pandas_df_index(
     return scalars_pandas_df_default_index.set_index("rowindex").sort_index()
 
 
+@pytest.fixture(scope="session")
+def scalars_pandas_df_index_pyarrow() -> pd.DataFrame:
+    """pd.DataFrame pointing at test data."""
+
+    if pd.__version__.startswith("1."):
+        pytest.skip("dtype_backend='pyarrow' not supported in pandas 1.x")
+
+    df = pd.read_json(DATA_DIR / "scalars.jsonl", lines=True, dtype_backend="pyarrow")
+    return df.set_index("rowindex").sort_index()
+
+
 @pytest.fixture(scope="session")
 def scalars_pandas_df_multi_index(
     scalars_pandas_df_default_index: pd.DataFrame,
```