Skip to content

Commit

Permalink
Merge pull request #274 from sixwaaaay/sync
Browse files Browse the repository at this point in the history
fix: update for open telemetry
  • Loading branch information
sixwaaaay authored Sep 15, 2024
2 parents ce0d836 + a4b8961 commit 214ab08
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 20 deletions.
15 changes: 10 additions & 5 deletions replica/essync/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
FROM python:3.11-alpine
COPY . /src
RUN pip install elasticsearch && \
pip install mysql-replication && \
pip install opentelemetry-api && \

RUN pip install -r /src/requirements.txt

RUN pip install opentelemetry-api && \
pip install opentelemetry-sdk && \
pip install opentelemetry-exporter-otlp-proto-http
CMD ["python", "/src/cdc.py"]
pip install opentelemetry-exporter-otlp-proto-http && \
pip install opentelemetry-distro

RUN opentelemetry-bootstrap -a install

ENTRYPOINT [ "opentelemetry-instrument", "python", "/src/cdc.py"]
31 changes: 16 additions & 15 deletions replica/essync/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
from elasticsearch.helpers import bulk
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
Expand All @@ -36,20 +40,17 @@ def load_conf():


def save_position(next_binlog, position):
data = {
'next_binlog': next_binlog,
'position': position
}
with open('position.json', 'w') as f:
data = {"next_binlog": next_binlog, "position": position}
with open("position.json", "w") as f:
json.dump(data, f)
logging.info(f'Saved position: {data}')
logging.info(f"Saved position: {data}")


def load_position():
try:
with open('position.json', 'r') as f:
with open("position.json", "r") as f:
data = json.load(f)
return data['next_binlog'], data['position']
return data["next_binlog"], data["position"]
except FileNotFoundError:
return None, None

Expand All @@ -68,25 +69,25 @@ def sync(stream: BinLogStreamReader, client: Elasticsearch, tracer: trace.Tracer
if isinstance(binlog_event, DeleteRowsEvent):
vals = row["values"]
action = {
"_op_type": 'delete',
"_op_type": "delete",
"_index": table,
"_id": identity(vals)
"_id": identity(vals),
}
elif isinstance(binlog_event, UpdateRowsEvent):
vals = row["after_values"]
action = {
"_op_type": 'index',
"_op_type": "index",
"_index": table,
"_id": identity(vals),
"_source": vals
"_source": vals,
}
elif isinstance(binlog_event, WriteRowsEvent):
vals = row["values"]
action = {
"_op_type": 'index',
"_op_type": "index",
"_index": table,
"_id": identity(vals),
"_source": vals
"_source": vals,
}
else:
action = None
Expand Down
12 changes: 12 additions & 0 deletions replica/essync/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
certifi==2024.8.30
# via elastic-transport
elastic-transport==8.15.0
# via elasticsearch
elasticsearch==8.15.1
mysql-replication==1.0.9
packaging==24.1
# via mysql-replication
pymysql==1.1.1
# via mysql-replication
urllib3==2.2.3
# via elastic-transport

0 comments on commit 214ab08

Please sign in to comment.