-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdance4fun.py
67 lines (51 loc) · 1.94 KB
/
dance4fun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import concurrent.futures
from datetime import datetime
import requests
from event import DanceEvent
def parse_datetimes(date: str, time: str) -> datetime:
return datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
def download_dance4fun_page(page: int) -> list[DanceEvent]:
url = f"https://retro.danceforfun.at/termine.php?page={page}"
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
events = []
if not data.get("data"):
return events
for event in data.get("data", []):
if event.get("kurstypname") != "Perfektion":
continue
if "tag" not in data:
continue
starts_at = parse_datetimes(data["tag"], event["von"])
ends_at = parse_datetimes(data["tag"], event["bis"])
events.append(
DanceEvent(
starts_at=starts_at,
ends_at=ends_at,
name=f"{event['kursname']}",
price_euro_cent=350, # static price since it is not provided in the API FIXME
description=event.get("inhalte", ""),
dancing_school="Dance4Fun",
website="https://danceforfun.at/termine/",
)
)
return events
def download_dance4fun() -> list[DanceEvent]:
max_pages = 100 # Adjust this value if we want less events FIXME
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_page = {
executor.submit(download_dance4fun_page, page): page
for page in range(max_pages)
}
all_events = []
for future in concurrent.futures.as_completed(future_to_page):
page = future_to_page[future]
try:
events = future.result()
all_events.extend(events)
if page >= max_pages - 1:
break
except Exception:
pass
return all_events