-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcovid_api.py
189 lines (144 loc) · 6.51 KB
/
covid_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import csv
import bottle
from copy import deepcopy
from json import dumps as _dumps
from bottle import request, response, get
from covid_api.CaseData import CASE_DATA
from covid_api.PolyGeoJSON import POLY_GEOJSON
from covid_api.PointGeoJSON import POINT_GEOJSON
# Bottle's default application, exposed under two module-level names
# (presumably so WSGI servers can find it as either `app` or
# `application` -- confirm against the deployment config).
application = bottle.default_app()
app = application
def dumps(obj):
    """Encode *obj* as JSON text and flag the current response as JSON.

    Sets the ``Content-Type`` of the in-flight Bottle response to
    ``application/json`` and returns the encoded string. Non-ASCII
    characters are emitted as-is rather than ``\\uXXXX``-escaped.
    """
    # Header is set before encoding, so it is applied even if encoding fails.
    response.content_type = 'application/json'
    encoded = _dumps(obj, ensure_ascii=False)
    return encoded
#======================================================================#
# Schemas/DataTypes
#======================================================================#
@get('/source_ids')
def source_ids():
    """Return the source info table as JSON, keyed by ``source_id``.

    Reads ``source_info_table.tsv`` (tab-separated, UTF-8, path relative
    to the current working directory) and maps each row's ``source_id``
    value to a dict holding that row's remaining columns.

    Raises:
        OSError: if the table file cannot be opened.
        KeyError: if the table has no ``source_id`` column.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation requires for files handed to a reader; otherwise
    # newlines embedded in quoted fields can be misread.
    with open('source_info_table.tsv', 'r', encoding='utf-8', newline='') as f:
        rows = csv.DictReader(f, delimiter='\t')
        # pop() removes the id column from the row, so the value dict
        # holds only the remaining columns.
        out = {row.pop('source_id'): row for row in rows}
    return dumps(out)
@get('/datatypes')
def datatypes():
    """Return the datatypes table as JSON, keyed by ``datatype_id``.

    Reads ``datatypes_table.tsv`` (tab-separated, UTF-8, path relative
    to the current working directory) and maps each row's ``datatype_id``
    value to a dict holding that row's remaining columns.

    Raises:
        OSError: if the table file cannot be opened.
        KeyError: if the table has no ``datatype_id`` column.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation requires for files handed to a reader; otherwise
    # newlines embedded in quoted fields can be misread.
    with open('datatypes_table.tsv', 'r', encoding='utf-8', newline='') as f:
        rows = csv.DictReader(f, delimiter='\t')
        # pop() removes the id column from the row, so the value dict
        # holds only the remaining columns.
        out = {row.pop('datatype_id'): row for row in rows}
    return dumps(out)
@get('/schemas')
def schemas():
    """Return the sorted entries of ``CASE_DATA.SOURCES_BY_SCHEMA`` as JSON.

    Iterating ``SOURCES_BY_SCHEMA`` presumably yields the schema names
    (it is indexed by schema elsewhere in this module).
    """
    schema_names = list(CASE_DATA.SOURCES_BY_SCHEMA)
    schema_names.sort()
    return dumps(schema_names)
#======================================================================#
# GeoJSON Data
#======================================================================#
# Empty GeoJSON FeatureCollection skeleton. The endpoints below deep-copy
# it before filling in "features", so this module-level dict stays empty.
GEOJSON_TEMPLATE = {"type": "FeatureCollection", "features": []}
@get('/geojson_point_data')
def geojson_point_data():
    """Return point GeoJSON features merged with the latest case data.

    Query parameters:
        schema: key into ``POINT_GEOJSON.DATA`` selecting the schema.
        region_parent: optional; restrict output to this parent region.
        region_child: optional; restrict output to this one child region,
            looked up under ``region_parent``.
        source_id: optional; forwarded to the case-data lookup.

    Returns:
        A JSON-encoded GeoJSON ``FeatureCollection``. Every feature whose
        ``(region_parent, region_child)`` properties form a key present
        in the case data has that case data merged into its properties.

    Raises:
        KeyError: if the schema or a requested region is missing from
            ``POINT_GEOJSON.DATA``.
    """
    schema = request.query.schema
    region_child = request.query.get('region_child')
    region_parent = request.query.get('region_parent')
    source_id = request.query.get('source_id', None)

    # Get most recent figures, choosing the most recently
    # updated if source_id isn't explicitly specified
    case_data = CASE_DATA.get_latest_case_data_singlesource(
        schema, None, source_id, region_parent, region_child
    )

    by_parent = POINT_GEOJSON.DATA[schema]
    if region_child is not None:
        # Get just for a single region child
        feature_groups = [by_parent[region_parent][region_child]]
    elif region_parent is not None:
        # Get all region children for a given region parent
        by_child = by_parent[region_parent]
        feature_groups = [by_child[child] for child in by_child]
    else:
        # Get all region parents/children for a given schema
        feature_groups = [
            by_parent[parent][child]
            for parent in by_parent
            for child in by_parent[parent]
        ]

    geojson_out = deepcopy(GEOJSON_TEMPLATE)
    features_out = geojson_out['features']
    for group in feature_groups:
        for feature in group:
            properties = feature['properties']
            # FIXME! (carried over from the original; it does not say
            # what about this key needs fixing)
            key = properties['region_parent'], properties['region_child']
            if key in case_data:
                # Merge into copies so the shared module-level GeoJSON
                # data is never mutated; otherwise case data from one
                # request would leak into later responses.
                properties = dict(properties)
                properties.update(case_data[key])
                feature = dict(feature)
                feature['properties'] = properties
            features_out.append(feature)
    return dumps(geojson_out)
