-
Notifications
You must be signed in to change notification settings - Fork 67
/
Copy pathdatabase.py
69 lines (56 loc) · 2.17 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
import pymongo
import pandas as pds
import expiringdict
import utils
# Module-wide MongoDB client built with pymongo's default constructor
# arguments; the query functions in this module all go through it.
client = pymongo.MongoClient()

# Obtain the logger through the logging registry. Instantiating
# logging.Logger directly is unsupported (see the logging docs) and produces
# a logger that external logging configuration can never reach.
logger = logging.getLogger(__name__)
# The previously hand-built Logger had no parent, so it (a) never handed
# records to the root logger's handlers and (b) did not inherit the root's
# WARNING threshold. Reproduce both explicitly so log output is unchanged;
# utils.setup_logger below may still override either setting.
logger.propagate = False
logger.setLevel(logging.DEBUG)
# Project helper; presumably attaches a handler writing to 'db.log' — TODO confirm.
utils.setup_logger(logger, 'db.log')

RESULT_CACHE_EXPIRATION = 10  # seconds; lifetime of fetch_all_bpa_as_df's cache entry
def upsert_bpa(df):
    """Upsert each row of ``df`` into the MongoDB collection ``energy.energy``.

    Every row is turned into a document and written with its own
    ``replace_one`` call, filtered on the row's ``'Datetime'`` value with
    ``upsert=True``. A matching document is replaced by the row; when nothing
    matches, the row is inserted as a new document.

    Args:
        df: pandas ``DataFrame`` to write; it must have a ``'Datetime'`` column.

    After all rows are written, logs the row count, how many rows matched an
    existing document ("update"), and how many did not ("insert").
    """
    collection = client.get_database("energy").get_collection("energy")

    replaced = 0
    for doc in df.to_dict('records'):
        outcome = collection.replace_one(
            filter={'Datetime': doc['Datetime']},  # locate an existing document
            replacement=doc,                        # newest version of the row
            upsert=True,                            # insert when nothing matched
        )
        if outcome.matched_count:
            replaced += 1

    total_rows = df.shape[0]
    inserted = total_rows - replaced
    logger.info("rows={}, update={}, ".format(total_rows, replaced) +
                "insert={}".format(inserted))
def fetch_all_bpa():
    """Read every document stored in the MongoDB collection ``energy.energy``.

    Runs an unfiltered, unprojected ``find()``, so each document still carries
    MongoDB's ``_id`` field.

    Returns:
        list of dict: all documents in the collection, in cursor order.
    """
    energy_collection = client.get_database("energy").get_collection("energy")
    documents = [doc for doc in energy_collection.find()]
    message = str(len(documents)) + ' documents read from the db'
    logger.info(message)
    return documents
# Single-slot, time-limited cache for fetch_all_bpa_as_df: holds at most one
# entry, which expires RESULT_CACHE_EXPIRATION seconds after being stored.
_fetch_all_bpa_as_df_cache = expiringdict.ExpiringDict(
    max_len=1, max_age_seconds=RESULT_CACHE_EXPIRATION)


def fetch_all_bpa_as_df(allow_cached=False):
    """Return the documents from ``fetch_all_bpa`` as a DataFrame without ``_id``.

    The query and conversion are done by the nested ``_work`` helper. When
    ``allow_cached`` is True, a result stored in ``_fetch_all_bpa_as_df_cache``
    less than ``RESULT_CACHE_EXPIRATION`` seconds ago is returned directly.
    Otherwise (caching not allowed, nothing stored yet, or the entry expired)
    ``_work`` runs and its result replaces the cached value.

    Args:
        allow_cached: if True, try the time-limited cache before querying
            the database.

    Returns:
        A pandas ``DataFrame`` with one row per document and the ``_id``
        column removed, or ``None`` when the collection is empty. A ``None``
        result is cached like any other result.

    NOTE(review): a cache hit returns the very DataFrame object that was
    stored, so in-place mutation by one caller is visible to later cached
    callers.
    """
    def _work():
        # Query the database and convert the list of dicts into a DataFrame.
        data = fetch_all_bpa()
        if not data:
            return None
        df = pds.DataFrame.from_records(data)
        # Remove the MongoDB document id column; callers get only row data.
        return df.drop(columns='_id')

    if allow_cached:
        try:
            return _fetch_all_bpa_as_df_cache['cache']
        except KeyError:
            # No entry, or it has expired: fall through and recompute.
            pass

    result = _work()
    # Stored unconditionally, so a call with allow_cached=False also refreshes
    # the cache for subsequent cached calls.
    _fetch_all_bpa_as_df_cache['cache'] = result
    return result
if __name__ == '__main__':
    # Manual smoke test: query the database directly (allow_cached=False is
    # the default, spelled out here) and print the DataFrame, or None when the
    # collection is empty.
    result_frame = fetch_all_bpa_as_df(allow_cached=False)
    print(result_frame)