From d8de7df39df373467e43d680d19ace5e97e4f388 Mon Sep 17 00:00:00 2001 From: meaghan66 <102330088+meaghan66@users.noreply.github.com> Date: Tue, 20 Aug 2024 15:48:25 -0700 Subject: [PATCH] LODES adrio2 refactor. (#148) * LODES adrio2 refactor. * Minor issues addressed in LODES. --- doc/devlog/2024-06-05.ipynb | 169 ++++----- epymorph/geo/adrio/__init__.py | 2 +- .../adrio/census/{lodes.py => adrio_lodes.py} | 0 epymorph/geo/adrio/lodes.py | 324 ++++++++++++++++++ 4 files changed, 415 insertions(+), 80 deletions(-) rename epymorph/geo/adrio/census/{lodes.py => adrio_lodes.py} (100%) create mode 100644 epymorph/geo/adrio/lodes.py diff --git a/doc/devlog/2024-06-05.ipynb b/doc/devlog/2024-06-05.ipynb index 5eee6db3..e1f111ed 100644 --- a/doc/devlog/2024-06-05.ipynb +++ b/doc/devlog/2024-06-05.ipynb @@ -44,55 +44,70 @@ "source": [ "### **Basic Queries**\n", "\n", - "The 'label' and 'commuters' are two simple yet imperative queries that show the basic functionality of the LODES ADRIO maker. The 'label' query represents the GEOIDs that are involved with the commuter matrices. The input given by the user for the scope, in this case being states, is translated into a list of GEOIDs. The 'commuters' query shows the total number of workers moving from a home GEOID to a work GEOID as a matrix. The matrix is read so that the rows represent the residence GEOID and the columns are the work location GEOID." + "The Commuters and Geoid calls are two simple yet imperative queries that show the basic functionality of the LODES ADRIO maker. The Geoid query represents the GEOIDs that are involved with the commuter matrices. The input given by the user for the scope, in this case being states, is translated into a list of GEOIDs. The Commuters query shows the total number of workers moving from a home GEOID to a work GEOID as a matrix. The matrix is read so that the rows represent the residence GEOID and the columns are the work location GEOID." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, + "outputs": [], + "source": [ + "from unittest.mock import Mock\n", + "\n", + "import numpy as np\n", + "\n", + "from epymorph.data_shape import SimDimensions\n", + "from epymorph.geography.us_census import StateScope\n", + "from epymorph.simulation import NamespacedAttributeResolver\n", + "\n", + "state_scope = StateScope.in_states_by_code([\"AZ\", \"CO\", \"NV\", \"NM\"])\n", + "\n", + "data = Mock(spec=NamespacedAttributeResolver)\n", + "dim = Mock(spec=SimDimensions)\n", + "rng = Mock(spec=np.random.Generator)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from epymorph.geo.adrio.lodes import Commuters, Geoid\n", + "\n", + "time_period = 2015\n", + "commuters = Commuters(time_period)\n", + "geoids = Geoid()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Home/Work GEOIDs: ['04' '08' '32' '35']\n", + "Home/Work GEOIDs:\n", + " ['04' '08' '32' '35']\n", "\n", "Commuters Matrix:\n", " [[2550132 2582 13263 8100]\n", " [ 1202 2405258 382 5557]\n", " [ 3552 535 1179411 361]\n", - " [ 6813 4824 409 764244]]\n" + " [ 6813 4824 409 764244]]\n", + "\n" ] } ], "source": [ - "from epymorph.data_shape import Shapes\n", - "from epymorph.geo.adrio import adrio_maker_library\n", - "from epymorph.geo.dynamic import DynamicGeo\n", - "from epymorph.geo.spec import DynamicGeoSpec, Year\n", - "from epymorph.geography.us_census import StateScope\n", - "from epymorph.simulation import AttributeDef\n", - "\n", - "spec = DynamicGeoSpec(\n", - " attributes=[\n", - " AttributeDef('label', str, Shapes.N),\n", - " AttributeDef('commuters', int, Shapes.NxN),\n", - " ],\n", - " time_period=Year(2015),\n", - " scope=StateScope.in_states_by_code([\"AZ\", \"CO\", \"NV\", \"NM\"]),\n", - " source={\n", - " 'label': 'LODES:geoid',\n", - " 'commuters': 'LODES'\n", - 
" }\n", - ")\n", - "\n", - "geo = DynamicGeo.from_library(spec, adrio_maker_library)\n", - "\n", + "print(\n", + " f\"Home/Work GEOIDs:\\n {geoids.evaluate_in_context(data, dim, state_scope, rng)}\\n\")\n", "\n", - "print(f\"Home/Work GEOIDs: {geo['label']}\\n\")\n", - "\n", - "print(f\"Commuters Matrix:\\n {geo['commuters']}\")" + "print(\n", + " f\"Commuters Matrix:\\n {commuters.evaluate_in_context(data, dim, state_scope, rng)}\\n\")" ] }, { @@ -101,30 +116,30 @@ "source": [ "# Attributes\n", "\n", - "The 'commuters' attribute outputs the total number of workers commuting, but LODES provides three categories for attributes specifying the type of workers: Age, Monthly Income, and Industry Sectors. Within each category, there are three ranges within them, and the sum of the ranges equals the total number of workers. All of these categories and the total commuters are displayed as NxN matrices of integers, excluding the label query.\n", + "The Commuters class outputs the total number of workers commuting, but LODES provides three categories for attributes specifying the type of workers: Age, Monthly Income, and Industry Sectors. Within each category, there are three ranges within them, and the sum of the ranges equals the total number of workers. 
All of these categories and the total commuters are displayed as NxN matrices of integers, excluding the Geoid query.\n", "\n", "## Age\n", - "- 'commuters_29_under'\n", + "- '29 and Under'\n", " - Commuters that are ages 29 and under.\n", - "- 'commuters_30_to_54\n", + "- '30_54'\n", " - Commuters that are between the ages of 30 and 54.\n", - "- 'commuters_55_over'\n", + "- '55 and Over'\n", " - Commuters that are ages 55 and over.\n", "\n", "## Monthly Income\n", - "- 'commuters_1250_under_earnings'\n", + "- '$1250 and Under'\n", " - Commuters that earn $1250 and under per month.\n", - "- 'commuters_1251_to_3333_earnings'\n", + "- '$1251_$3333'\n", " - Commuters that earn between $1251 and $3333 per month.\n", - "- 'commuters_3333_over_earnings'\n", + "- '$3333 and Over'\n", " - Commuters that earn over $3333 per month.\n", "\n", "## Industry Sector\n", - "- 'commuters_goods_producing_industry'\n", + "- 'Goods Producing'\n", " - Commuters that work in Goods Producing industry sectors.\n", - "- 'commuters_trade_transport_utility_industry'\n", + "- 'Trade Transport Utility'\n", " - Commuters that work in Trade, Transportation, and Utility industry sectors.\n", - "- 'commuters_other_industry'\n", + "- 'Other'\n", " - Commuters that work under all other service industry sectors other than the above claimed industries.\n" ] }, @@ -133,32 +148,53 @@ "metadata": {}, "source": [ "## Job Type\n", - "Along with the above categories, LODES provides files detailing different job types and the total number of jobs under that type. However, unlike the attributes, these matrices do not sum to be the total number of workers. \n", + "Along with the above categories, LODES provides files detailing different job types and the total number of jobs under that type. However, unlike the attributes, these matrices do not sum to be the total number of workers.\n", "\n", - "- 'all_jobs'\n", + "- 'All Jobs'\n", " - All jobs regardless of job type. 
Allows for multiple jobs per person and is the default when calling the above attributes.\n", - "- 'primary_jobs'\n", + "- 'Primary Jobs'\n", " - Primary jobs, which a primary job is the highest paying job for an individual worker for the year. Limits to one job per worker.\n", - "- 'all_private_jobs'\n", + "- 'All Private Jobs'\n", " - All private jobs, which are privately owned businesses and organizations excluding federal government jobs.\n", - "- 'private_primary_jobs'\n", + "- 'Private Primary Jobs'\n", " - Primary jobs within the private sector.\n", - "- 'all_federal_jobs'\n", + "- 'All Federal Jobs'\n", " - All jobs within the federal government sector.\n", - "- 'federal_primary_jobs\n", - " - Jobs under the federal government sector that are defined as primary jobs." + "- 'Federal Primary Jobs'\n", + " - Jobs under the federal government sector that are defined as primary jobs.\n", + "\n", + "Job type is an additional variable that can be combined with the previously explained attributes. For example, a user may retrieve a matrix of commuters who work in the Goods Producing industry sector for only Primary jobs. This variable is not required for all calls and the default value will be 'All Jobs'." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "### Age Range Attribute Example\n", + "\n", "Below is an example of calling the three different age ranges provided by LODES in a geo spec. The example here loads four counties into the matrices rather than the four states that was used previously." 
] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from epymorph.geo.adrio.lodes import CommutersByAge\n", + "from epymorph.geography.us_census import CountyScope\n", + "\n", + "time_period = 2015\n", + "county_scope = CountyScope.in_counties([\"04013\", \"08041\", \"32003\", \"35001\"])\n", + "\n", + "commuters_29_under = CommutersByAge(time_period, '29 and Under')\n", + "commuters_30_54 = CommutersByAge(time_period, '30_54')\n", + "commuters_55_over = CommutersByAge(time_period, '55 and Over')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -187,37 +223,12 @@ } ], "source": [ - "from epymorph.data_shape import Shapes\n", - "from epymorph.geo.adrio import adrio_maker_library\n", - "from epymorph.geo.dynamic import DynamicGeo\n", - "from epymorph.geo.spec import DynamicGeoSpec, Year\n", - "from epymorph.geography.us_census import CountyScope\n", - "from epymorph.simulation import AttributeDef\n", - "\n", - "spec = DynamicGeoSpec(\n", - " attributes=[\n", - " AttributeDef('label', str, Shapes.N),\n", - " AttributeDef('commuters_29_under', int, Shapes.NxN),\n", - " AttributeDef('commuters_30_to_54', int, Shapes.NxN),\n", - " AttributeDef('commuters_55_over', int, Shapes.NxN),\n", - " ],\n", - " time_period=Year(2015),\n", - " scope=CountyScope.in_counties([\"04013\", \"08041\", \"32003\", \"35001\"]),\n", - " source={\n", - " 'label': 'LODES:geoid',\n", - " 'commuters_29_under': 'LODES',\n", - " 'commuters_30_to_54': 'LODES',\n", - " 'commuters_55_over': 'LODES',\n", - " }\n", - ")\n", - "\n", - "geo = DynamicGeo.from_library(spec, adrio_maker_library)\n", - "\n", - "print(f\"Commuters ages 29 and under:\\n {geo['commuters_29_under']}\\n\")\n", - "\n", - "print(f\"Commuters between ages 30 and 54:\\n {geo['commuters_30_to_54']}\\n\")\n", - "\n", - "print(f\"Commuters ages 55 and over:\\n {geo['commuters_55_over']}\\n\")" + "print(\n", + " 
f\"Commuters ages 29 and under:\\n {commuters_29_under.evaluate_in_context(data, dim, county_scope, rng)}\\n\")\n", + "print(\n", + " f\"Commuters between ages 30 and 54:\\n {commuters_30_54.evaluate_in_context(data, dim, county_scope, rng)}\\n\")\n", + "print(\n", + " f\"Commuters ages 55 and over:\\n {commuters_55_over.evaluate_in_context(data, dim, county_scope, rng)}\\n\")" ] } ], @@ -237,7 +248,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" + "version": "3.11.0rc1" } }, "nbformat": 4, diff --git a/epymorph/geo/adrio/__init__.py b/epymorph/geo/adrio/__init__.py index f4dec120..f214cadc 100644 --- a/epymorph/geo/adrio/__init__.py +++ b/epymorph/geo/adrio/__init__.py @@ -1,6 +1,6 @@ """AdrioMaker library.""" from epymorph.geo.adrio.census.adrio_census import ADRIOMakerCensus -from epymorph.geo.adrio.census.lodes import ADRIOMakerLODES +from epymorph.geo.adrio.census.adrio_lodes import ADRIOMakerLODES from epymorph.geo.adrio.disease.adrio_cdc import ADRIOMakerCDC from epymorph.geo.adrio.file.adrio_csv import ADRIOMakerCSV from epymorph.geo.dynamic import ADRIOMaker diff --git a/epymorph/geo/adrio/census/lodes.py b/epymorph/geo/adrio/census/adrio_lodes.py similarity index 100% rename from epymorph/geo/adrio/census/lodes.py rename to epymorph/geo/adrio/census/adrio_lodes.py diff --git a/epymorph/geo/adrio/lodes.py b/epymorph/geo/adrio/lodes.py new file mode 100644 index 00000000..7836d777 --- /dev/null +++ b/epymorph/geo/adrio/lodes.py @@ -0,0 +1,324 @@ +from pathlib import Path +from typing import Literal + +import numpy as np +import pandas as pd +from numpy.typing import NDArray + +from epymorph.cache import load_or_fetch_url +from epymorph.error import DataResourceException +from epymorph.geo.adrio.adrio2 import Adrio +from epymorph.geography.scope import GeoScope +from epymorph.geography.us_census import STATE, CensusScope, state_fips_to_code + +_LODES_CACHE_PATH = Path("geo/adrio/lodes") + +# job type 
# Job type variables shared by all commuter ADRIO classes.
JobType = Literal[
    'All Jobs', 'Primary Jobs',
    'All Private Jobs', 'Private Primary Jobs',
    'All Federal Jobs', 'Federal Primary Jobs'
]

# Maps each user-facing job type to the job-type code used in LODES file names.
job_variables: dict[JobType, str] = {
    'All Jobs': 'JT00',
    'Primary Jobs': 'JT01',
    'All Private Jobs': 'JT02',
    'Private Primary Jobs': 'JT03',
    'All Federal Jobs': 'JT04',
    'Federal Primary Jobs': 'JT05'
}


def _check_lodes_availability(states: list[str], job_type: str, year: int) -> None:
    """
    Raises DataResourceException if LODES has no data for the requested
    combination of job type, states (as FIPS codes), and year.
    """
    # LODES year and state exceptions
    # exceptions can be found in this document for LODES8.1: https://lehd.ces.census.gov/data/lodes/LODES8/LODESTechDoc8.1.pdf
    invalid_conditions = [
        (year in range(2002, 2010) and job_type in ("JT04", "JT05"),
         "Invalid year for job type, no federal jobs can be found between 2002 to 2009"),

        (('05' in states) and (year == 2002 or year in range(2019, 2022)),
         "Invalid year for state, no commuters can be found for Arkansas in 2002 or between 2019-2021"),

        (('04' in states) and (year == 2002 or year == 2003),
         "Invalid year for state, no commuters can be found for Arizona in 2002 or 2003"),

        (('11' in states) and (year in range(2002, 2010)),
         "Invalid year for state, no commuters can be found for DC between 2002-2009"),

        (('25' in states) and (year in range(2002, 2011)),
         "Invalid year for state, no commuters can be found for Massachusetts between 2002-2010"),

        (('28' in states) and (year in range(2002, 2004) or year in range(2019, 2022)),
         "Invalid year for state, no commuters can be found for Mississippi in 2002, 2003, or between 2019-2021"),

        (('33' in states) and year == 2002,
         "Invalid year for state, no commuters can be found for New Hampshire in 2002"),

        (('02' in states) and year in range(2017, 2022),
         "Invalid year for state, no commuters can be found for Alaska in between 2017-2021")
    ]
    for condition, message in invalid_conditions:
        if condition:
            raise DataResourceException(message)


def _fetch_lodes(scope: CensusScope, worker_type: str, job_type: str, year: int) -> NDArray[np.int64]:
    """
    Fetches LODES origin-destination commuting flow data for a given year.

    Parameters:
        scope: the census scope whose node IDs define the rows/columns of the result.
        worker_type: name of the LODES column to sum (e.g. "S000", "SA01").
        job_type: LODES job-type code used in the file name (e.g. "JT00").
        year: the data year; must be within 2002-2021.

    Returns:
        An NxN int64 matrix (N = number of nodes in scope) where entry
        [home, work] is the summed `worker_type` value for workers living in
        the row's GEOID and working in the column's GEOID.

    Raises:
        DataResourceException: if the year/state/job type combination is not
        available, a state code cannot be resolved, or the files cannot be fetched.
    """
    # check for valid year input
    if year not in range(2002, 2022):
        msg = "Invalid year. LODES data is only available for 2002-2021"
        raise DataResourceException(msg)

    geoid = scope.get_node_ids()
    n_geocode = len(geoid)
    output = np.zeros((n_geocode, n_geocode), dtype=np.int64)
    if n_geocode == 0:
        # nothing in scope: avoid indexing geoid[0] below
        return output

    geocode_to_index = {geocode: i for i, geocode in enumerate(geoid)}
    geocode_len = len(geoid[0])
    geoid_prefixes = tuple(geoid)
    # can change the lodes version, default is the most recent LODES8
    lodes_ver = "LODES8"

    if scope.granularity != 'state':
        states = STATE.truncate_list(geoid)
    else:
        states = geoid
    # De-duplicate (order preserved) so each state's files are read at most once;
    # NOTE(review): truncate_list may already return unique values -- this is a
    # harmless guard against double counting if it does not.
    states = list(dict.fromkeys(str(s) for s in states))

    _check_lodes_availability(states, job_type, year)

    # "main" files hold in-state residents; with multiple states we also need
    # the "aux" files (workers residing out of state).
    file_kinds = ["main", "aux"] if len(states) > 1 else ["main"]

    # translate state FIPS code to state abbreviation to use in URL
    state_codes = state_fips_to_code(scope.year)
    state_abbreviations = []
    for fips in states:
        abbreviation = state_codes.get(fips, "").lower()
        if not abbreviation:
            # fail early instead of requesting a malformed URL
            msg = f"Unable to fetch LODES data: unrecognized state FIPS code '{fips}'."
            raise DataResourceException(msg)
        state_abbreviations.append(abbreviation)

    data_frames = []
    for state in state_abbreviations:
        url_list = [
            f'https://lehd.ces.census.gov/data/lodes/{lodes_ver}/{state}/od/{state}_od_{kind}_{job_type}_{year}.csv.gz'
            for kind in file_kinds
        ]

        try:
            files = [
                load_or_fetch_url(u, _LODES_CACHE_PATH / Path(u).name)
                for u in url_list
            ]
        except Exception as e:
            raise DataResourceException("Unable to fetch LODES data.") from e

        for file in files:
            # read geocodes as strings so leading zeros are preserved
            df = pd.read_csv(file, compression="gzip", converters={
                'w_geocode': str, 'h_geocode': str})

            # keep only flows whose home AND work geocodes fall within the scope
            in_scope = (
                df['h_geocode'].str.startswith(geoid_prefixes)
                & df['w_geocode'].str.startswith(geoid_prefixes)
            )
            data_frames.append(df[in_scope])

    # combine every file, truncate geocodes to the scope's granularity,
    # and sum the worker values for each (work, home) pair
    all_rows = pd.concat(data_frames, ignore_index=True)
    flows = pd.DataFrame({
        'w_geocode': all_rows['w_geocode'].str[:geocode_len],
        'h_geocode': all_rows['h_geocode'].str[:geocode_len],
        'workers': all_rows[worker_type],
    })
    totals = flows.groupby(['w_geocode', 'h_geocode'])['workers'].sum()

    # rows are home locations, columns are work locations
    for (w_geocode, h_geocode), value in totals.items():  # type: ignore
        w_index = geocode_to_index.get(w_geocode)
        h_index = geocode_to_index.get(h_geocode)
        if w_index is None or h_index is None:
            # truncated geocode is not a node in scope; never index with None
            continue
        output[h_index, w_index] += value

    return output


def _validate_scope(scope: GeoScope) -> CensusScope:
    """
    Checks that the scope is usable for LODES queries and returns it as a CensusScope.

    Raises:
        DataResourceException: if the scope is not a CensusScope, or its year is not 2020.
    """
    if not isinstance(scope, CensusScope):
        msg = 'Census scope is required for LODES attributes.'
        raise DataResourceException(msg)

    # check if the CensusScope year is the current LODES geography: 2020
    if scope.year != 2020:
        msg = "GeoScope year does not match the LODES geography year."
        raise DataResourceException(msg)

    return scope


class Commuters(Adrio[np.int64]):
    """
    Creates an NxN matrix of integers representing the number of workers moving from a home GEOID to a work GEOID.
    """

    year: int
    """The year the data encompasses."""

    job_type: JobType
    """The job type to count; defaults to 'All Jobs'."""

    def __init__(self, year: int, job_type: JobType = 'All Jobs'):
        self.year = year
        self.job_type = job_type

    def evaluate(self) -> NDArray[np.int64]:
        """Fetches the total-workers ("S000") commuting matrix for this ADRIO's scope."""
        scope = _validate_scope(self.scope)
        job_var = job_variables[self.job_type]
        return _fetch_lodes(scope, "S000", job_var, self.year)


class CommutersByAge(Adrio[np.int64]):
    """
    Creates an NxN matrix of integers representing the number of workers moving from a
    home GEOID to a work GEOID that fall under a certain age range.
    """

    year: int
    """The year the data encompasses."""

    job_type: JobType
    """The job type to count; defaults to 'All Jobs'."""

    AgeRange = Literal[
        '29 and Under', '30_54',
        '55 and Over'
    ]

    # Maps each age range to the LODES column holding its worker counts.
    age_variables: dict[AgeRange, str] = {
        '29 and Under': 'SA01',
        '30_54': 'SA02',
        '55 and Over': 'SA03'
    }

    age_range: AgeRange
    """The age range of workers to count."""

    def __init__(self, year: int, age_range: AgeRange, job_type: JobType = 'All Jobs'):
        self.year = year
        self.age_range = age_range
        self.job_type = job_type

    def evaluate(self) -> NDArray[np.int64]:
        """Fetches the commuting matrix for the selected age range."""
        scope = _validate_scope(self.scope)
        age_var = self.age_variables[self.age_range]
        job_var = job_variables[self.job_type]
        return _fetch_lodes(scope, age_var, job_var, self.year)


class CommutersByEarnings(Adrio[np.int64]):
    """
    Creates an NxN matrix of integers representing the number of workers moving from a
    home GEOID to a work GEOID that earn a certain income range monthly.
    """

    year: int
    """The year the data encompasses."""

    job_type: JobType
    """The job type to count; defaults to 'All Jobs'."""

    EarningRange = Literal[
        '$1250 and Under', '$1251_$3333',
        '$3333 and Over'
    ]

    # Maps each monthly earnings range to the LODES column holding its worker counts.
    earnings_variables: dict[EarningRange, str] = {
        '$1250 and Under': 'SE01',
        '$1251_$3333': 'SE02',
        '$3333 and Over': 'SE03'
    }

    earning_range: EarningRange
    """The monthly earnings range of workers to count."""

    def __init__(self, year: int, earning_range: EarningRange, job_type: JobType = 'All Jobs'):
        self.year = year
        self.earning_range = earning_range
        self.job_type = job_type

    def evaluate(self) -> NDArray[np.int64]:
        """Fetches the commuting matrix for the selected earnings range."""
        scope = _validate_scope(self.scope)
        earning_var = self.earnings_variables[self.earning_range]
        job_var = job_variables[self.job_type]
        return _fetch_lodes(scope, earning_var, job_var, self.year)


class CommutersByIndustry(Adrio[np.int64]):
    """
    Creates an NxN matrix of integers representing the number of workers moving from a
    home GEOID to a work GEOID that work under specified industry sector.
    """

    year: int
    """The year the data encompasses."""

    job_type: JobType
    """The job type to count; defaults to 'All Jobs'."""

    Industries = Literal[
        'Goods Producing', 'Trade Transport Utility',
        'Other'
    ]

    # Maps each industry sector to the LODES column holding its worker counts.
    industry_variables: dict[Industries, str] = {
        'Goods Producing': 'SI01',
        'Trade Transport Utility': 'SI02',
        'Other': 'SI03'
    }

    industry: Industries
    """The industry sector of workers to count."""

    def __init__(self, year: int, industry: Industries, job_type: JobType = 'All Jobs'):
        self.year = year
        self.industry = industry
        self.job_type = job_type

    def evaluate(self) -> NDArray[np.int64]:
        """Fetches the commuting matrix for the selected industry sector."""
        scope = _validate_scope(self.scope)
        industry_var = self.industry_variables[self.industry]
        job_var = job_variables[self.job_type]
        return _fetch_lodes(scope, industry_var, job_var, self.year)


class Geoid(Adrio):
    """
    Creates an array of strings of the geocodes involved within a commuters matrix.
    """

    def evaluate(self) -> NDArray:
        """Returns the scope's node IDs as a string array, after validating the scope."""
        scope = _validate_scope(self.scope)
        geoid = scope.get_node_ids()
        return np.array(geoid, dtype=str)