@get('/geojson_poly_data')
def geojson_poly_data():
    """Return polygon GeoJSON features merged with the latest case data.

    Query parameters:
        schema: key into ``POLY_GEOJSON.DATA`` selecting the schema.
        region_parent: optional; restrict output to this parent region.
        region_child: optional; restrict output to this one child region,
            looked up under ``region_parent``.
        source_id: optional; forwarded to the case-data lookup.

    Returns:
        A JSON-encoded GeoJSON ``FeatureCollection``. Every feature whose
        ``(region_parent, region_child)`` properties form a key present
        in the case data has that case data merged into its properties.

    Raises:
        KeyError: if the schema or a requested region is missing from
            ``POLY_GEOJSON.DATA``.
    """
    schema = request.query.schema
    region_child = request.query.get('region_child')
    region_parent = request.query.get('region_parent')
    source_id = request.query.get('source_id', None)

    # Get most recent figures, choosing the most recently
    # updated if source_id isn't explicitly specified
    case_data = CASE_DATA.get_latest_case_data_singlesource(
        schema, None, source_id, region_parent, region_child
    )

    by_parent = POLY_GEOJSON.DATA[schema]
    if region_child is not None:
        # Get just for a single region child
        feature_groups = [by_parent[region_parent][region_child]]
    elif region_parent is not None:
        # Get all region children for a given region parent. The child
        # keys must come from the polygon data itself: taking them from
        # the point data fails or drops features when the two differ.
        by_child = by_parent[region_parent]
        feature_groups = [by_child[child] for child in by_child]
    else:
        # Get all region parents/children for a given schema
        feature_groups = [
            by_parent[parent][child]
            for parent in by_parent
            for child in by_parent[parent]
        ]

    geojson_out = deepcopy(GEOJSON_TEMPLATE)
    features_out = geojson_out['features']
    for group in feature_groups:
        for feature in group:
            properties = feature['properties']
            # FIXME! (carried over from the original; it does not say
            # what about this key needs fixing)
            key = properties['region_parent'], properties['region_child']
            if key in case_data:
                # Merge into copies so the shared module-level GeoJSON
                # data is never mutated; otherwise case data from one
                # request would leak into later responses.
                properties = dict(properties)
                properties.update(case_data[key])
                feature = dict(feature)
                feature['properties'] = properties
            features_out.append(feature)
    return dumps(geojson_out)
#======================================================================#
# Case Data
#======================================================================#
@get('/case_data_sources')
def case_data_sources():
    """List the case data sources available for the requested schema.

    The schema is read from the ``schema`` query parameter and used to
    index ``CASE_DATA.SOURCES_BY_SCHEMA``; the entry's items are
    returned as a JSON array.
    """
    sources = CASE_DATA.SOURCES_BY_SCHEMA[request.query.schema]
    return dumps([*sources])
@get('/case_datatypes')
def case_datatypes():
    """List the case datatypes available for the requested schema.

    The schema is read from the ``schema`` query parameter and used to
    index ``CASE_DATA.DATATYPES_BY_SCHEMA``; the entry's items are
    returned as a JSON array.
    """
    available = CASE_DATA.DATATYPES_BY_SCHEMA[request.query.schema]
    return dumps([*available])
@get('/case_data_time_series')
def case_data_time_series():
    """Return the case data time series for the queried selection as JSON.

    Query parameters ``schema``, ``datatype``, ``source_id``,
    ``region_parent``, ``region_child`` and ``age_breakdowns`` are
    forwarded, in that order, to ``CASE_DATA.get_time_series``. Absent
    optional parameters are passed as ``None`` (``False`` for
    ``age_breakdowns``).
    """
    query = request.query
    series = CASE_DATA.get_time_series(
        query.schema,
        query.get('datatype', None),
        query.get('source_id', None),
        query.get('region_parent'),
        query.get('region_child'),
        # NOTE(review): when supplied this is the raw query-string text,
        # so "false" or "0" would be truthy unless the callee parses it
        # -- confirm how get_time_series interprets it.
        query.get('age_breakdowns', False),
    )
    return dumps(series)
@get('/latest_case_data')
def latest_case_data():
    """Return only the latest case data for the queried selection as JSON.

    Query parameters ``schema``, ``datatype``, ``source_id``,
    ``region_parent``, ``region_child`` and ``age_breakdowns`` are
    forwarded, in that order, to ``CASE_DATA.get_latest_case_data``.
    Absent optional parameters are passed as ``None`` (``False`` for
    ``age_breakdowns``).
    """
    query = request.query
    latest = CASE_DATA.get_latest_case_data(
        query.schema,
        query.get('datatype', None),
        query.get('source_id', None),
        query.get('region_parent'),
        query.get('region_child'),
        # NOTE(review): when supplied this is the raw query-string text,
        # so "false" or "0" would be truthy unless the callee parses it
        # -- confirm how get_latest_case_data interprets it.
        query.get('age_breakdowns', False),
    )
    return dumps(latest)
if __name__ == '__main__':
    # When executed as a script, start Bottle's built-in server
    # listening on all interfaces (0.0.0.0) at port 8000.
    bottle.run(port=8000, host='0.0.0.0')