Feature/utils/refactoring #93

Merged: 5 commits, Feb 26, 2025
2 changes: 1 addition & 1 deletion Makefile
@@ -56,7 +56,7 @@ run_all_normal: | run_glide_normal run_gdacs_normal run_dc_normal run_emdat_norm
	@echo "Running all normalisation scripts.."

run_all_clean: | run_all_normal
-	@echo "Running all cleaner scripts.."
+	@echo "Running all cleaning scripts.."
	@poetry run python -m src.utils.splitter

help:
10 changes: 7 additions & 3 deletions src/cerf/data_normalisation_cerf.py
@@ -9,12 +9,12 @@
from src.data_consolidation.dictionary import (
    CERF_MAPPING,
)
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import read_blob_to_dataframe

SCHEMA_PATH_CERF = "./src/cerf/cerf_schema.json"
EVENT_CODE_CSV = "./static_data/event_code_table.csv"
@@ -38,7 +38,11 @@ def get_iso3_code(country_name: str) -> None:

cleaned1_df["Country_Code"] = cleaned1_df["Country"].apply(get_iso3_code)
cleaned2_df = change_data_type(cleaned1_df, cerf_schema)
cleaned2_df["Date"] = pd.to_datetime(cleaned2_df["Date"], errors="coerce")
cleaned2_df["Date"] = pd.to_datetime(
cleaned2_df["Date"],
errors="coerce",
dayfirst=True,
)
cleaned2_df = normalize_event_type(cleaned2_df, EVENT_CODE_CSV)
schema_order = list(cerf_schema["properties"].keys())
ordered_columns = [col for col in schema_order if col in cleaned2_df.columns]
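A note on the `dayfirst=True` addition: pandas resolves ambiguous day/month strings month-first by default, so day-first source dates would parse silently wrong rather than erroring. A quick illustration (toy input, not from the PR; assumes the CERF export writes dates day-first):

import pandas as pd

# Ambiguous string: 3 February or 2 March?
print(pd.to_datetime("03/02/2025"))                 # 2025-03-02 (month-first default)
print(pd.to_datetime("03/02/2025", dayfirst=True))  # 2025-02-03 (day-first)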
4 changes: 2 additions & 2 deletions src/disaster_charter/data_normalisation_dc.py
@@ -9,12 +9,12 @@
from src.data_consolidation.dictionary import (
    DISASTER_CHARTER_MAPPING,
)
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import read_blob_to_dataframe

SCHEMA_PATH_DISASTER_CHARTER = "./src/disaster_charter/disaster_charter_schema.json"
BLOB_NAME = (
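The same import swap is applied in every normalisation module below. It is more than cosmetic: `src.glide.data_normalisation_glide` loads its JSON schema at module scope, so any module importing helpers from it ran that file I/O on import, and the GLIDE pipeline doubled as an accidental utility module. A minimal sketch of the new pattern (illustrative):

# Shared cleaning helpers now come from a utility module with no
# import-time side effects, instead of from another pipeline's script.
from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)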
4 changes: 2 additions & 2 deletions src/emdat/data_normalisation_emdat.py
@@ -7,11 +7,11 @@
import pandas as pd

from src.data_consolidation.dictionary import EMDAT_MAPPING
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import read_blob_to_dataframe

EMDAT_INPUT_XLX_BLOB = (
"disaster-impact/raw/emdat/"
13 changes: 9 additions & 4 deletions src/gdacs/data_normalisation_gdacs.py
@@ -9,15 +9,16 @@
import pycountry

from src.data_consolidation.dictionary import GDACS_MAPPING
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import combine_csvs_from_blob_dir
+from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import combine_csvs_from_blob_dir

EVENT_CODE_CSV = "./static_data/event_code_table.csv"
COORDINATE_PAIR_LENGTH = 2
+SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"


def combine_csvs_from_blob(blob_dir: str) -> pd.DataFrame:
@@ -138,9 +139,9 @@ def get_iso3_from_country_name(country_name: str) -> None:
    return df


-if __name__ == "__main__":
+def main() -> None:
+    """Main function to clean the GDACS data and save it to a CSV file."""
    blob_dir = "disaster-impact/raw/gdacs/v2/"
-    SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"

    gdacs_df_raw = combine_csvs_from_blob(blob_dir)
    gdacs_df_raw = split_coordinates(
@@ -173,3 +174,7 @@ def get_iso3_from_country_name(country_name: str) -> None:
Path("./data_mid_1/gdacs/").mkdir(parents=True, exist_ok=True)
output_file_path = "./data_mid_1/gdacs/gdacs_mid1.csv"
cleaned2_gdacs_df.to_csv(output_file_path, index=False)


if __name__ == "__main__":
main()
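With the script body moved into `main()` behind the `__main__` guard, importing the GDACS module no longer kicks off the whole pipeline as a side effect. A hypothetical programmatic use:

# Hypothetical caller: the pipeline now runs only on demand, not on import.
from src.gdacs.data_normalisation_gdacs import main

main()  # combines the raw CSVs, cleans them, writes ./data_mid_1/gdacs/gdacs_mid1.csv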
103 changes: 5 additions & 98 deletions src/glide/data_normalisation_glide.py
@@ -10,6 +10,11 @@

from src.data_consolidation.dictionary import GLIDE_MAPPING
from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
+    change_data_type,
+    map_and_drop_columns,
+    normalize_event_type,
+)

GLIDE_INPUT_BLOB = "disaster-impact/raw/glide/glide_data_combined_all.csv"
SCHEMA_PATH_GLIDE = "./src/glide/glide_schema.json"
@@ -19,104 +24,6 @@
glide_schema = json.load(schema_glide)


def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame:
    """Renames columns in the raw_data DataFrame based.

    Args:
        raw_data (pd.DataFrame): The input DataFrame with raw data.
        dictionary (dict): A dictionary where keys are
            the new column names and values are the old column names.

    Returns:
        pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped.
    """
    rename_mapping = {value: key for key, value in dictionary.items() if value}
    return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping)


def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame:
    """Change the data types of columns in a DataFrame based on a JSON schema.

    Args:
        cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted.
        json_schema (dict): The JSON schema defining
            the desired data types for each column.

    Returns:
        pd.DataFrame: The DataFrame with columns cast to the specified data types.
    """
    for column, properties in json_schema["properties"].items():
        if column in cleaned1_data.columns:
            column_type = properties.get("type")
            if "array" in column_type:
                cleaned1_data[column] = cleaned1_data[column].apply(
                    lambda x: ",".join(map(str, x))
                    if isinstance(x, list)
                    else (str(x) if pd.notna(x) else ""),
                )
            elif "string" in column_type:
                cleaned1_data[column] = cleaned1_data[column].astype(str)
            elif "number" in column_type:
                cleaned1_data[column] = pd.to_numeric(
                    cleaned1_data[column],
                    errors="coerce",
                )
            elif "integer" in column_type:
                cleaned1_data[column] = pd.to_numeric(
                    cleaned1_data[column],
                    errors="coerce",
                ).astype("Int64")
            elif "null" in column_type:
                cleaned1_data[column] = cleaned1_data[column].where(
                    cleaned1_data[column].notna(),
                    None,
                )
    return cleaned1_data


def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame:
    """Normalizes the Event_Type.

    The CSV file is expected to have two columns with headers:
    - event_code: the normalized event type key.
    - event_name: the event type description.

    For each row in `df`, if the standardized Event_Type value matches a
    description from the CSV, the corresponding normalized key is stored in a
    new column, Event_Code. If no match is found, the original Event_Type value
    is retained.

    Args:
        df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column.
        event_code_csv (str): The path to the CSV file containing the event code
            mapping.

    Returns:
        pd.DataFrame: The DataFrame with an additional 'Event_Code' column.
    """
    event_mapping_df = pd.read_csv(event_code_csv)
    event_mapping_df["event_name"] = (
        event_mapping_df["event_name"].str.strip().str.upper()
    )
    event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip()
    mapping = dict(
        zip(
            event_mapping_df["event_name"],
            event_mapping_df["event_code"],
            strict=False,
        ),
    )
    df["Event_Code"] = (
        df["Event_Type"]
        .astype(str)
        .str.strip()
        .str.upper()
        .map(mapping)
        .fillna(df["Event_Type"])
    )
    return df


def main() -> None:
    """Main function to clean the GLIDE data and save it to a CSV file."""
    glide_df_raw = read_blob_to_dataframe(GLIDE_INPUT_BLOB)
4 changes: 2 additions & 2 deletions src/idmc/data_normalisation_idmc.py
@@ -8,12 +8,12 @@
import pandas as pd

from src.data_consolidation.dictionary import IDMC_MAPPING
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_json
+from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import read_blob_to_json

SCHEMA_PATH_IDMC = "./src/idmc/idmc_schema.json"
EVENT_CODE_CSV = "./static_data/event_code_table.csv"
4 changes: 2 additions & 2 deletions src/ifrc_eme/data_normalisation_ifrc_eme.py
@@ -10,12 +10,12 @@
import pandas as pd

from src.data_consolidation.dictionary import IFRC_EME_MAPPING
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
    change_data_type,
    map_and_drop_columns,
    normalize_event_type,
)
-from src.utils.azure_blob_utils import read_blob_to_dataframe

IFRC_EME_INPUT_BLOB = "disaster-impact/raw/ifrc_dref/IFRC_emergencies.csv"
SCHEMA_PATH_IFRC_EME = "./src/ifrc_eme/ifrc_eme_schema.json"
101 changes: 101 additions & 0 deletions src/utils/util.py
@@ -0,0 +1,101 @@
"""Utility functions for the project."""

import pandas as pd


def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame:
    """Renames columns in the raw_data DataFrame based on a mapping dictionary.

    Args:
        raw_data (pd.DataFrame): The input DataFrame with raw data.
        dictionary (dict): A dictionary where keys are
            the new column names and values are the old column names.

    Returns:
        pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped.
    """
    rename_mapping = {value: key for key, value in dictionary.items() if value}
    return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping)
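
A quick usage sketch for `map_and_drop_columns` (toy data, invented column names): keys in the dictionary are the target schema names, values are the raw source names, and unmapped raw columns are dropped.

import pandas as pd

from src.utils.util import map_and_drop_columns

raw = pd.DataFrame({"eventid": [1, 2], "iso": ["KEN", "ETH"], "junk": [0, 0]})
mapping = {"Event_ID": "eventid", "Country_Code": "iso", "Comment": None}

# Only mapped raw columns survive, renamed to the schema names; entries
# whose source name is falsy (None here) are skipped, and "junk" is dropped.
out = map_and_drop_columns(raw, mapping)
print(out.columns.tolist())  # ['Event_ID', 'Country_Code']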


def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame:
    """Change the data types of columns in a DataFrame based on a JSON schema.

    Args:
        cleaned1_data (pd.DataFrame): The DataFrame with data to be type cast.
        json_schema (dict): The JSON schema defining
            the desired data types for each column.

    Returns:
        pd.DataFrame: The DataFrame with columns cast to the specified data types.
    """
    for column, properties in json_schema["properties"].items():
        if column in cleaned1_data.columns:
            column_type = properties.get("type")
            if "array" in column_type:
                cleaned1_data[column] = cleaned1_data[column].apply(
                    lambda x: ",".join(map(str, x))
                    if isinstance(x, list)
                    else (str(x) if pd.notna(x) else ""),
                )
            elif "string" in column_type:
                cleaned1_data[column] = cleaned1_data[column].astype(str)
            elif "number" in column_type:
                cleaned1_data[column] = pd.to_numeric(
                    cleaned1_data[column],
                    errors="coerce",
                )
            elif "integer" in column_type:
                cleaned1_data[column] = pd.to_numeric(
                    cleaned1_data[column],
                    errors="coerce",
                ).astype("Int64")
            elif "null" in column_type:
                cleaned1_data[column] = cleaned1_data[column].where(
                    cleaned1_data[column].notna(),
                    None,
                )
    return cleaned1_data
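
A toy example of the schema-driven casting (schema invented for illustration). The branches test membership rather than equality because a JSON-schema "type" can be a list such as ["integer", "null"]:

import pandas as pd

from src.utils.util import change_data_type

df = pd.DataFrame({"Deaths": ["10", "n/a"], "Sources": [["a", "b"], None]})
schema = {
    "properties": {
        "Deaths": {"type": ["integer", "null"]},
        "Sources": {"type": ["array", "null"]},
    },
}

out = change_data_type(df, schema)
print(out["Deaths"].tolist())   # [10, <NA>]: "n/a" coerced to missing
print(out["Sources"].tolist())  # ['a,b', '']: lists joined, None becomes ""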


def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame:
    """Normalizes the Event_Type.

    The CSV file is expected to have two columns with headers:
    - event_code: the normalized event type key.
    - event_name: the event type description.

    For each row in `df`, if the standardized Event_Type value matches a
    description from the CSV, the corresponding normalized key is stored in a
    new column, Event_Code. If no match is found, the original Event_Type value
    is retained.

    Args:
        df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column.
        event_code_csv (str): The path to the CSV file containing the event code
            mapping.

    Returns:
        pd.DataFrame: The DataFrame with an additional 'Event_Code' column.
    """
    event_mapping_df = pd.read_csv(event_code_csv)
    event_mapping_df["event_name"] = (
        event_mapping_df["event_name"].str.strip().str.upper()
    )
    event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip()
    mapping = dict(
        zip(
            event_mapping_df["event_name"],
            event_mapping_df["event_code"],
            strict=False,
        ),
    )
    df["Event_Code"] = (
        df["Event_Type"]
        .astype(str)
        .str.strip()
        .str.upper()
        .map(mapping)
        .fillna(df["Event_Type"])
    )
    return df
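
Finally, a sketch of the event-code lookup (the two-column mapping CSV is written inline for the demo):

import pandas as pd

from src.utils.util import normalize_event_type

# Hypothetical mapping file matching the expected event_code/event_name headers.
pd.DataFrame(
    {"event_code": ["FL", "EQ"], "event_name": ["Flood", "Earthquake"]},
).to_csv("event_codes_demo.csv", index=False)

df = pd.DataFrame({"Event_Type": [" flood ", "EARTHQUAKE", "Sandstorm"]})
out = normalize_event_type(df, "event_codes_demo.csv")

# Matching is whitespace- and case-insensitive; unmatched values fall back
# to the original Event_Type string.
print(out["Event_Code"].tolist())  # ['FL', 'EQ', 'Sandstorm']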