-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathproxy.py
executable file
·163 lines (132 loc) · 4.92 KB
/
proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import asyncio
import aiohttp
import msgpack
import traceback
class ProtocolError(Exception):
    """Error in a client request, e.g. an unknown or malformed command.

    The human-readable message is available both as ``msg`` and through the
    normal exception machinery (``args`` / ``str()``).

    Args:
        msg: Description of what was wrong with the request.
    """

    def __init__(self, msg):
        # Forward the message to Exception explicitly: without this, a
        # keyword-style construction (ProtocolError(msg=...)) leaves ``args``
        # empty and ``str(exc)`` returns an empty string.
        super().__init__(msg)
        self.msg = msg
class Protocol:
    """Translate text commands from one client into msgpack HTTP calls.

    An instance carries the state of a single client conversation: the HTTP
    session used for upstream calls, the index being addressed, and the list
    of changes buffered by ``insert`` until ``commit`` sends them.
    """

    def __init__(self, session, index_name="main", index_url="http://localhost:8080"):
        """Initialise the per-client state.

        Args:
            session: HTTP client whose ``get``/``post`` calls are used as
                async context managers (an aiohttp client session).
            index_name: Name of the index, used as the URL path segment.
            index_url: Base URL of the index service, without trailing slash.
        """
        self.session = session
        # Declared here so the object is usable on its own; callers may still
        # overwrite both attributes after construction.
        self.index_name = index_name
        self.index_url = index_url
        self.changes = []

    @staticmethod
    def _msgpack_headers():
        """Return fresh headers declaring msgpack for request and response."""
        return {
            "Content-Type": "application/vnd.msgpack",
            "Accept": "application/vnd.msgpack",
        }

    @staticmethod
    def _parse_int(text, what):
        """Parse *text* as an int, raising ProtocolError naming *what* on failure."""
        try:
            return int(text)
        except ValueError:
            raise ProtocolError(f"invalid {what}") from None

    @classmethod
    def _parse_int_list(cls, text, what):
        """Parse a comma-separated list of ints, raising ProtocolError on failure."""
        return [cls._parse_int(item, what) for item in text.split(",")]

    async def search(self, query):
        """POST *query* to the index ``_search`` endpoint.

        Args:
            query: List of integers sent under the ``"q"`` key.

        Returns:
            A list of ``(r["i"], r["s"])`` tuples, one per entry of the
            response's ``"r"`` list.

        Raises:
            An HTTP client error if the response status indicates failure.
        """
        url = self.index_url + f"/{self.index_name}/_search"
        data = msgpack.dumps({"q": query})
        async with self.session.post(
            url, data=data, headers=self._msgpack_headers()
        ) as resp:
            resp.raise_for_status()
            body = msgpack.loads(await resp.content.read())
            return [(r["i"], r["s"]) for r in body["r"]]

    async def update(self, changes):
        """POST a list of change records to the index ``_update`` endpoint.

        Args:
            changes: List of change dicts sent under the ``"c"`` key.

        Raises:
            An HTTP client error if the response status indicates failure.
        """
        url = self.index_url + f"/{self.index_name}/_update"
        data = msgpack.dumps({"c": changes})
        async with self.session.post(
            url, data=data, headers=self._msgpack_headers()
        ) as resp:
            # The body is consumed before the status check, as before; its
            # content is not used.
            await resp.content.read()
            resp.raise_for_status()

    async def get_attribute(self, name):
        """Fetch the index description and return attribute *name*.

        Returns:
            The value stored under ``body["a"][name]``, or ``0`` when the
            attribute is absent.
        """
        url = self.index_url + f"/{self.index_name}"
        async with self.session.get(url, headers=self._msgpack_headers()) as resp:
            resp.raise_for_status()
            body = msgpack.loads(await resp.content.read())
            return body["a"].get(name, 0)

    async def set_attribute(self, name, value):
        """Set attribute *name* to *value* via a single-change update."""
        changes = [
            {
                "s": {"n": name, "v": value},
            },
        ]
        await self.update(changes)

    async def handle_request(self, request):
        """Execute one tokenised command and return the reply payload.

        Supported commands:
            ``search <int,int,...>``            -> ``"id:score id:score ..."``
            ``begin`` / ``rollback``            -> discard buffered changes
            ``commit``                          -> send buffered changes
            ``insert <id> <int,int,...>``       -> buffer an insert
            ``get [attribute] <name>``          -> attribute value as text
            ``set [attribute] <name> <int>``    -> set an attribute

        Args:
            request: The command split on whitespace (list of strings).

        Returns:
            The text to send back after ``OK`` (possibly empty).

        Raises:
            ProtocolError: For an empty or unknown command, a wrong number of
                arguments, or arguments that are not valid integers.
        """
        if not request:
            raise ProtocolError("invalid command")
        command, args = request[0], request[1:]

        if command == "search":
            if not args:
                raise ProtocolError("missing search query")
            query = self._parse_int_list(args[0], "search query")
            results = await self.search(query)
            return " ".join(f"{docid}:{hits}" for (docid, hits) in results)

        if command in ("begin", "rollback"):
            self.changes = []
            return ""

        if command == "commit":
            # The buffer is cleared only after a successful update, so a
            # failed commit keeps the pending changes.
            await self.update(self.changes)
            self.changes = []
            return ""

        if command == "insert":
            if len(args) < 2:
                raise ProtocolError("missing insert arguments")
            self.changes.append(
                {
                    "i": {
                        "i": self._parse_int(args[0], "insert id"),
                        "h": self._parse_int_list(args[1], "insert hashes"),
                    },
                }
            )
            return ""

        if command == "get":
            if len(args) == 2 and args[0] == "attribute":
                return str(await self.get_attribute(args[1]))
            if len(args) == 1:
                return str(await self.get_attribute(args[0]))

        if command == "set":
            if len(args) == 3 and args[0] == "attribute":
                name, raw_value = args[1], args[2]
            elif len(args) == 2:
                name, raw_value = args
            else:
                raise ProtocolError("invalid command")
            await self.set_attribute(name, self._parse_int(raw_value, "attribute value"))
            return ""

        raise ProtocolError("invalid command")
