Skip to content

Commit

Permalink
fetch_sf tract and block group cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
TJohnsonAZ committed May 21, 2024
1 parent 08184f2 commit a05b6cf
Showing 1 changed file with 95 additions and 71 deletions.
166 changes: 95 additions & 71 deletions epymorph/geo/adrio/census/adrio_census.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from epymorph.data_type import CentroidDType
from epymorph.error import GeoValidationException
from epymorph.geo.adrio.adrio import ADRIO, ADRIOMaker
from epymorph.geo.spec import AttributeDef, TimePeriod
from epymorph.geo.spec import AttributeDef, TimePeriod, Year
from epymorph.geography.us_census import (BLOCK_GROUP, COUNTY, STATE, TRACT,
BlockGroupScope, CensusScope,
CountyScope, StateScope,
Expand Down Expand Up @@ -139,6 +139,11 @@ def make_adrio(self, attrib: AttributeDef, scope: CensusScope, time_period: Time
if attrib not in self.attributes:
msg = f"{attrib.name} is not supported for the Census data source."
raise GeoValidationException(msg)
if not isinstance(time_period, Year):
msg = f"Census ADRIO requires Year (TimePeriod), given {type(time_period)}."
raise GeoValidationException(msg)

year = time_period.year

if attrib.name == 'geoid':
return self._make_geoid_adrio(scope)
Expand All @@ -155,7 +160,7 @@ def make_adrio(self, attrib: AttributeDef, scope: CensusScope, time_period: Time
elif attrib.name == 'tract_median_income':
return self._make_tract_med_income_adrio(scope)
elif attrib.name == 'commuters':
return self._make_commuter_adrio(scope)
return self._make_commuter_adrio(scope, year)
else:
return self._make_simple_adrios(attrib, scope)

Expand Down Expand Up @@ -315,102 +320,118 @@ def fetch_sf(self, scope: CensusScope) -> GeoDataFrame:

sort_param = ['state']

case CountyScope('state', includes, year):
data_df = counties(state=includes, year=year)
case CountyScope('state', includes):
data_df = counties(state=includes, year=scope.year)
data_df = data_df.rename(
columns={'STATEFP': 'state', 'COUNTYFP': 'county'})

sort_param = ['state', 'county']

case CountyScope('county', includes, year):
counties_by_state: dict[str, list[str]] = defaultdict(list)
for state, county in map(COUNTY.decompose, includes):
counties_by_state[state].append(county)
data_df = counties(state=list(counties_by_state.keys()), year=year)
case CountyScope('county', includes):
state = list({STATE.extract(x) for x in includes})
data_df = counties(state=state, year=scope.year)
data_df = data_df.rename(
columns={'STATEFP': 'state', 'COUNTYFP': 'county'})
data_df['county_full'] = data_df['state'] + data_df['county']
data_df = data_df.loc[data_df['county_full'].isin(includes)]

sort_param = ['state', 'county']

# elif granularity == Granularity.TRACT.value:
# if state_fips is not None and state_fips[0] != '*' and county_fips is not None and county_fips[0] != '*':
# data_df = GeoDataFrame()
# # tract and block group level files cannot be fetched using lists
# # several queries must be made and merged instead
# for i in range(len(state_fips)):
# for j in range(len(county_fips)):
# current_data = tracts(
# state=state_fips[i], county=county_fips[j], year=year)

# if len(data_df.index) > 0:
# data_df = data_df.merge(
# current_data, on=['STATEFP', 'COUNTYFP', 'TRACTCE'])
# else:
# data_df = current_data
# else:
# msg = "Data could not be retrieved due to missing state or county fips codes. \
# Wildcard specifier(*) cannot be used for tract level data."
# raise Exception(msg)

# data_df = data_df.rename(
# columns={'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract'})

# if tract_fips is not None and tract_fips[0] != '*':
# data_df = data_df.loc[data_df['tract'].isin(tract_fips)]

# sort_param = ['state', 'county', 'tract']

# else:
# state_fips = nodes.get('state')
# county_fips = nodes.get('county')
# if state_fips is not None and state_fips[0] != '*' and county_fips is not None and county_fips[0] != '*':
# data_df = GeoDataFrame()
# for i in range(len(state_fips)):
# for j in range(len(county_fips)):
# current_data = block_groups(
# state=state_fips[i], county=county_fips[j], year=year)
# if len(data_df.index) > 0:
# data_df = data_df.merge(
# current_data, on=['STATEFP', 'COUNTYFP', 'TRACTCE', 'BLKGRPCE'])
# else:
# data_df = current_data
# else:
# msg = "Data could not be retrieved due to missing state or county fips codes. \
# Wildcard specifier(*) cannot be used for block group level data."
# raise Exception(msg)

# data_df = data_df.rename(
# columns={'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract', 'BLKGRPCE': 'block group'})
# if cbg_fips is not None and cbg_fips[0] != '*':
# data_df = data_df.iloc[data_df['block group'].isin(cbg_fips)]

# sort_param = ['state', 'county', 'block group']
case TractScope('state', includes):
data_df = concat(tracts(state=s, year=scope.year) for s in includes)
data_df = data_df.rename(
columns={'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract'})

sort_param = ['state', 'county', 'tract']

case TractScope('county', includes):
data_df = concat(tracts(state=s, county=c)
for s, c in map(COUNTY.decompose, includes))

data_df = data_df.rename(
columns={'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract'})

sort_param = ['state', 'county', 'tract']

case TractScope('tract', includes):
data_df = concat(tracts(state=s, county=c)
for s, c, t in map(TRACT.decompose, includes))

data_df = data_df.rename(
columns={'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract'})
data_df['tract_full'] = data_df['state'] + \
data_df['county'] + data_df['tract']
data_df = data_df.loc[data_df['tract_full'].isin(includes)]

sort_param = ['state', 'county', 'tract']

case BlockGroupScope('state', includes):
data_df = concat(block_groups(state=s, year=scope.year)
for s in includes)
data_df = data_df.rename(columns={
'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract', 'BLKGRPCE': 'block group'})

sort_param = ['state', 'county', 'tract', 'block group']

case BlockGroupScope('county', includes):
data_df = concat(block_groups(state=s, county=c)
for s, c in map(COUNTY.decompose, includes))

data_df = data_df.rename(columns={
'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract', 'BLKGRPCE': 'block group'})

sort_param = ['state', 'county', 'tract', 'block group']

case BlockGroupScope('tract', includes):
data_df = concat(block_groups(state=s, county=c)
for s, c, t in map(TRACT.decompose, includes))

data_df = data_df.rename(columns={
'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract', 'BLKGRPCE': 'block_group'})
data_df['tract_full'] = data_df['state'] + \
data_df['county'] + data_df['tract']
data_df = data_df.loc[data_df['tract_full'].isin(includes)]

sort_param = ['state', 'county', 'tract', 'block_group']

case BlockGroupScope('block group', includes):
data_df = concat(block_groups(state=s, county=c)
for s, c, t, bg in map(BLOCK_GROUP.decompose, includes))

data_df = data_df.rename(columns={
'STATEFP': 'state', 'COUNTYFP': 'county', 'TRACTCE': 'tract', 'BLKGRPCE': 'block_group'})
data_df['bg_full'] = data_df['state'] + data_df['county'] + \
data_df['tract'] + data_df['block_group']
data_df = data_df.loc[data_df['bg_full'].isin(includes)]

sort_param = ['state', 'county', 'tract', 'block_group']

case _:
raise Exception("Unsupported query.")

data_df = GeoDataFrame(data_df.sort_values(by=sort_param))
data_df.reset_index(drop=True, inplace=True)

return data_df

def fetch_commuters(self, scope: CensusScope) -> DataFrame:
def fetch_commuters(self, scope: CensusScope, year: int) -> DataFrame:
"""
Utility function to fetch commuting data from .xslx format filtered down to requested regions.
"""
# check for invalid granularity
if isinstance(scope, TractScope) or isinstance(scope, BlockGroupScope):
msg = "Error: Commuting data cannot be retrieved for tract or block group granularities"
msg = "Commuting data cannot be retrieved for tract or block group granularities"
raise Exception(msg)
year = scope.year

# check for valid year
if year not in [2010, 2015, 2020]:
# if invalid year is close to a valid year, fetch valid data and notify user
passed_year = scope.year
if scope.year in range(2008, 2012):
passed_year = year
if year in range(2008, 2012):
year = 2010
elif scope.year in range(2013, 2017):
elif year in range(2013, 2017):
year = 2015
elif scope.year in range(2018, 2022):
elif year in range(2018, 2022):
year = 2020
else:
msg = "Invalid year. Communting data is only available for 2008-2022"
Expand Down Expand Up @@ -470,6 +491,9 @@ def fetch_commuters(self, scope: CensusScope) -> DataFrame:
data = data.loc[data['wrk_county_full'].isin(
['0' + x for x in includes])]

case _:
raise Exception("Unsupported query.")

return data

def _make_geoid_adrio(self, scope: CensusScope) -> ADRIO:
Expand Down Expand Up @@ -642,10 +666,10 @@ def fetch() -> NDArray:
return data_df[self.attrib_vars['median_income']].to_numpy(dtype=np.int64).squeeze()
return ADRIO('tract_median_income', fetch)

def _make_commuter_adrio(self, scope: CensusScope) -> ADRIO:
def _make_commuter_adrio(self, scope: CensusScope, year: int) -> ADRIO:
"""Makes an ADRIO to retrieve ACS commuting flow data."""
def fetch() -> NDArray:
data_df = self.fetch_commuters(scope)
data_df = self.fetch_commuters(scope, year)
# state level
if isinstance(scope, StateScope):
# get unique state identifier
Expand Down

0 comments on commit a05b6cf

Please sign in to comment.