Skip to content

Commit 2a2ae23

Browse files
committedDec 6, 2024
Add proxy emulating the legacy protocol
1 parent 2e8a5fd commit 2a2ae23

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed
 

‎proxy/Dockerfile

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
FROM python:3
2+
3+
RUN pip install --no-cache-dir aiohttp
4+
5+
ADD proxy.py /usr/src/fpindex-proxy.py
6+
7+
CMD [ "python", "/usr/src/fpindex-proxy.py" ]

‎proxy/proxy.py

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import asyncio
2+
import aiohttp
3+
import msgpack
4+
import traceback
5+
6+
7+
class ProtocolError(Exception):
8+
9+
def __init__(self, msg):
10+
self.msg = msg
11+
12+
13+
class Protocol:
14+
15+
def __init__(self, session):
16+
self.session = session
17+
self.changes = []
18+
19+
async def search(self, query):
20+
url = self.index_url + f"/{self.index_name}/_search"
21+
data = msgpack.dumps({"q": query})
22+
headers = {
23+
"Content-Type": "application/vnd.msgpack",
24+
"Accept": "application/vnd.msgpack",
25+
}
26+
async with self.session.post(url, data=data, headers=headers) as resp:
27+
resp.raise_for_status()
28+
body = msgpack.loads(await resp.content.read())
29+
return [(r["i"], r["s"]) for r in body["r"]]
30+
31+
async def update(self, changes):
32+
url = self.index_url + f"/{self.index_name}/_update"
33+
data = msgpack.dumps({"c": changes})
34+
headers = {
35+
"Content-Type": "application/vnd.msgpack",
36+
"Accept": "application/vnd.msgpack",
37+
}
38+
async with self.session.post(url, data=data, headers=headers) as resp:
39+
body = await resp.content.read()
40+
resp.raise_for_status()
41+
42+
async def handle_request(self, request):
43+
if not request:
44+
raise ProtocolError("invalid command")
45+
46+
if request[0] == "search":
47+
query = list(map(int, request[1].split(",")))
48+
results = await self.search(query)
49+
return " ".join(f"{docid}:{hits}" for (docid, hits) in results)
50+
51+
if request[0] == "begin":
52+
self.changes = []
53+
return ""
54+
55+
if request[0] == "rollback":
56+
self.changes = []
57+
return ""
58+
59+
if request[0] == "commit":
60+
await self.update(self.changes)
61+
self.changes = []
62+
return ""
63+
64+
if request[0] == "insert":
65+
self.changes.append(
66+
{
67+
"i": {
68+
"i": int(request[1]),
69+
"h": [int(v) for v in request[2].split(",")],
70+
}
71+
}
72+
)
73+
return ""
74+
75+
raise ProtocolError("invalid command")
76+
77+
78+
class Server:
79+
80+
def __init__(self):
81+
self.host = "127.0.0.1"
82+
self.port = 6080
83+
self.index_name = "main"
84+
self.index_url = "http://localhost:8080"
85+
86+
async def run(self):
87+
async with aiohttp.ClientSession() as session:
88+
self.session = session
89+
server = await asyncio.start_server(
90+
self.handle_connection, self.host, self.port
91+
)
92+
async with server:
93+
await server.serve_forever()
94+
95+
async def handle_connection(self, reader, writer):
96+
try:
97+
proto = Protocol(self.session)
98+
proto.index_name = self.index_name
99+
proto.index_url = self.index_url
100+
101+
while True:
102+
try:
103+
line = await reader.readuntil(b"\n")
104+
except asyncio.exceptions.IncompleteReadError:
105+
return
106+
107+
try:
108+
response = await proto.handle_request(line.decode("ascii").split())
109+
writer.write(b"OK " + response.encode("ascii") + b"\n")
110+
except ProtocolError as ex:
111+
writer.write(b"ERR " + ex.msg.encode("ascii") + b"\n")
112+
except Exception:
113+
traceback.print_exc()
114+
writer.write(b"ERR internal error\n")
115+
116+
await writer.drain()
117+
finally:
118+
writer.close()
119+
await writer.wait_closed()
120+
121+
122+
async def main():
123+
srv = Server()
124+
await srv.run()
125+
126+
127+
if __name__ == "__main__":
128+
asyncio.run(main())

0 commit comments

Comments
 (0)