-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscrape_data.py
172 lines (123 loc) · 5.5 KB
/
scrape_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Script to scrape payment card spend data over £500 from gov.scot"""
import concurrent.futures
import os
import urllib.parse
from datetime import datetime
import pandas as pd
import requests
from bs4 import BeautifulSoup
MONTHLY_SPEND_URL = (
"https://www.gov.scot/collections/government-spend-over-gbp500-monthly-reports/"
)
TIMEOUT = 30
SAVE_PATH = "_data/spendsOver500"
COLUMN_HEADERS_ALIASES = {
"Merchant Category Description": "Merchant Category Name",
"Merchant Category Description": "Merchant Category Name",
"Mercant Category Name": "Merchant Category Name",
"Merchant Code Description": "Merchant Category Name",
"Category Code Description": "Merchant Category Name",
"Merchant Description": "Merchant Category Name",
"Financial Transaction Date": "Transaction Date",
"Transaction Amount (£)": "Transaction Amount",
"Financial Transaction Amount (£)": "Transaction Amount",
"Description": "Expense Description",
"Description To Be Published": "Expense Description",
"Final Description": "Expense Description",
"Final Expense Description": "Expense Description",
}
temp_headers = []
def clean_currency(currency_value, temp):
"""Takes currency strings and turns them into floats with 2 decimal places"""
if not isinstance(currency_value, str):
return round(currency_value, 2)
currency_value = currency_value.replace(",","")
currency_value = currency_value.replace("£","")
try:
currency_value = round(float(currency_value), 2)
except:
print(temp)
print(currency_value)
return currency_value
def clean_report(report_dataframe, temp):
"""Cleans up any inconsistencies in the table"""
# Title case all column names for consistency
report_dataframe.columns = map(str.title, report_dataframe.columns)
# Director General only appears in May 2016 so we may as well remove it
if "Director General" in report_dataframe.columns:
report_dataframe = report_dataframe.drop(["Director General"], axis=1)
# Fix column aliases
report_dataframe = report_dataframe.rename(columns=COLUMN_HEADERS_ALIASES)
# Remove duplicate header rows
report_dataframe = report_dataframe[report_dataframe.iloc[:, 0] != report_dataframe.columns[0]]
# Remove £ and comma from transaction amounts
report_dataframe["Transaction Amount"] = report_dataframe[
"Transaction Amount"
].apply(lambda x: clean_currency(x, temp))
# Drop blank unnamed columns
report_dataframe = report_dataframe.loc[
:, ~report_dataframe.columns.str.contains("^Unnamed")
]
# Finally, check for missing columns and if so, add them
# January 2020 doesn't have Merchant Category Name
if "Merchant Category Name" not in report_dataframe.columns:
report_dataframe.insert(
loc=2, column="Merchant Category Name", value="Not specified in data source"
)
# December 2019 doesn't have Expense Description
if "Expense Description" not in report_dataframe.columns:
report_dataframe.insert(
loc=5, column="Expense Description", value="Not specified in data source"
)
return report_dataframe
def extract_report(tag_href, tag_text):
"""Extracts the month's spending report from the given href URL and saves it to a JSON file"""
report_url = tag_href
split_tag_text = tag_text.split()
report_year_month_array = split_tag_text[-2:]
report_title = " ".join(report_year_month_array)
report_month_name = report_year_month_array[0]
report_month_number = datetime.strptime(report_month_name, "%B").strftime("%m")
report_year_number = report_year_month_array[1]
report_year_month = f"{report_year_number}-{report_month_number}"
print(f"Accessing spend for {report_title} at {report_url}...")
report_url = urllib.parse.urljoin(MONTHLY_SPEND_URL, report_url)
report_html = requests.get(report_url, timeout=TIMEOUT)
report_soup = BeautifulSoup(report_html.content, "html.parser")
report_table = report_soup.find("table")
report_dataframe = pd.read_html(str(report_table), header=0, encoding="utf8")[0]
report_dataframe = clean_report(report_dataframe, report_title)
report_dataframe.to_json(
f"{SAVE_PATH}/{report_year_month}.json",
orient="records",
date_format="iso",
indent=4,
force_ascii=False,
)
temp_headers.append([report_year_month] + report_dataframe.columns.tolist())
return f"Completed {report_title} at {report_url}"
def main():
"""Main method"""
# Pre-check if the data folder exists and if not, create it
if not os.path.exists(SAVE_PATH):
os.makedirs(SAVE_PATH)
index_page_html = requests.get(MONTHLY_SPEND_URL, timeout=TIMEOUT)
index_soup = BeautifulSoup(index_page_html.content, "html.parser")
report_url_tags = index_soup.find_all(
lambda tag: tag.name == "a"
and tag.text != "Spend over £500: archive (pre - May 2016)"
and tag.text.startswith("Spend over £500")
)
print(f"Got {len(report_url_tags)} tags\n")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(extract_report, tag["href"], tag.text)
for tag in report_url_tags
]
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(result)
headers_df = pd.DataFrame(temp_headers)
headers_df.to_csv("headers.csv", index=False)
if __name__ == "__main__":
main()