Skip to content

Commit

Permalink
Make commuter adrio refactor. (#120)
Browse files Browse the repository at this point in the history
Simplification of ACS5 commuting flows code in ADRIOMakerCensus.
  • Loading branch information
TJohnsonAZ authored Jun 20, 2024
1 parent e399d5c commit c0a31c3
Showing 1 changed file with 20 additions and 70 deletions.
90 changes: 20 additions & 70 deletions epymorph/geo/adrio/census/adrio_census.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ def fetch_acs5(self, variables: list[str], scope: CensusScope, year: int) -> Dat

def fetch_sf(self, scope: CensusScope) -> GeoDataFrame:
"""Utility function to fetch shape files from Census for specified regions."""

# call appropriate pygris function based on granularity and sort result
match scope:
case StateScopeAll() | StateScope():
Expand All @@ -216,16 +215,14 @@ def fetch_sf(self, scope: CensusScope) -> GeoDataFrame:

df = df.rename(columns={'GEOID': 'geoid'})

df = df.loc[df['geoid'].isin(scope.get_node_ids())]
df = df[df['geoid'].isin(scope.get_node_ids())]

return GeoDataFrame(df)

def fetch_commuters(self, scope: CensusScope, year: int) -> DataFrame:
"""
Utility function to fetch commuting data from .xlsx format filtered down to requested regions.
"""
"""Utility function to fetch commuting data from .xslx format filtered down to requested regions."""
# check for invalid granularity
if isinstance(scope, TractScope) or isinstance(scope, BlockGroupScope):
if isinstance(scope, TractScope | BlockGroupScope):
msg = "Commuting data cannot be retrieved for tract or block group granularities"
raise DataResourceException(msg)

Expand Down Expand Up @@ -273,33 +270,24 @@ def fetch_commuters(self, scope: CensusScope, year: int) -> DataFrame:
data = read_excel(url, header=header_num, names=all_fields, dtype={
'res_state_code': str, 'wrk_state_code': str, 'res_county_code': str, 'wrk_county_code': str})

match scope:
case StateScopeAll():
# remove nodes not in acs5 data for all states case
data = data.loc[data['res_state_code'].isin(scope.get_node_ids())]
data = data.loc[data['wrk_state_code'].isin(
'0' + x for x in scope.get_node_ids())]
node_ids = scope.get_node_ids()
match scope.granularity:
case 'state':
data.rename(columns={'res_state_code': 'res_geoid',
'wrk_state_code': 'wrk_geoid'}, inplace=True)

case StateScope('state', includes) | CountyScope('state', includes):
states = list(includes)
data = data.loc[data['res_state_code'].isin(states)]

for state in range(len(states)):
states[state] = states[state].zfill(3)
data = data.loc[data['wrk_state_code'].isin(states)]

case CountyScope('county', includes):
data['res_county_full'] = data['res_state_code'] + \
case 'county':
data['res_geoid'] = data['res_state_code'] + \
data['res_county_code']
data['wrk_county_full'] = data['wrk_state_code'] + \
data['wrk_geoid'] = data['wrk_state_code'] + \
data['wrk_county_code']
data = data.loc[data['res_county_full'].isin(includes)]
data = data.loc[data['wrk_county_full'].isin(
['0' + x for x in includes])]

case _:
raise DataResourceException("Unsupported query.")

data = data[data['res_geoid'].isin(node_ids)]
data = data[data['wrk_geoid'].isin(['0' + x for x in node_ids])]

return data

def make_acs5_queries(self, scope: CensusScope) -> list[dict[str, str]]:
Expand Down Expand Up @@ -581,56 +569,18 @@ def _make_commuter_adrio(self, scope: CensusScope, year: int) -> ADRIO:
"""Makes an ADRIO to retrieve ACS commuting flow data."""
def fetch() -> NDArray:
df = self.fetch_commuters(scope, year)
# state level
if isinstance(scope, StateScope) or isinstance(scope, StateScopeAll):
# get unique state identifier
unique_states = ('0' + df['res_state_code']).unique()
state_len = np.count_nonzero(unique_states)

# create dictionary to be used as array indices
states_enum = enumerate(unique_states)
states_dict = dict((y, x) for x, y in states_enum)

if isinstance(scope, StateScope | StateScopeAll):
# group and aggregate data
data_group = df.groupby(['res_state_code', 'wrk_state_code'])
data_group = df.groupby(['res_geoid', 'wrk_geoid'])
df = data_group.agg({'workers': 'sum'})
df = df.reset_index()

# create and return array for each state
output = np.zeros((state_len, state_len), dtype=int)

# fill array with commuting data
for state in range(len(df.index)):
x = states_dict.get('0' + df.iloc[state]['res_state_code'])
y = states_dict.get(df.iloc[state]['wrk_state_code'])

output[x][y] = df.iloc[state]['workers']

# county level
else:
# get unique identifier for each county
df['res_geoid'] = '0' + df['res_state_code'] + df['res_county_code']
df['wrk_geoid'] = df['wrk_state_code'] + df['wrk_county_code']
unique_counties = df['res_geoid'].unique()

# create empty output array
county_len = np.count_nonzero(unique_counties)
output = np.zeros((county_len, county_len), dtype=int)

# create dictionary to be used as array indices
counties_enum = enumerate(unique_counties)
counties_dict = dict((fips, index) for index, fips in counties_enum)

df.reset_index(drop=True, inplace=True)
df.reset_index(inplace=True)

# fill array with commuting data
for county in range(len(df.index)):
x = counties_dict.get(df.iloc[county]['res_geoid'])
y = counties_dict.get(df.iloc[county]['wrk_geoid'])
df = df.pivot(index='res_geoid', columns='wrk_geoid', values='workers')
df.fillna(0, inplace=True)

output[x][y] = df.iloc[county]['workers']
return df.to_numpy(dtype=int)

return output
return ADRIO('commuters', fetch)

def _make_simple_adrios(self, attrib: AttributeDef, scope: CensusScope, year: int) -> ADRIO:
Expand Down

0 comments on commit c0a31c3

Please sign in to comment.