#!/usr/bin/env python
"""
Convert data from CSV to Parquet files and partition the dataset by asset.

A Parquet dataset partitioned by asset looks like:

```
dst_dir/
    year=2021/
        month=12/
            day=11/
                asset=BTC_USDT/
                    data.parquet
                asset=ETH_USDT/
                    data.parquet
```

Usage sample:

> im_v2/common/data/transform/convert_csv_to_pq.py \
    --src_dir 's3://<ck-data>/historical/binance/' \
    --dst_dir 's3://<ck-data>/historical/binance_parquet/' \
    --datetime_col 'timestamp' \
    --asset_col 'currency_pair'
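
Note: the input CSV files are expected to contain at least the columns passed
via `--datetime_col` and `--asset_col`, since those drive the reindexing and
the partitioning below.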
"""
import argparse
import logging
import os

import pandas as pd

import helpers.hdbg as hdbg
import helpers.hparquet as hparque
import helpers.hparser as hparser
import helpers.hs3 as hs3
import im_v2.common.data.transform.transform_utils as imvcdttrut

_LOG = logging.getLogger(__name__)


def _run(args: argparse.Namespace, *, aws_profile: hs3.AwsProfile) -> None:
    # Check that the `aws_profile` is valid.
    if aws_profile:
        _ = hs3.get_s3fs(aws_profile)
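    # Find all CSV files (plain or compressed) under the source dir.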
    files = hs3.listdir(
        args.src_dir,
        "*.csv*",
        only_files=True,
        use_relative_paths=True,
        aws_profile=aws_profile,
    )
    _LOG.info("Files found at %s:\n%s", args.src_dir, "\n".join(files))
    for file in files:
        full_path = os.path.join(args.src_dir, file)
        _LOG.debug("Converting %s...", full_path)
        df = pd.read_csv(full_path)
        # Set datetime index.
        reindexed_df = imvcdttrut.reindex_on_datetime(df, args.datetime_col)
        # Add date partition columns to the dataframe.
        df, partition_cols = hparque.add_date_partition_columns(
            reindexed_df, "by_year_month"
        )
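        # Write the dataframe as a Parquet dataset partitioned by asset and date.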
        hparque.to_partitioned_parquet(
            df,
            [args.asset_col] + partition_cols,
            args.dst_dir,
            aws_profile=aws_profile,
        )
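    # Merge the Parquet chunks in each leaf partition dir into a single file.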
    hparque.list_and_merge_pq_files(args.dst_dir, aws_profile=aws_profile)


def _parse() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--src_dir",
        action="store",
        type=str,
        required=True,
        help="Dir with input CSV files to convert to Parquet format",
    )
    parser.add_argument(
        "--dst_dir",
        action="store",
        type=str,
        required=True,
        help="Destination dir where to save converted Parquet files",
    )
    parser.add_argument(
        "--datetime_col",
        action="store",
        type=str,
        required=True,
        help="Name of column containing datetime information",
    )
    parser.add_argument(
        "--asset_col",
        action="store",
        type=str,
        default=None,
        help="Name of column containing asset name for partitioning by asset",
    )
    parser.add_argument(
        "--incremental",
        action="store_true",
        help="Skip files that have already been converted",
    )
    parser.add_argument(
        "--aws_profile",
        action="store",
        required=True,
        type=str,
        help="The AWS profile to use for .aws/credentials or for env vars",
    )
    parser = hparser.add_verbosity_arg(parser)
    return parser


def _main(parser: argparse.ArgumentParser) -> None:
    args = parser.parse_args()
    hdbg.init_logger(verbosity=args.log_level, use_exec_path=True)
    _run(args, aws_profile=args.aws_profile)


if __name__ == "__main__":
    _main(_parse())