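"""Crawl Redfin search results into a local SQLite database.

The crawler partitions a Redfin search URL into sub-URLs small enough to paginate
(URLS table), scrapes the JSON-LD embedded in each result page through a pool of
proxies (LISTINGS table), and parses the per-property address and price details
(LISTING_DETAILS table).
"""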
import re
import json
import random
import requests
import logging
import time
import pandas as pd
import argparse
from concurrent.futures import ProcessPoolExecutor
from bs4 import BeautifulSoup
import sqlite3

from redfin_filters import apply_filters

LOGGER = None
HEADER = {
    'User-agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)'
                  ' Chrome/49.0.2623.112 Safari/537.36'
}
SQLITE_DB_PATH = 'redfin_scraper_data.db'

def construct_proxy(ip_addr, port, user=None, password=None):
    if user:
        return {
            'http': f'http://{user}:{password}@{ip_addr}:{port}',
            'https': f'https://{user}:{password}@{ip_addr}:{port}',
        }
    return {
        'http': f'http://{ip_addr}:{port}',
        'https': f'https://{ip_addr}:{port}',
    }
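
# Illustrative example (hypothetical values): construct_proxy('10.0.0.1', '3128', 'alice', 'secret')
# returns {'http': 'http://alice:secret@10.0.0.1:3128',
#          'https': 'https://alice:secret@10.0.0.1:3128'},
# i.e. the proxies mapping that requests' Session.get(..., proxies=...) expects.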

def create_tables_if_not_exist():
    """Create the URLS, LISTINGS and LISTING_DETAILS tables on first run."""
    conn = sqlite3.connect(SQLITE_DB_PATH)
    conn.execute('''CREATE TABLE IF NOT EXISTS URLS
        (
        URL TEXT NOT NULL,
        NUM_PROPERTIES INT,
        NUM_PAGES INT,
        PER_PAGE_PROPERTIES INT);''')
    conn.execute('''CREATE TABLE IF NOT EXISTS LISTINGS
        (
        URL TEXT NOT NULL,
        INFO TEXT);''')
    conn.execute('''CREATE TABLE IF NOT EXISTS LISTING_DETAILS
        (
        URL TEXT NOT NULL,
        NUMBER_OF_ROOMS INT,
        NAME TEXT,
        COUNTRY TEXT,
        REGION TEXT,
        LOCALITY TEXT,
        STREET TEXT,
        POSTAL TEXT,
        TYPE TEXT,
        PRICE REAL
        );''')
    conn.close()

def get_page_info(url_and_proxy):
    """Return property count, page count and per-page property count for a given URL."""
    url, proxy = url_and_proxy
    # Randomized delay to avoid hammering the site from many workers at once.
    time.sleep(random.random() * 10)
    session = requests.Session()
    total_properties, num_pages, properties_per_page = None, None, None
    try:
        resp = session.get(url, headers=HEADER, proxies=proxy)
        resp.raise_for_status()
        if resp.status_code == 200:
            bf = BeautifulSoup(resp.text, 'lxml')
            page_description_div = bf.find('div', {'class': 'homes summary'})
            if not page_description_div:
                # The page has nothing!
                return (url, 0, 0, 20)
            page_description = page_description_div.get_text()
            if 'of' in page_description:
                # e.g. "Showing 20 of 350 Homes ..."
                property_cnt_pattern = r'Showing ([0-9]+) of ([0-9]+) .*'
                m = re.match(property_cnt_pattern, page_description)
                if m:
                    properties_per_page = int(m.group(1))
                    total_properties = int(m.group(2))
                    pages = [int(x.get_text()) for x in bf.find_all('a', {'class': "goToPage"})]
                    num_pages = max(pages) if pages else 1
            else:
                # e.g. "Showing 7 Homes ..." -- a single page of results.
                property_cnt_pattern = r'Showing ([0-9]+) .*'
                m = re.match(property_cnt_pattern, page_description)
                if m:
                    properties_per_page = int(m.group(1))
                    num_pages = 1
    except Exception as e:
        LOGGER.exception('Swallowing exception {} on url {}'.format(e, url))
    return (url, total_properties, num_pages, properties_per_page)
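
# For illustration (numbers are hypothetical): a results page whose summary reads
# "Showing 20 of 350 Homes" and whose pagination links go up to 18 yields
# (url, 350, 18, 20); a single page reading "Showing 7 Homes" yields (url, None, 1, 7).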

def url_partition(base_url, proxies, max_levels=6):
    """Partition the listings for a given url into multiple sub-urls,
    such that each url contains at most 20 properties.
    """
    urls = [base_url]
    num_levels = 0
    partitioned_urls = []
    while urls and (num_levels < max_levels):
        rand_move = random.randint(0, len(proxies) - 1)
        partition_inputs = []
        for i, url in enumerate(urls):
            proxy = construct_proxy(*proxies[(rand_move + i) % len(proxies)])
            LOGGER.debug(f"scraping url {url} with proxy {proxy}")
            partition_inputs.append((url, proxy))
        scraper_results = []
        with ProcessPoolExecutor(max_workers=min(50, len(partition_inputs))) as executor:
            scraper_results = list(executor.map(get_page_info, partition_inputs))
        LOGGER.info('Got {} results'.format(len(scraper_results)))
        with sqlite3.connect(SQLITE_DB_PATH) as db:
            LOGGER.info('stage {} saving to db!'.format(num_levels))
            values = []
            for result in scraper_results:
                # Keep zeros; only replace missing values with SQL NULL.
                to_nulls = [x if x is not None else 'NULL' for x in result]
                values.append("('{}', {}, {}, {})".format(*to_nulls))
            cursor = db.cursor()
            cursor.execute("""
                INSERT INTO URLS (URL, NUM_PROPERTIES, NUM_PAGES, PER_PAGE_PROPERTIES)
                VALUES {};
                """.format(','.join(values)))
            LOGGER.info('Writing to sqlite {} results'.format(len(scraper_results)))
        new_urls = []
        for result in scraper_results:
            if (result[1] and result[2] and result[3] and result[1] > result[2] * result[3]) or (num_levels == 0):
                # More properties than the pagination can show: split the URL further.
                expanded_urls = apply_filters(result[0], base_url)
                if len(expanded_urls) == 1 and expanded_urls[0] == result[0]:
                    LOGGER.info('Cannot further split {}'.format(result[0]))
                else:
                    new_urls.extend(expanded_urls)
            else:
                partitioned_urls.append(result)
        LOGGER.info('stage {}: running for {} urls. We already captured {} urls'.format(
            num_levels, len(new_urls), len(partitioned_urls)))
        urls = new_urls
        num_levels += 1
        time.sleep(random.randint(2, 5))
    return partitioned_urls
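
# Rough sketch of the loop above: any URL reporting more properties than
# NUM_PAGES * PER_PAGE_PROPERTIES cannot be fully paginated, so it is re-expanded via
# apply_filters() (from redfin_filters; presumably narrower search filters) until every
# remaining URL fits inside its own pagination or max_levels is exhausted.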

def parse_addresses():
    """Parse the JSON-LD stored in LISTINGS into per-property rows in LISTING_DETAILS."""
    listing_details = {}
    with sqlite3.connect(SQLITE_DB_PATH) as db:
        cur = db.cursor()
        cur.execute("SELECT * FROM listings")
        rows = cur.fetchall()
    urls = set()
    for url, json_details in rows:
        if url in urls:
            continue
        urls.add(url)
        listings_on_page = json.loads(json_details)
        for listing in listings_on_page:
            # print('listing {}'.format(listing))
            num_rooms, name, country, region, locality, street, postal, house_type, price = \
                None, None, None, None, None, None, None, None, None
            listing_url = None
            if (not isinstance(listing, list)) and (not isinstance(listing, dict)):
                continue
            if isinstance(listing, dict):
                info = listing
                if ('url' in info) and ('address' in info):
                    listing_url = info.get('url')
                    address_details = info['address']
                    num_rooms = info.get('numberOfRooms')
                    name = info.get('name')
                    country = address_details.get('addressCountry')
                    region = address_details.get('addressRegion')
                    locality = address_details.get('addressLocality')
                    street = address_details.get('streetAddress')
                    postal = address_details.get('postalCode')
                    house_type = info.get('@type')
                    if 'offers' in info:
                        # Mirror the list branch so dict-shaped listings also capture a price.
                        price = info['offers'].get('price')
                    listing_details[listing_url] = (listing_url, num_rooms, name, country,
                                                    region, locality, street, postal, house_type, price)
                continue
            for info in listing:
                if ('url' in info) and ('address' in info):
                    listing_url = info.get('url')
                    address_details = info['address']
                    num_rooms = info.get('numberOfRooms')
                    name = info.get('name')
                    country = address_details.get('addressCountry')
                    region = address_details.get('addressRegion')
                    locality = address_details.get('addressLocality')
                    street = address_details.get('streetAddress')
                    postal = address_details.get('postalCode')
                    house_type = info.get('@type')
                if 'offers' in info:
                    price = info['offers'].get('price')
            if listing_url:
                listing_details[listing_url] = (listing_url, num_rooms, name, country,
                                                region, locality, street, postal, house_type, price)
    # print(listing_details)
    with sqlite3.connect(SQLITE_DB_PATH) as db:
        cursor = db.cursor()
        try:
            cursor.executemany("""
                INSERT INTO LISTING_DETAILS (
                    URL,
                    NUMBER_OF_ROOMS,
                    NAME,
                    COUNTRY,
                    REGION,
                    LOCALITY,
                    STREET,
                    POSTAL,
                    TYPE,
                    PRICE)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """, listing_details.values())
        except Exception as e:
            LOGGER.info(e)

def scrape_page(url_proxy):
    """Fetch one paginated search page and return (url, JSON dump of its ld+json blocks)."""
    # Unpack outside the try block so `url` and `proxy` are defined for logging and the return.
    url, proxy = url_proxy
    time.sleep(random.random() * 16)
    details = []
    try:
        session = requests.Session()
        resp = session.get(url, headers=HEADER, proxies=proxy)
        bf = BeautifulSoup(resp.text, 'lxml')
        details = [json.loads(x.text) for x in bf.find_all('script', type='application/ld+json')]
    except Exception:
        LOGGER.exception('failed for url {}, proxy {}'.format(url, proxy))
    return url, json.dumps(details)
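
# The scraped pages embed schema.org JSON-LD; one entry consumed by parse_addresses()
# looks roughly like the following (values are hypothetical and the real payload may differ):
# {"@type": "SingleFamilyResidence", "url": "https://www.redfin.com/CA/...", "numberOfRooms": 3,
#  "name": "...", "address": {"addressCountry": "US", "addressRegion": "CA",
#  "addressLocality": "Los Angeles", "streetAddress": "...", "postalCode": "90001"},
#  "offers": {"price": 750000.0}}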

def get_paginated_urls(prefix):
    """Return a set of paginated urls with at most 20 properties each."""
    paginated_urls = []
    with sqlite3.connect(SQLITE_DB_PATH) as db:
        cursor = db.execute("""
            SELECT URL, NUM_PROPERTIES, NUM_PAGES, PER_PAGE_PROPERTIES
            FROM URLS
            """)
        seen_urls = set()
        for row in cursor:
            url, num_properties, num_pages, per_page_properties = row
            if prefix and (prefix not in url):
                continue
            if url in seen_urls:
                continue
            seen_urls.add(url)
            if num_properties == 0:
                continue
            urls = []
            # print(num_properties, num_pages, per_page_properties, url)
            if not num_pages:
                urls = [url]
            elif (not num_properties) and int(num_pages) == 1 and per_page_properties:
                urls = ['{},sort=lo-price/page-1'.format(url)]
            elif num_properties < num_pages * per_page_properties:
                # Build per page urls.
                # print('num pages {}'.format(num_pages))
                urls = ['{},sort=lo-price/page-{}'.format(url, p) for p in range(1, num_pages + 1)]
            paginated_urls.extend(urls)
    return list(set(paginated_urls))
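
# Example of the page URLs this produces (search path is illustrative):
#   https://www.redfin.com/city/11203/CA/Los-Angeles/filter/max-price=500k,sort=lo-price/page-3
# i.e. each stored search URL gets a ',sort=lo-price/page-N' suffix for N = 1..NUM_PAGES.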

def crawl_redfin_with_proxies(proxies, prefix=''):
    """Scrape every paginated url (optionally filtered by prefix) and store the raw JSON-LD in LISTINGS."""
    small_urls = get_paginated_urls(prefix)
    rand_move = random.randint(0, len(proxies) - 1)
    scrape_inputs, scraper_results = [], []
    for i, url in enumerate(small_urls):
        proxy = construct_proxy(*proxies[(rand_move + i) % len(proxies)])
        scrape_inputs.append((url, proxy))
    with ProcessPoolExecutor(max_workers=min(50, len(scrape_inputs))) as executor:
        scraper_results = list(executor.map(scrape_page, scrape_inputs))
    LOGGER.warning('Finished scraping!')
    with sqlite3.connect(SQLITE_DB_PATH) as db:
        cursor = db.cursor()
        for result in scraper_results:
            url, info = result
            try:
                cursor.execute("""
                    INSERT INTO LISTINGS (URL, INFO)
                    VALUES (?, ?)""", (url, info))
            except Exception as e:
                LOGGER.info('failed record: {}'.format(result))
                LOGGER.info(e)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Scrape Redfin property data.')
    parser.add_argument(
        'proxy_csv_path',
        help='Proxies csv path. '
             'It should contain ip_addr,port,user,password columns if using proxies with auth, '
             'or just ip_addr,port columns if no auth is needed.'
    )
    parser.add_argument(
        'redfin_base_url',
        help='Redfin base url to specify the crawling location, '
             'e.g., https://www.redfin.com/city/11203/CA/Los-Angeles/'
    )
    parser.add_argument('--type', default='pages',
                        choices=['properties', 'pages', 'property_details', 'filtered_properties'],
                        help='What to crawl: pages, properties, property_details or filtered_properties '
                             '(default: pages)')
    parser.add_argument('--property_prefix', default='',
                        help='The property prefix for crawling')
    parser.add_argument('--partition_levels',
                        help='Determines the partition depth; the higher, the more properties are scraped.',
                        type=int,
                        default=12)
    parser.add_argument('--logging_level', default='info', choices=['info', 'debug'])
    args = parser.parse_args()

    if args.logging_level == 'info':
        logging.basicConfig(level=logging.INFO)
    elif args.logging_level == 'debug':
        logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger(__name__)

    create_tables_if_not_exist()
    redfin_base_url = args.redfin_base_url
    if redfin_base_url[-1] != '/':
        redfin_base_url += '/'
    proxies = pd.read_csv(args.proxy_csv_path, encoding='utf-8').values

    if args.type == 'pages':
        url_partition(redfin_base_url, proxies, max_levels=args.partition_levels)
    elif args.type == 'properties':
        url_partition(redfin_base_url, proxies, max_levels=args.partition_levels)
        crawl_redfin_with_proxies(proxies)
        parse_addresses()
    elif args.type == 'property_details':
        parse_addresses()
    elif args.type == 'filtered_properties':
        crawl_redfin_with_proxies(proxies, args.property_prefix)
    else:
        raise Exception('Unknown type {}'.format(args.type))
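
# Example invocation (proxies.csv is a hypothetical file with ip_addr,port[,user,password] rows):
#   python redfin_crawler.py proxies.csv https://www.redfin.com/city/11203/CA/Los-Angeles/ \
#       --type properties --partition_levels 6 --logging_level debug
# --type 'pages' only builds the URLS table; 'properties' additionally fills LISTINGS and
# LISTING_DETAILS; 'property_details' re-parses existing LISTINGS; 'filtered_properties'
# re-scrapes only the stored URLs matching --property_prefix.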