Skip to content

Commit f4f5e2c

Browse files
authored
Merge pull request #1 from egbertbouman/stats
Add message statistics and speedtest messages
2 parents c274977 + e293263 commit f4f5e2c

File tree

14 files changed

+705
-306
lines changed

14 files changed

+705
-306
lines changed

.github/workflows/publish.yml

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,22 @@ permissions:
1111

1212
jobs:
1313
linux:
14-
runs-on: ubuntu-latest
14+
runs-on: ${{ matrix.platform.runner }}
1515
strategy:
1616
matrix:
17-
target: [x86_64, x86, aarch64, armv7, s390x, ppc64le]
17+
platform:
18+
- runner: ubuntu-latest
19+
target: x86_64
20+
- runner: ubuntu-latest
21+
target: x86
22+
- runner: ubuntu-latest
23+
target: aarch64
24+
- runner: ubuntu-latest
25+
target: armv7
26+
- runner: ubuntu-latest
27+
target: s390x
28+
- runner: ubuntu-latest
29+
target: ppc64le
1830
steps:
1931
- uses: actions/checkout@v3
2032
- uses: ./.github/actions/set-version
@@ -25,20 +37,24 @@ jobs:
2537
uses: PyO3/maturin-action@v1
2638
with:
2739
target: ${{ matrix.target }}
28-
args: --release --out dist -i 3.8 -i 3.9 -i 3.10 -i 3.11 -i 3.12
40+
args: --release --out dist -i 3.9 -i 3.10 -i 3.11 -i 3.12 -i 3.13
2941
sccache: 'true'
3042
manylinux: auto
3143
- name: Upload wheels
32-
uses: actions/upload-artifact@v3
44+
uses: actions/upload-artifact@v4
3345
with:
34-
name: wheels
46+
name: wheels-linux-${{ matrix.platform.target }}
3547
path: dist
3648

3749
windows:
38-
runs-on: windows-latest
50+
runs-on: ${{ matrix.platform.runner }}
3951
strategy:
4052
matrix:
41-
target: [x64, x86]
53+
platform:
54+
- runner: windows-latest
55+
target: x64
56+
- runner: windows-latest
57+
target: x86
4258
steps:
4359
- uses: actions/checkout@v3
4460
- uses: ./.github/actions/set-version
@@ -50,19 +66,23 @@ jobs:
5066
uses: PyO3/maturin-action@v1
5167
with:
5268
target: ${{ matrix.target }}
53-
args: --release --out dist -i 3.8 -i 3.9 -i 3.10 -i 3.11 -i 3.12
69+
args: --release --out dist -i 3.9 -i 3.10 -i 3.11 -i 3.12 -i 3.13
5470
sccache: 'true'
5571
- name: Upload wheels
56-
uses: actions/upload-artifact@v3
72+
uses: actions/upload-artifact@v4
5773
with:
58-
name: wheels
74+
name: wheels-windows-${{ matrix.platform.target }}
5975
path: dist
6076

6177
macos:
62-
runs-on: macos-latest
78+
runs-on: ${{ matrix.platform.runner }}
6379
strategy:
6480
matrix:
65-
target: [x86_64, aarch64]
81+
platform:
82+
- runner: macos-13
83+
target: x86_64
84+
- runner: macos-14
85+
target: aarch64
6686
steps:
6787
- uses: actions/checkout@v3
6888
- uses: ./.github/actions/set-version
@@ -73,12 +93,12 @@ jobs:
7393
uses: PyO3/maturin-action@v1
7494
with:
7595
target: ${{ matrix.target }}
76-
args: --release --out dist -i 3.8 -i 3.9 -i 3.10 -i 3.11 -i 3.12
96+
args: --release --out dist -i 3.9 -i 3.10 -i 3.11 -i 3.12 -i 3.13
7797
sccache: 'true'
7898
- name: Upload wheels
79-
uses: actions/upload-artifact@v3
99+
uses: actions/upload-artifact@v4
80100
with:
81-
name: wheels
101+
name: wheels-macos-${{ matrix.platform.target }}
82102
path: dist
83103

84104
sdist:
@@ -92,9 +112,9 @@ jobs:
92112
command: sdist
93113
args: --out dist
94114
- name: Upload sdist
95-
uses: actions/upload-artifact@v3
115+
uses: actions/upload-artifact@v4
96116
with:
97-
name: wheels
117+
name: wheels-sdist
98118
path: dist
99119

