Commit 3716e4b

moved to pickle from parquet for Redis transfer

Author: Saul Frank (committed)
1 parent: 7aaaa12
File tree: 7 files changed, 10 additions and 8 deletions

.devcontainer/docker-compose.yaml
Lines changed: 3 additions & 0 deletions

@@ -55,6 +55,9 @@ services:
 
     # Overrides default command so things don't shut down after the process ends.
    command: /bin/sh -c "while sleep 1000; do :; done"
+    environment:
+      S3_HOST: "http://minio:9000"
+      REDIS_HOST: "redis-service"
 
    # Use "forwardPorts" in **devcontainer.json** to forward an app port locally.
    # (Adding the "ports" property to this file will not forward from a Codespace.)
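
These variables point code running in the dev container at the in-compose MinIO and Redis services. A minimal sketch of how a client might consume them; the fallback defaults, port, and credential-free client setup here are illustrative assumptions, not part of the commit:

import os

import boto3
import redis

# Endpoints injected by docker-compose; the fallbacks mirror the compose values.
s3_host = os.getenv("S3_HOST", "http://minio:9000")
redis_host = os.getenv("REDIS_HOST", "redis-service")

# Hypothetical client construction; MinIO credentials are omitted in this sketch.
s3_client = boto3.client("s3", endpoint_url=s3_host)
redis_client = redis.Redis(host=redis_host, port=6379)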

.devcontainer/requirements.txt
Lines changed: 0 additions & 1 deletion

@@ -4,7 +4,6 @@ redis==4.3.4
 pandas==1.5.1
 boto3==1.24.93
 pytest==7.1.3
-pyarrow==9.0.0
 nanoid==0.1
 requests==2.28.1
 python-dotenv==0.21.0

pyproject.toml
Lines changed: 1 addition & 1 deletion

@@ -1,5 +1,5 @@
 [build-system]
-requires = [ "setuptools>=61.0", "redis>=1.0.0", "pyarrow>1.0.0", "pandas>1.0.0", "requests>1.0.0",]
+requires = [ "setuptools>=61.0", "redis>=1.0.0", "pandas>1.0.0", "requests>1.0.0", "boto3>1.0.0",]
 build-backend = "setuptools.build_meta"
 
 [project]

src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
Lines changed: 1 addition & 1 deletion

@@ -95,7 +95,7 @@ def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, SharepointF
 
     if ItemID.status_code != 200:
         duration = datetime.now() - start
-        return {"result":"Fail", "reason":"Get upload session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}
+        return {"result":"Fail", "reason":"Get download session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}
 
     ItemID = ItemID.json()

src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
Lines changed: 1 addition & 1 deletion

@@ -135,7 +135,7 @@ def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath
     upload = requests.put(UploadUrl["uploadUrl"], data=UploadObject, headers=headers, proxies=proxies)
     if upload.status_code != 201:
         duration = datetime.now() - start
-        return {"result":"Fail", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "error": upload.json()}
+        return {"result":"non 201", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "response": upload.json()}
 
     duration = datetime.now() - start

src/dataplane/pipelinerun/data_persist/pandas_redis_store.py
Lines changed: 2 additions & 2 deletions

@@ -33,7 +33,7 @@ def pipeline_pandas_redis_store(StoreKey, DataFrame, Redis, Expire=True, ExpireD
        raise Exception("Redis connection failed.")
 
    buffer = io.BytesIO()
-    DataFrame.to_parquet(buffer, compression='gzip')
+    DataFrame.to_pickle(buffer, compression='gzip')
    buffer.seek(0) # re-set the pointer to the beginning after reading
 
    if Expire:
@@ -69,7 +69,7 @@ def pipeline_pandas_redis_get(StoreKey, Redis):
    buffer = io.BytesIO(Redis.get(InsertKey))
    buffer.seek(0)
    import pandas as pd
-    df = pd.read_parquet(buffer)
+    df = pd.read_pickle(buffer,compression='gzip')
 
    duration = datetime.now() - start
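
This pair of changes is the substance of the commit: the DataFrame now travels through Redis as gzip-compressed pickle bytes rather than parquet, which is what allows pyarrow to be dropped from the dependency lists above. A minimal sketch of the same round trip, assuming the REDIS_HOST value from the compose change and an illustrative key and expiry:

import io

import pandas as pd
import redis

r = redis.Redis(host="redis-service")  # hostname assumed from the compose change above
df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# Store: serialise to gzip-compressed pickle bytes entirely in memory.
buffer = io.BytesIO()
df.to_pickle(buffer, compression="gzip")
buffer.seek(0)
r.set("example-transfer-key", buffer.read(), ex=3600)  # illustrative key and expiry

# Get: read the bytes back and unpickle with the matching compression setting.
restored = pd.read_pickle(io.BytesIO(r.get("example-transfer-key")), compression="gzip")
assert restored.equals(df)

One trade-off worth noting: pickle removes the pyarrow dependency, but unlike parquet it is Python-specific and sensitive to pandas and pickle-protocol versions, so both ends of the transfer need compatible environments.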

src/dataplane/pipelinerun/data_persist/pandas_s3_store.py
Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ def pipeline_pandas_s3_store(StoreKey, DataFrame, S3Client, Bucket, Expire=True,
    InsertKey = f"/dataplane-transfer/{EnvID}/" + StoreKey+ "-" +os.getenv("DP_RUNID")+".parquet"
 
    output_buffer=BytesIO()
-    DataFrame.to_parquet(output_buffer,index=False,compression='gzip',engine='pyarrow',allow_truncated_timestamps=True)
+    DataFrame.to_pickle(output_buffer,compression='gzip')
    S3Client.put_object(Bucket=Bucket,Key=InsertKey,Body=output_buffer.getvalue())
 
    duration = datetime.now() - start
@@ -53,7 +53,7 @@ def pipeline_pandas_s3_get(StoreKey, S3Client, Bucket):
    # buffer = BytesIO()
    objectGet = S3Client.get_object(Bucket=Bucket, Key=InsertKey, ChecksumMode='ENABLED')["Body"].read()
    import pandas as pd
-    df = pd.read_parquet(BytesIO(objectGet))
+    df = pd.read_pickle(BytesIO(objectGet),compression='gzip')
 
    duration = datetime.now() - start
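
The same parquet-to-pickle swap applies on the S3 path; note that the object key keeps its ".parquet" suffix even though the body is now gzip-compressed pickle. A minimal sketch of the round trip against a MinIO-style endpoint, where the bucket name, key, and endpoint are assumptions for illustration:

from io import BytesIO

import boto3
import pandas as pd

s3 = boto3.client("s3", endpoint_url="http://minio:9000")  # endpoint assumed from the compose change
df = pd.DataFrame({"a": [1, 2]})

# Store: write gzip-compressed pickle bytes and upload them as one object.
buf = BytesIO()
df.to_pickle(buf, compression="gzip")
s3.put_object(Bucket="dataplane-bucket", Key="example-key.parquet", Body=buf.getvalue())

# Get: download the object body and unpickle with matching compression.
body = s3.get_object(Bucket="dataplane-bucket", Key="example-key.parquet")["Body"].read()
restored = pd.read_pickle(BytesIO(body), compression="gzip")
assert restored.equals(df)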
