diff --git a/Makefile b/Makefile
index 3d49da5..05cdf32 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ run_all_normal: | run_glide_normal run_gdacs_normal run_dc_normal run_emdat_norm
 	@echo "Running all normalisation scripts.."
 
 run_all_clean: | run_all_normal
-	@echo "Running all cleaner scripts.."
+	@echo "Running all cleaning scripts.."
 	@poetry run python -m src.utils.splitter
 
 help:
diff --git a/src/cerf/data_normalisation_cerf.py b/src/cerf/data_normalisation_cerf.py
index 0eb0ea2..ed262ab 100644
--- a/src/cerf/data_normalisation_cerf.py
+++ b/src/cerf/data_normalisation_cerf.py
@@ -9,12 +9,12 @@ from src.data_consolidation.dictionary import (
     CERF_MAPPING,
 )
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
     change_data_type,
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import read_blob_to_dataframe
 
 SCHEMA_PATH_CERF = "./src/cerf/cerf_schema.json"
 EVENT_CODE_CSV = "./static_data/event_code_table.csv"
@@ -38,7 +38,11 @@ def get_iso3_code(country_name: str) -> None:
     cleaned1_df["Country_Code"] = cleaned1_df["Country"].apply(get_iso3_code)
 
     cleaned2_df = change_data_type(cleaned1_df, cerf_schema)
-    cleaned2_df["Date"] = pd.to_datetime(cleaned2_df["Date"], errors="coerce")
+    cleaned2_df["Date"] = pd.to_datetime(
+        cleaned2_df["Date"],
+        errors="coerce",
+        dayfirst=True,
+    )
     cleaned2_df = normalize_event_type(cleaned2_df, EVENT_CODE_CSV)
     schema_order = list(cerf_schema["properties"].keys())
     ordered_columns = [col for col in schema_order if col in cleaned2_df.columns]