100120
release:
@@ -103,13 +123,11 @@ jobs:
103123
if: "startsWith(github.ref, 'refs/tags/')"
104124
needs: [linux, windows, macos, sdist]
105125
steps:
106-
- uses: actions/download-artifact@v3
107-
with:
108-
name: wheels
126+
- uses: actions/download-artifact@v4
109127
- name: Publish to PyPI
110128
uses: PyO3/maturin-action@v1
111129
env:
112130
MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
113131
with:
114132
command: upload
115-
args: --non-interactive --skip-existing *
133+
args: --non-interactive --skip-existing wheels-*/*

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ jobs:
1616
toolchain: stable
1717
components: rustfmt
1818
target: x86_64-unknown-linux-gnu
19-
- name: Setup Python 3.8
20-
uses: actions/setup-python@v4
19+
- name: Setup Python 3.9
20+
uses: actions/setup-python@v5
2121
with:
22-
python-version: 3.8
22+
python-version: 3.9
2323
- name: Setup dependencies
2424
run: |
2525
pip install --upgrade pip

Cargo.toml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ name = "rust_endpoint"
1616
crate-type = ["cdylib"]
1717

1818
[dependencies]
19-
pyo3 = { version = "0.20.0", features = ["extension-module"] }
20-
tokio = { version = "1.34.0", features = ["full"] }
21-
env_logger = "0.10.1"
22-
log = "0.4.20"
23-
arc-swap = "1.6.0"
19+
pyo3 = { version = "0.23.4", features = ["extension-module"] }
20+
tokio = { version = "1.43.0", features = ["full"] }
21+
env_logger = "0.11.6"
22+
log = "0.4.25"
23+
arc-swap = "1.7.1"
2424
chacha20poly1305 = "0.10.1"
25-
socks5-proto = "0.4.0"
26-
socks5-server = "0.10.0"
27-
bytes = "1.5.0"
28-
rand = "0.8.5"
29-
map-macro = "0.2.6"
25+
socks5-proto = "0.4.1"
26+
socks5-server = "0.10.1"
27+
bytes = "1.10.0"
28+
rand = "0.9.0"
29+
map-macro = "0.3.0"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# IPv8-rust-tunnels
2-
[![](https://img.shields.io/pypi/v/ipv8-rust-tunnels.svg?label=PyPI)](https://pypi.org/project/ipv8-rust-tunnels/)   ![Unit tests](https://github.com/egbertbouman/ipv8-rust-tunnels/actions/workflows/test.yml/badge.svg)
2+
[![](https://img.shields.io/pypi/v/ipv8-rust-tunnels.svg?label=PyPI)](https://pypi.org/project/ipv8-rust-tunnels/)   ![Unit tests](https://github.com/Tribler/ipv8-rust-tunnels/actions/workflows/test.yml/badge.svg)
33

44
This module provides a set of performance enhancements to the `TunnelCommunity`, the anonymization layer used in [IPv8](https://github.com/Tribler/py-ipv8) and [Tribler](https://github.com/Tribler/tribler). It works by handling the tunnel data traffic in Rust, while letting the Python anonymization layer handle the tunnel control logic.

ipv8_rust_tunnels/endpoint.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import ipv8_rust_tunnels.rust_endpoint as rust
1414

1515
from ipv8.messaging.anonymization.crypto import CryptoEndpoint
16+
from ipv8.messaging.interfaces.endpoint import EndpointListener
17+
from ipv8.messaging.interfaces.network_stats import NetworkStat
1618
from ipv8.messaging.interfaces.udp.endpoint import Endpoint, EndpointClosedException, UDPv4Address
1719
from ipv8.taskmanager import TaskManager
1820
from ipv8.util import succeed
@@ -65,6 +67,24 @@ def __init__(self, port: int = 0, ip: str = "0.0.0.0", worker_threads: int = 4)
6567

6668
self.register_task('update_stats', self.update_stats, interval=1)
6769

70+
def add_prefix_listener(self, listener: EndpointListener, prefix: bytes) -> None:
71+
"""
72+
Add an EndpointListener to our listeners, only triggers on packets with a specific prefix.
73+
74+
:raises: IllegalEndpointListenerError if the provided listener is not an EndpointListener
75+
"""
76+
super().add_prefix_listener(listener, prefix)
77+
if self.rust_ep.is_open():
78+
self.rust_ep.set_prefixes(list(self._prefix_map.keys()))
79+
80+
def remove_listener(self, listener: EndpointListener) -> None:
81+
"""
82+
Remove a listener from our listeners, if it is registered.
83+
"""
84+
super().remove_listener(listener)
85+
if self.rust_ep.is_open():
86+
self.rust_ep.set_prefixes(list(self._prefix_map.keys()))
87+
6888
def update_stats(self) -> None:
6989
"""
7090
Updates the statistics of the routing objects using the most recent data from Rust.
@@ -78,6 +98,25 @@ def update_stats(self) -> None:
7898
for exit_socket in self.exit_sockets.values():
7999
self.rust_ep.update_exit_stats(exit_socket.circuit_id, exit_socket)
80100

101+
def get_statistics(self, prefix: bytes) -> dict[int, NetworkStat]:
102+
"""
103+
Get the message statistics per message identifier for the given prefix.
104+
"""
105+
result = {}
106+
for msg_id, counters in self.rust_ep.get_message_statistics(prefix).items():
107+
stat = result[msg_id] = NetworkStat(msg_id)
108+
stat.num_up = counters[0]
109+
stat.num_down = counters[2]
110+
stat.bytes_up = counters[1]
111+
stat.bytes_down = counters[3]
112+
return result
113+
114+
def enable_community_statistics(self, community_prefix: bytes, enabled: bool) -> None:
115+
"""
116+
Start tracking stats for packets with the given prefix.
117+
"""
118+
pass
119+
81120
def setup_tunnels(self, tunnel_community: TunnelCommunity, settings: TunnelSettings) -> None:
82121
"""
83122
Set up the TunnelCommunity.
@@ -93,6 +132,7 @@ def apply_settings(self) -> None:
93132
"""
94133
if self.prefix and self.settings and self.is_open():
95134
self.rust_ep.set_prefix(self.prefix)
135+
self.rust_ep.set_prefixes(list(self._prefix_map.keys()))
96136
self.rust_ep.set_max_relay_early(self.settings.max_relay_early)
97137
self.rust_ep.set_peer_flags(self.settings.peer_flags)
98138

@@ -210,11 +250,12 @@ def reset_byte_counters(self) -> None:
210250
self.bytes_up = 0
211251
self.bytes_down = 0
212252

213-
def run_speedtest(self, target_addr: str, associate_port: int, num_packets: int, request_size: int,
214-
response_size: int, timeout_ms: int, window_size: int, callback: Callable) -> None:
253+
def run_speedtest(self, circuit_id: int, test_time: int, request_size: int,
254+
response_size: int, target_rtt: int, callback: Callable, callback_interval: int = 0) -> None:
215255
"""
216-
Perform a TunnelCommunity speedtest. Connects to an existing UDP associate
217-
port and sends test messages to a given target address.
256+
Perform a TunnelCommunity speedtest.
218257
"""
219-
self.rust_ep.run_speedtest(target_addr, associate_port, num_packets, request_size,
220-
response_size, timeout_ms, window_size, callback)
258+
def callback_threadsafe(*args):
259+
self.loop.call_soon_threadsafe(callback, *args)
260+
return self.rust_ep.run_speedtest(circuit_id, test_time, request_size,
261+
response_size, target_rtt, callback_threadsafe, callback_interval)

ipv8_rust_tunnels/tests/test_tunnel_community.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import asyncio
12
import unittest
3+
from unittest.mock import Mock
24

35
from ipv8_rust_tunnels.endpoint import RustEndpoint
46

7+
from ipv8.messaging.anonymization.community import CIRCUIT_TYPE_RP_DOWNLOADER
8+
from ipv8.messaging.anonymization.tunnel import PEER_FLAG_SPEED_TEST
9+
from ipv8.messaging.interfaces.endpoint import EndpointListener
510
from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address
611
from ipv8.test.messaging.anonymization.test_community import TestTunnelCommunity
712
from ipv8.test.messaging.anonymization.test_hiddenservices import TestHiddenServices
@@ -26,6 +31,12 @@ def create_node(org, *args, **kwargs): # noqa: ANN201, ANN002, ANN001, D103
2631
ep.remove_listener(ipv8.overlay)
2732
ep.add_prefix_listener(ipv8.overlay, overlay.get_prefix())
2833

34+
# Some unittests use prefixes that we don't want blocked. So, we add a fake EndpointListener.
35+
mock = Mock()
36+
mock.__class__ = EndpointListener
37+
ep.add_prefix_listener(mock, b'\x00' * 22)
38+
ep.add_prefix_listener(mock, b'\x00\x01' + b'\x00' * 20)
39+
2940
overlay.circuits = ep.circuits
3041
overlay.relay_from_to = ep.relays
3142
overlay.exit_sockets = ep.exit_sockets
@@ -41,17 +52,70 @@ def replace(old_func, new_func): # noqa: ANN001, D103, ANN201
4152
return lambda *args, org=old_func, **kwargs: new_func(org, *args, **kwargs)
4253

4354

55+
async def new_test_test_request(self) -> None:
56+
"""
57+
Check if sending test-request messages works as expected.
58+
"""
59+
self.add_node_to_experiment(self.create_node())
60+
self.settings(1).peer_flags |= {PEER_FLAG_SPEED_TEST}
61+
await self.introduce_nodes()
62+
circuit = self.overlay(0).create_circuit(2, exit_flags=[PEER_FLAG_SPEED_TEST])
63+
await circuit.ready
64+
65+
callback = Mock()
66+
self.overlay(0).endpoint.run_speedtest(circuit.circuit_id, 10, 10, 10, 1, callback)
67+
await asyncio.sleep(.1)
68+
69+
callback.assert_called_once()
70+
self.assertEqual(len(callback.call_args_list), 1)
71+
msg_stats = list(callback.call_args_list[0].args[0].values())
72+
self.assertTrue(sum([stat[1] for stat in msg_stats]) > 0)
73+
self.assertTrue(sum([stat[3] for stat in msg_stats]) > 0)
74+
75+
76+
async def new_test_test_request_e2e(self, *args) -> None:
77+
"""
78+
Check if sending test-request messages over an e2e circuit works as expected.
79+
"""
80+
future = asyncio.Future()
81+
82+
self.overlay(0).join_swarm(self.service, 1, future.set_result, seeding=False)
83+
self.overlay(2).join_swarm(self.service, 1, future.set_result)
84+
self.overlay(2).settings.peer_flags.add(PEER_FLAG_SPEED_TEST)
85+
86+
await self.introduce_nodes()
87+
await self.create_intro(2, self.service)
88+
await self.assign_exit_node(0)
89+
90+
await self.overlay(0).do_peer_discovery()
91+
await self.deliver_messages()
92+
93+
await future
94+
95+
circuit, = self.overlay(0).find_circuits(ctype=CIRCUIT_TYPE_RP_DOWNLOADER)
96+
callback = Mock()
97+
self.overlay(0).endpoint.run_speedtest(circuit.circuit_id, 10, 3, 6, 1, callback)
98+
await asyncio.sleep(.1)
99+
100+
callback.assert_called_once()
101+
self.assertEqual(len(callback.call_args_list), 1)
102+
msg_stats = list(callback.call_args_list[0].args[0].values())
103+
self.assertTrue(sum([stat[1] for stat in msg_stats]) > 0)
104+
self.assertTrue(sum([stat[3] for stat in msg_stats]) > 0)
105+
106+
44107
TestTunnelCommunity.create_node = replace(TestTunnelCommunity.create_node, create_node)
45108
TestTunnelCommunity.setUp = replace(TestTunnelCommunity.setUp, set_up)
109+
TestTunnelCommunity.test_test_request = new_test_test_request
46110
TestTunnelCommunity.test_tunnel_unicode_destination = \
47111
unittest.skip("not available in RustEndpoint")(TestTunnelCommunity.test_tunnel_unicode_destination)
48112

49113
TestHiddenServices.create_node = replace(TestHiddenServices.create_node, create_node)
50114
TestHiddenServices.setUp = replace(TestHiddenServices.setUp, set_up)
115+
TestHiddenServices.test_test_request_e2e = new_test_test_request_e2e
51116
TestHiddenServices.test_dht_lookup_with_counterparty = \
52117
unittest.skip("not available in RustEndpoint")(TestHiddenServices.test_dht_lookup_with_counterparty)
53118

54-
55119
if __name__ == '__main__':
56120
runner = unittest.TextTestRunner()
57121
runner.run(unittest.makeSuite(TestTunnelCommunity))

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ build-backend = "maturin"
55
[project]
66
name = "ipv8-rust-tunnels"
77
description = "IPv8 tunnel performance enhancements"
8-
requires-python = ">=3.8"
8+
requires-python = ">=3.9"
99
classifiers = [
1010
"Programming Language :: Rust",
1111
"Programming Language :: Python :: Implementation :: CPython",

0 commit comments

Comments
 (0)