Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV ADRIO time-series correction #124

Merged
merged 3 commits into the base branch from the source branch
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 76 additions & 26 deletions doc/devlog/2024-06-12.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import date\n",
"from pathlib import Path\n",
"\n",
"from numpy import array_equal\n",
"from datetime import datetime\n",
"\n",
"from epymorph.data_shape import Shapes\n",
"from epymorph.geo.adrio import adrio_maker_library\n",
Expand All @@ -34,7 +35,7 @@
"from epymorph.geography.us_census import (STATE, CountyScope, StateScope,\n",
" get_us_counties, get_us_states)\n",
"from epymorph.simulation import geo_attrib\n",
"from pandas import DataFrame, concat\n",
"from pandas import DataFrame, concat, read_csv\n",
"\n",
"# create and store 'pei_population.csv'\n",
"census_maker = ADRIOMakerCensus()\n",
Expand All @@ -48,7 +49,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -71,7 +72,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -85,7 +86,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -95,8 +96,6 @@
"states_list = ['04', '08', '49', '35', '32']\n",
"population_2015 = census_maker.make_adrio(geo_attrib(\n",
" 'population_by_age', int, Shapes.NxA(3)), CountyScope.in_states(states_list), Year(2015)).get_value()\n",
"population_2016 = census_maker.make_adrio(geo_attrib(\n",
" 'population_by_age', int, Shapes.NxA(3)), CountyScope.in_states(states_list), Year(2016)).get_value()\n",
"\n",
"# get county and state info from shapefiles and convert to dataframes\n",
"counties_info = get_us_counties(2010)\n",
Expand All @@ -112,11 +111,8 @@
"merged_df = merged_df.loc[merged_df['state_geoid'].isin(states_list)]\n",
"\n",
"# create and merge dataframes to be converted to csvs\n",
"df_2015 = DataFrame({'Date': [date(2015, 1, 1) for i in merged_df.index], 'County': merged_df['county_name'], 'Young': [\n",
" pop[0] for pop in population_2015], 'Adult': [pop[1] for pop in population_2015], 'Elderly': [pop[2] for pop in population_2015]})\n",
"df_2016 = DataFrame({'Date': [date(2016, 1, 1) for i in merged_df.index], 'County': merged_df['county_name'], 'Young': [\n",
" pop[0] for pop in population_2016], 'Adult': [pop[1] for pop in population_2016], 'Elderly': [pop[2] for pop in population_2016]})\n",
"df = concat([df_2015, df_2016])\n",
"df = DataFrame({'Date': [date(2015, 1, 1) for i in merged_df.index], 'County': merged_df['county_name'], 'Young': [\n",
" pop[0] for pop in population_2015], 'Adult': [pop[1] for pop in population_2015], 'Elderly': [pop[2] for pop in population_2015]})\n",
"\n",
"# sort incorrectly and store as csv\n",
"df.sort_values('Young', inplace=True)\n",
Expand All @@ -125,7 +121,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -143,20 +139,20 @@
" source={\n",
" 'label': 'Census:name',\n",
" 'population': 'Census',\n",
" 'population_0-19': CSVSpecTime(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" time_col=0, key_col=1, data_col=2, key_type=\"county_state\", skiprows=1),\n",
" 'population_20-64': CSVSpecTime(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" time_col=0, key_col=1, data_col=3, key_type=\"county_state\", skiprows=1),\n",
" 'population_65+': CSVSpecTime(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" time_col=0, key_col=1, data_col=4, key_type=\"county_state\", skiprows=1),\n",
" 'population_0-19': CSVSpec(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" key_col=1, data_col=2, key_type=\"county_state\", skiprows=1),\n",
" 'population_20-64': CSVSpec(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" key_col=1, data_col=3, key_type=\"county_state\", skiprows=1),\n",
" 'population_65+': CSVSpec(file_path=Path(\"./scratch/us_sw_counties_population.csv\"),\n",
" key_col=1, data_col=4, key_type=\"county_state\", skiprows=1),\n",
" 'population_by_age': 'Census'\n",
" }\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -176,7 +172,61 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create and store 'vaccination_time_series.csv'\n",
"fips = '\\'' + '\\',\\''.join(['08001', '35001', '04013', '04017']) + '\\''\n",
"url = f\"https://data.cdc.gov/resource/8xkx-amqh.csv?$select=date,fips,series_complete_yes&$where=fips%20in({fips})&$limit=1962781\"\n",
"df = read_csv(url, dtype={'fips': str})\n",
"\n",
"df['date'] = [datetime.fromisoformat(\n",
" week.replace('/', '-')).date() for week in df['date']]\n",
"\n",
"df = df[df['date'] >= date(2021, 1, 1)]\n",
"df = df[df['date'] <= date(2021, 12, 31)]\n",
"\n",
"df.to_csv('./scratch/vaccination_time_series.csv', index=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"spec = DynamicGeoSpec(\n",
" attributes=[\n",
" geo_attrib('label', str, Shapes.N),\n",
" geo_attrib('population', int, Shapes.N),\n",
" geo_attrib('vaccinations', int, Shapes.TxN),\n",
" ],\n",
" time_period=Year(2021),\n",
" scope=CountyScope.in_counties(['08001', '04013', '35001']),\n",
" source={\n",
" 'label': 'Census:name',\n",
" 'population': 'Census',\n",
" 'vaccinations': CSVSpecTime(file_path=Path(\"./scratch/vaccination_time_series.csv\"),\n",
" time_col=0, key_col=1, data_col=2, key_type=\"geoid\", skiprows=1),\n",
" }\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"geo = DynamicGeo.from_library(spec, adrio_maker_library)\n",
"\n",
"geo.validate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -186,15 +236,15 @@
"df['res_geoid'] = df['res_state_code'] + df['res_county_code']\n",
"df['wrk_geoid'] = df['wrk_state_code'].apply(lambda x: x[1:]) + df['wrk_county_code']\n",
"\n",
"df.to_csv('./scratch/counties_commuters_2020.csv',\n",
" columns=['res_geoid', 'wrk_geoid', 'workers'], index=False)\n",
"df.sort_values(by='workers', inplace=True)\n",
"\n",
"df.sort_values(by='workers', inplace=True)"
"df.to_csv('./scratch/counties_commuters_2020.csv',\n",
" columns=['res_geoid', 'wrk_geoid', 'workers'], index=False)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -219,7 +269,7 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down
74 changes: 40 additions & 34 deletions epymorph/geo/adrio/file/adrio_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from numpy.typing import NDArray
from pandas import DataFrame, Series, read_csv

from epymorph.error import DataResourceException
from epymorph.error import DataResourceException, GeoValidationException
from epymorph.geo.adrio.adrio import ADRIO, ADRIOMaker
from epymorph.geo.spec import AttributeDef, SpecificTimePeriod, TimePeriod
from epymorph.geography.scope import GeoScope
Expand Down Expand Up @@ -53,49 +53,64 @@ class CSVSpecMatrix(_BaseCSVSpecMatrix):
"""Dataclass to store parameters for CSV ADRIO with data shape NxN."""


@dataclass
class CSVSpecMatrixTime(_BaseCSVSpecMatrix):
"""Dataclass to store parameters for time-series CSV ADRIO with data shape TxNxN."""
time_col: int


class ADRIOMakerCSV(ADRIOMaker):
@staticmethod
def accepts_source(source: Any) -> bool:
if isinstance(source, CSVSpec | CSVSpecTime | CSVSpecMatrix | CSVSpecMatrixTime):
if isinstance(source, CSVSpec | CSVSpecTime | CSVSpecMatrix):
return True
else:
return False

def make_adrio(self, attrib: AttributeDef, scope: GeoScope, time_period: TimePeriod, spec: CSVSpec | CSVSpecTime | CSVSpecMatrix | CSVSpecMatrixTime) -> ADRIO:
def make_adrio(self, attrib: AttributeDef, scope: GeoScope, time_period: TimePeriod, spec: CSVSpec | CSVSpecTime | CSVSpecMatrix) -> ADRIO:
if isinstance(spec, CSVSpec | CSVSpecTime):
return self._make_single_column_adrio(attrib, scope, time_period, spec)
else:
return self._make_matrix_adrio(attrib, scope, time_period, spec)
return self._make_matrix_adrio(attrib, scope, spec)

def _make_single_column_adrio(self, attrib: AttributeDef, scope: GeoScope, time_period: TimePeriod, spec: CSVSpec | CSVSpecTime) -> ADRIO:
"""Makes an ADRIO to fetch data from a single relevant column in a .csv file."""
def fetch() -> NDArray:
df = self._load_from_file(spec, time_period, scope)
if spec.key_col == spec.data_col:
msg = "Key column and data column must not be the same."
raise GeoValidationException(msg)

df.rename(columns={spec.key_col: 'key'}, inplace=True)
df.sort_values(by='key', inplace=True)

data_values = df[spec.data_col]
def fetch() -> NDArray:
df = self._load_from_file(spec, scope)

# check for null values (missing data in file)
if data_values.isnull().any():
if df[spec.data_col].isnull().any():
msg = f"Data for required geographies missing from {attrib.name} attribute file or could not be found."
raise DataResourceException(msg)

return df[spec.data_col].to_numpy(dtype=attrib.dtype)
if isinstance(spec, CSVSpec):
df.rename(columns={spec.key_col: 'key'}, inplace=True)
df.sort_values(by='key', inplace=True)
return df[spec.data_col].to_numpy(dtype=attrib.dtype)
else:
if not isinstance(time_period, SpecificTimePeriod):
raise GeoValidationException("Unsupported time period.")

df[spec.time_col] = df[spec.time_col].apply(date.fromisoformat)

if any(df[spec.time_col] < time_period.start_date) or any(df[spec.time_col] > time_period.end_date):
msg = "Found time column value(s) outside of geo's date range."
raise DataResourceException(msg)

df.rename(columns={spec.key_col: 'key', spec.data_col: 'data',
spec.time_col: 'time'}, inplace=True)
df.sort_values(by=['time', 'key'], inplace=True)
df = df.pivot(index='time', columns='key', values='data')
return df.to_numpy(dtype=attrib.dtype)

return ADRIO(attrib.name, fetch)

def _make_matrix_adrio(self, attrib: AttributeDef, scope: GeoScope, time_period: TimePeriod, spec: CSVSpecMatrix | CSVSpecMatrixTime) -> ADRIO:
def _make_matrix_adrio(self, attrib: AttributeDef, scope: GeoScope, spec: CSVSpecMatrix) -> ADRIO:
"""Makes an ADRIO to fetch data from a single column within a .csv file and converts it to matrix format."""
if len(set([spec.from_key_col, spec.to_key_col, spec.data_col])) != 3:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer set construction syntax here: {item1, item2, item3} instead of set([item1, item2, item3])

msg = "From key column, to key column, and data column must all be unique."
raise GeoValidationException(msg)

def fetch() -> NDArray:
df = self._load_from_file(spec, time_period, scope)
df = self._load_from_file(spec, scope)

df = df.pivot(index=spec.from_key_col, columns=spec.to_key_col,
values=spec.data_col)
Expand All @@ -109,7 +124,7 @@ def fetch() -> NDArray:

return ADRIO(attrib.name, fetch)

def _load_from_file(self, spec: CSVSpec | CSVSpecTime | CSVSpecMatrix | CSVSpecMatrixTime, time_period: TimePeriod, scope: GeoScope) -> DataFrame:
def _load_from_file(self, spec: CSVSpec | CSVSpecTime | CSVSpecMatrix, scope: GeoScope) -> DataFrame:
"""
Loads .csv at path location into a pandas DataFrame, filtering out data outside of the specified
geographic scope and time period.
Expand All @@ -131,15 +146,6 @@ def _load_from_file(self, spec: CSVSpec | CSVSpecTime | CSVSpecMatrix | CSVSpecM
df = read_csv(path, header=None, dtype={
spec.from_key_col: str, spec.to_key_col: str})

if isinstance(spec, CSVSpecTime | CSVSpecMatrixTime):
df[spec.time_col] = df[spec.time_col].apply(date.fromisoformat)

if isinstance(time_period, SpecificTimePeriod):
df = df.loc[df[spec.time_col] >= time_period.start_date]
df = df.loc[df[spec.time_col] < time_period.end_date]
else:
raise DataResourceException("Unsupported time period.")

if isinstance(spec, CSVSpec | CSVSpecTime):
df = self._parse_label(spec.key_type, scope, df, spec.key_col)
else:
Expand Down Expand Up @@ -182,9 +188,9 @@ def _parse_abbrev(self, scope: GeoScope, df: DataFrame, key_col: int, key_col2:
df[key_col] = [state_mapping.get(x) for x in df[key_col]]
if df[key_col].isnull().any():
raise DataResourceException("Invalid state code in key column.")
df = df.loc[df[key_col].isin(scope.get_node_ids())]
df = df[df[key_col].isin(scope.get_node_ids())]
if key_col2 is not None:
df = df.loc[df[key_col2].isin(scope.get_node_ids())]
df = df[df[key_col2].isin(scope.get_node_ids())]
return df

else:
Expand Down Expand Up @@ -239,9 +245,9 @@ def _parse_geoid(self, scope: GeoScope, df: DataFrame, key_col: int, key_col2: i
if not all(granularity.matches(x) for x in df[key_col]):
raise DataResourceException("Invalid geoid in key column.")

df = df.loc[df[key_col].isin(scope.get_node_ids())]
df = df[df[key_col].isin(scope.get_node_ids())]
if key_col2 is not None:
df = df.loc[df[key_col2].isin(scope.get_node_ids())]
df = df[df[key_col2].isin(scope.get_node_ids())]

return df

Expand Down
Loading