class Server:
    """TCP front end that serves the line-based text protocol.

    Each accepted connection gets its own ``Protocol`` instance, so buffered
    changes are per client, while all connections share one HTTP session.
    """

    def __init__(
        self,
        host="127.0.0.1",
        port=6080,
        *,
        index_name="main",
        index_url="http://localhost:8080",
    ):
        """Configure the listener and the upstream index.

        Args:
            host: Interface to listen on.
            port: TCP port to listen on.
            index_name: Name of the index handed to each ``Protocol``.
            index_url: Base URL of the index service handed to each ``Protocol``.
        """
        self.host = host
        self.port = port
        self.index_name = index_name
        self.index_url = index_url
        # Set by run(); connections are only accepted while it is open.
        self.session = None

    async def run(self):
        """Open the shared HTTP session and serve connections until cancelled."""
        async with aiohttp.ClientSession() as session:
            self.session = session
            server = await asyncio.start_server(
                self.handle_connection, self.host, self.port
            )
            async with server:
                await server.serve_forever()

    async def _respond(self, proto, line):
        """Return the reply bytes for one raw request line."""
        try:
            request = line.decode("ascii").split()
        except UnicodeDecodeError:
            # Non-ASCII input is a client mistake, not a server fault.
            return b"ERR invalid command\n"
        try:
            response = await proto.handle_request(request)
            return b"OK " + response.encode("ascii") + b"\n"
        except ProtocolError as ex:
            return b"ERR " + ex.msg.encode("ascii") + b"\n"
        except Exception:
            # Boundary handler: log the failure and keep the connection alive.
            traceback.print_exc()
            return b"ERR internal error\n"

    async def handle_connection(self, reader, writer):
        """Serve one client: read newline-terminated commands, write replies.

        Every request is answered with a single line starting with ``OK`` or
        ``ERR``. The loop ends when the client closes the connection, sends a
        line longer than the stream reader's limit, or the connection breaks.

        Args:
            reader: Stream to read request lines from.
            writer: Stream to write reply lines to; always closed on exit.
        """
        try:
            proto = Protocol(self.session)
            proto.index_name = self.index_name
            proto.index_url = self.index_url
            while True:
                try:
                    line = await reader.readuntil(b"\n")
                except asyncio.IncompleteReadError:
                    # EOF before a full line: the client is done.
                    return
                except asyncio.LimitOverrunError:
                    # The oversized data is still buffered, so the stream
                    # cannot be resynchronised; report and hang up.
                    writer.write(b"ERR request too long\n")
                    await writer.drain()
                    return
                writer.write(await self._respond(proto, line))
                await writer.drain()
        except ConnectionError:
            # The peer vanished mid-exchange; there is nobody left to answer.
            pass
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                # Closing an already-broken connection is not an error here.
                pass
async def main():
    """Build a Server with its default settings and run it."""
    await Server().run()
if __name__ == "__main__":
    # Executed as a script: drive main() on a fresh event loop.
    asyncio.run(main())