diff --git a/src/disaster_charter/data_normalisation_dc.py b/src/disaster_charter/data_normalisation_dc.py
index 4c31e7b..60e0599 100644
--- a/src/disaster_charter/data_normalisation_dc.py
+++ b/src/disaster_charter/data_normalisation_dc.py
@@ -9,12 +9,12 @@ from src.data_consolidation.dictionary import (
     DISASTER_CHARTER_MAPPING,
 )
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
     change_data_type,
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import read_blob_to_dataframe
 
 SCHEMA_PATH_DISASTER_CHARTER = "./src/disaster_charter/disaster_charter_schema.json"
 BLOB_NAME = (
diff --git a/src/emdat/data_normalisation_emdat.py b/src/emdat/data_normalisation_emdat.py
index 1a01096..f21fe3d 100644
--- a/src/emdat/data_normalisation_emdat.py
+++ b/src/emdat/data_normalisation_emdat.py
@@ -7,11 +7,11 @@ import pandas as pd
 
 from src.data_consolidation.dictionary import EMDAT_MAPPING
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import read_blob_to_dataframe
 
 EMDAT_INPUT_XLX_BLOB = (
     "disaster-impact/raw/emdat/"
diff --git a/src/gdacs/data_normalisation_gdacs.py b/src/gdacs/data_normalisation_gdacs.py
index 2300bae..004edaf 100644
--- a/src/gdacs/data_normalisation_gdacs.py
+++ b/src/gdacs/data_normalisation_gdacs.py
@@ -9,15 +9,16 @@ import pycountry
 
 from src.data_consolidation.dictionary import GDACS_MAPPING
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import combine_csvs_from_blob_dir
+from src.utils.util import (
     change_data_type,
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import combine_csvs_from_blob_dir
 
 EVENT_CODE_CSV = "./static_data/event_code_table.csv"
 COORDINATE_PAIR_LENGTH = 2
+SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"
 
 
 def combine_csvs_from_blob(blob_dir: str) -> pd.DataFrame:
@@ -138,9 +139,9 @@ def get_iso3_from_country_name(country_name: str) -> None:
     return df
 
 
-if __name__ == "__main__":
+def main() -> None:
+    """Main function to clean the GDACS data and save it to a CSV file."""
     blob_dir = "disaster-impact/raw/gdacs/v2/"
-    SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"
 
     gdacs_df_raw = combine_csvs_from_blob(blob_dir)
     gdacs_df_raw = split_coordinates(
@@ -173,3 +174,7 @@ def get_iso3_from_country_name(country_name: str) -> None:
     Path("./data_mid_1/gdacs/").mkdir(parents=True, exist_ok=True)
     output_file_path = "./data_mid_1/gdacs/gdacs_mid1.csv"
     cleaned2_gdacs_df.to_csv(output_file_path, index=False)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/glide/data_normalisation_glide.py b/src/glide/data_normalisation_glide.py
index 3aa50d5..b3b63a9 100644
--- a/src/glide/data_normalisation_glide.py
+++ b/src/glide/data_normalisation_glide.py
@@ -10,6 +10,11 @@ from src.data_consolidation.dictionary import GLIDE_MAPPING
 
 from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
+    change_data_type,
+    map_and_drop_columns,
+    normalize_event_type,
+)
 
 GLIDE_INPUT_BLOB = "disaster-impact/raw/glide/glide_data_combined_all.csv"
 SCHEMA_PATH_GLIDE = "./src/glide/glide_schema.json"
 
@@ -19,104 +24,6 @@ glide_schema = json.load(schema_glide)
 
 
-def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame:
-    """Renames columns in the raw_data DataFrame based.
-
-    Args:
-        raw_data (pd.DataFrame): The input DataFrame with raw data.
-        dictionary (dict): A dictionary where keys are
-        the new column names and values are the old column names.
-
-    Returns:
-        pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped.
-    """
-    rename_mapping = {value: key for key, value in dictionary.items() if value}
-    return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping)
-
-
-def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame:
-    """Change the data types of columns in a DataFrame based on a JSON schema.
-
-    Args:
-        cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted.
-        json_schema (dict): The JSON schema defining
-        the desired data types for each column.
-
-    Returns:
-        pd.DataFrame: The DataFrame with columns cast to the specified data types.
-    """
-    for column, properties in json_schema["properties"].items():
-        if column in cleaned1_data.columns:
-            column_type = properties.get("type")
-            if "array" in column_type:
-                cleaned1_data[column] = cleaned1_data[column].apply(
-                    lambda x: ",".join(map(str, x))
-                    if isinstance(x, list)
-                    else (str(x) if pd.notna(x) else ""),
-                )
-            elif "string" in column_type:
-                cleaned1_data[column] = cleaned1_data[column].astype(str)
-            elif "number" in column_type:
-                cleaned1_data[column] = pd.to_numeric(
-                    cleaned1_data[column],
-                    errors="coerce",
-                )
-            elif "integer" in column_type:
-                cleaned1_data[column] = pd.to_numeric(
-                    cleaned1_data[column],
-                    errors="coerce",
-                ).astype("Int64")
-            elif "null" in column_type:
-                cleaned1_data[column] = cleaned1_data[column].where(
-                    cleaned1_data[column].notna(),
-                    None,
-                )
-    return cleaned1_data
-
-
-def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame:
-    """Normalizes the Event_Type.
-
-    The CSV file is expected to have two columns with headers:
-    - event_code: the normalized event type key.
-    - event_name: the event type description.
-
-    For each row in `df`, if the standardized Event_Type value matches a
-    description from the CSV, the corresponding normalized key is stored in a
-    new column, Event_Code. If no match is found, the original Event_Type value
-    is retained.
-
-    Args:
-        df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column.
-        event_code_csv (str): The path to the CSV file containing the event code
-        mapping.
-
-    Returns:
-        pd.DataFrame: The DataFrame with an additional 'Event_Code' column.
-    """
-    event_mapping_df = pd.read_csv(event_code_csv)
-    event_mapping_df["event_name"] = (
-        event_mapping_df["event_name"].str.strip().str.upper()
-    )
-    event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip()
-    mapping = dict(
-        zip(
-            event_mapping_df["event_name"],
-            event_mapping_df["event_code"],
-            strict=False,
-        ),
-    )
-    df["Event_Code"] = (
-        df["Event_Type"]
-        .astype(str)
-        .str.strip()
-        .str.upper()
-        .map(mapping)
-        .fillna(df["Event_Type"])
-    )
-    return df
-
-
 def main() -> None:
     """Main function to clean the GLIDE data and save it to a CSV file."""
     glide_df_raw = read_blob_to_dataframe(GLIDE_INPUT_BLOB)
 
diff --git a/src/idmc/data_normalisation_idmc.py b/src/idmc/data_normalisation_idmc.py
index 1db47db..11bacf6 100644
--- a/src/idmc/data_normalisation_idmc.py
+++ b/src/idmc/data_normalisation_idmc.py
@@ -8,12 +8,12 @@ import pandas as pd
 
 from src.data_consolidation.dictionary import IDMC_MAPPING
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_json
+from src.utils.util import (
     change_data_type,
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import read_blob_to_json
 
 SCHEMA_PATH_IDMC = "./src/idmc/idmc_schema.json"
 EVENT_CODE_CSV = "./static_data/event_code_table.csv"
diff --git a/src/ifrc_eme/data_normalisation_ifrc_eme.py b/src/ifrc_eme/data_normalisation_ifrc_eme.py
index df368b2..079c1bb 100644
--- a/src/ifrc_eme/data_normalisation_ifrc_eme.py
+++ b/src/ifrc_eme/data_normalisation_ifrc_eme.py
@@ -10,12 +10,12 @@ import pandas as pd
 
 from src.data_consolidation.dictionary import IFRC_EME_MAPPING
 
-from src.glide.data_normalisation_glide import (
+from src.utils.azure_blob_utils import read_blob_to_dataframe
+from src.utils.util import (
     change_data_type,
     map_and_drop_columns,
     normalize_event_type,
 )
-from src.utils.azure_blob_utils import read_blob_to_dataframe
 
 IFRC_EME_INPUT_BLOB = "disaster-impact/raw/ifrc_dref/IFRC_emergencies.csv"
 SCHEMA_PATH_IFRC_EME = "./src/ifrc_eme/ifrc_eme_schema.json"
diff --git a/src/utils/util.py b/src/utils/util.py
new file mode 100644
index 0000000..0731cfe
--- /dev/null
+++ b/src/utils/util.py
@@ -0,0 +1,101 @@
+"""Utility functions for the project."""
+
+import pandas as pd
+
+
+def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame:
+    """Renames columns in the raw_data DataFrame based on a mapping dictionary.
+
+    Args:
+        raw_data (pd.DataFrame): The input DataFrame with raw data.
+        dictionary (dict): A dictionary where keys are
+        the new column names and values are the old column names.
+
+    Returns:
+        pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped.
+ """ + rename_mapping = {value: key for key, value in dictionary.items() if value} + return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping) + + +def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame: + """Change the data types of columns in a DataFrame based on a JSON schema. + + Args: + cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted. + json_schema (dict): The JSON schema defining + the desired data types for each column. + + Returns: + pd.DataFrame: The DataFrame with columns cast to the specified data types. + """ + for column, properties in json_schema["properties"].items(): + if column in cleaned1_data.columns: + column_type = properties.get("type") + if "array" in column_type: + cleaned1_data[column] = cleaned1_data[column].apply( + lambda x: ",".join(map(str, x)) + if isinstance(x, list) + else (str(x) if pd.notna(x) else ""), + ) + elif "string" in column_type: + cleaned1_data[column] = cleaned1_data[column].astype(str) + elif "number" in column_type: + cleaned1_data[column] = pd.to_numeric( + cleaned1_data[column], + errors="coerce", + ) + elif "integer" in column_type: + cleaned1_data[column] = pd.to_numeric( + cleaned1_data[column], + errors="coerce", + ).astype("Int64") + elif "null" in column_type: + cleaned1_data[column] = cleaned1_data[column].where( + cleaned1_data[column].notna(), + None, + ) + return cleaned1_data + + +def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame: + """Normalizes the Event_Type. + + The CSV file is expected to have two columns with headers: + - event_code: the normalized event type key. + - event_name: the event type description. + + For each row in `df`, if the standardized Event_Type value matches a + description from the CSV, the corresponding normalized key is stored in a + new column, Event_Code. If no match is found, the original Event_Type value + is retained. + + Args: + df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column. + event_code_csv (str): The path to the CSV file containing the event code + mapping. + + Returns: + pd.DataFrame: The DataFrame with an additional 'Event_Code' column. + """ + event_mapping_df = pd.read_csv(event_code_csv) + event_mapping_df["event_name"] = ( + event_mapping_df["event_name"].str.strip().str.upper() + ) + event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip() + mapping = dict( + zip( + event_mapping_df["event_name"], + event_mapping_df["event_code"], + strict=False, + ), + ) + df["Event_Code"] = ( + df["Event_Type"] + .astype(str) + .str.strip() + .str.upper() + .map(mapping) + .fillna(df["Event_Type"]) + ) + return df