-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathappserver.py
128 lines (105 loc) · 3.72 KB
/
appserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from flask import Flask
from dotenv import load_dotenv, find_dotenv
from flask import Response
import threading
from fauna import fql
from fauna.client import Client, StreamOptions
from fauna.errors import FaunaException
from fauna.query.models import StreamToken
import os
# Load variables from a .env file (located via find_dotenv) into the process
# environment; FAUNA_SECRET and FAUNA_ENDPOINT are read later with os.getenv.
load_dotenv(find_dotenv())
# Token of the current Fauna stream. Written by do_stream and stored with
# every health check ping by do_health_check; None until a token is known.
stream_token = None
# The single background thread running do_stream; None until start_stream
# has created it.
thread = None
def do_stream():
    """Open a Fauna event stream and print every event it delivers.

    Runs as the target of the background thread created by ``start_stream``.
    The function first asks Fauna for the most recent health check ping. If
    one exists, its timestamp (in microseconds) and stored stream token are
    used to resume the previous stream; otherwise a new stream token is
    obtained from the stream query below. The module-level ``stream_token``
    is updated so that ``do_health_check`` can persist it.

    Because a thread target's return value is discarded, every failure is
    printed as well as returned.

    Returns:
        An error message string if a Fauna query fails before the stream is
        opened; ``None`` once the stream has been consumed.
    """
    global stream_token

    stream_client = Client(secret=os.getenv("FAUNA_SECRET"), endpoint=os.getenv("FAUNA_ENDPOINT"))

    # If the app server restarted, we'll get the last timestamp of a healthcheck ping, and the original stream_token
    # (The FQL text is kept flush-left so the string literal is unchanged.)
    try:
        res = stream_client.query(
            fql("""
let latest = healthcheck_ts.last_ping().first()
if (latest != null) {
{
start_ts: latest!.ping.toMicros(),
stream_token: latest!.stream_token
}
} else {
null
}
""")
        )
    except FaunaException as err:
        # Handle a failed lookup the same way as a failed token query below,
        # instead of letting the exception escape the thread.
        message = "Unable to look up the last health check ping. ERR: {}".format(err)
        print(message)
        return message

    # res.data is None when no ping has ever been recorded.
    start_ts = int(res.data["start_ts"]) if res.data is not None else None
    stream_token = res.data["stream_token"] if res.data is not None else None
    # NOTE(review): a ping recorded while stream_token was still None yields a
    # start_ts together with a null token, so the ping's start_ts is applied
    # to the freshly created stream below - confirm this is intended.

    if stream_token is None:
        try:
            #----------------------------------#
            # your stream query here
            #----------------------------------#
            q = fql("""
let s = Foo.where(.foo == "bar") { id, foo }
s.toStream()
""")
            res = stream_client.query(q)
            stream_token = res.data.token
        except FaunaException as err:
            message = "Unable to obtain a stream token. ERR: {}".format(err)
            # Nobody receives the return value of a thread target, so surface
            # the failure on stdout as well.
            print(message)
            return message

    opts = StreamOptions(max_attempts=5, max_backoff=30, start_ts=start_ts)

    with stream_client.stream(StreamToken(stream_token), opts) as stream:
        for event in stream:
            # Handle errors per event so one bad event does not end the loop.
            try:
                #--------------------------------------------------------#
                # send these events to SNS, SQS, where you want them, etc.
                #--------------------------------------------------------#
                print(event)
                # access fields on the document
                # print(event["data"].ts) # fauna provided top level fields
                # print(event["data"].id)
                # print(event["data"]["name"]) # user provided fields
            except FaunaException as e:
                print(e)
            except Exception as e:
                print("EXCEPTION {}".format(e))
def start_stream():
    """Start the background stream thread unless one was already created.

    Returns:
        "Stream thread started" when a new thread running ``do_stream`` was
        launched, "Already started" when the module-level ``thread`` is
        already set, or the text of any exception raised while starting.
    """
    global thread
    try:
        # Guard clause: only one stream thread is ever created per process.
        if thread is not None:
            return "Already started"
        thread = threading.Thread(target=do_stream)
        thread.start()
        return "Stream thread started"
    except Exception as error:
        return str(error)
def do_health_check():
    """Record a health check ping in Fauna together with the current stream token.

    Creates a ``healthcheck_ts`` document holding the current time and the
    module-level ``stream_token``; ``do_stream`` reads the latest such
    document when it starts.

    Returns:
        ``Response("OK")`` on success, or a status-400 ``Response`` carrying
        the error text when Fauna raises a ``FaunaException``.
    """
    global stream_token

    print("checking health")
    try:
        fauna_client = Client(secret=os.getenv("FAUNA_SECRET"), endpoint=os.getenv("FAUNA_ENDPOINT"))
        # The FQL text is kept flush-left so the string literal is unchanged.
        ping_query = fql("""
healthcheck_ts.create({
ping: Time.now(),
stream_token: ${stream_token}
})
""",
            stream_token=stream_token
        )
        fauna_client.query(ping_query)
    except FaunaException as err:
        print(err)
        return Response("{}".format(err), status=400)

    return Response("OK")
def create_app():
    """Build the Flask application and register its ``/health`` route.

    Returns:
        The configured ``Flask`` instance.
    """
    flask_app = Flask(__name__)

    def health_checker():
        """Run ``do_health_check`` on a new thread and reply "ok" immediately."""
        worker = threading.Thread(target=do_health_check)
        worker.start()
        return "ok"

    # Equivalent to decorating health_checker with @route("/health"): the
    # endpoint name defaults to the view function's name.
    flask_app.add_url_rule("/health", view_func=health_checker)
    return flask_app
# Entry point: in both cases the app is built and the stream thread started.
if __name__ != '__main__':
    # Imported as a module: expose the app under ``gunicorn_app``
    # (presumably the name a gunicorn command line points at - confirm).
    gunicorn_app = create_app()
    start_stream()
else:
    # Executed directly: serve with Flask's built-in server.
    app = create_app()
    start_stream()
    app.run(threaded=True)