Skip to content

Commit de7eab1

Browse files
committed
Merge branch 'detect-leaks-and-inform-user-des-1332'
2 parents 654de1c + 2583500 commit de7eab1

File tree

32 files changed

+2164
-52
lines changed

32 files changed

+2164
-52
lines changed

Cargo.lock

+263-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ members = [
1818
"mullvad-fs",
1919
"mullvad-ios",
2020
"mullvad-jni",
21+
"mullvad-leak-checker",
2122
"mullvad-management-interface",
2223
"mullvad-nsis",
2324
"mullvad-paths",
@@ -83,6 +84,7 @@ hickory-server = { version = "0.24.2", features = ["resolver"] }
8384
tokio = { version = "1.42" }
8485
parity-tokio-ipc = "0.9"
8586
futures = "0.3.15"
87+
8688
# Tonic and related crates
8789
tonic = "0.12.3"
8890
tonic-build = { version = "0.10.0", default-features = false }
@@ -93,6 +95,7 @@ hyper-util = {version = "0.1.8", features = ["client", "client-legacy", "http2",
9395

9496
env_logger = "0.10.0"
9597
thiserror = "2.0"
98+
anyhow = "1.0"
9699
log = "0.4"
97100

98101
shadowsocks = "1.20.3"
@@ -106,8 +109,10 @@ once_cell = "1.16"
106109
serde = "1.0.204"
107110
serde_json = "1.0.122"
108111

112+
pnet_packet = "0.35.0"
109113
ipnetwork = "0.20"
110114
tun = { version = "0.7", features = ["async"] }
115+
socket2 = "0.5.7"
111116

112117
# Test dependencies
113118
proptest = "1.4"

mullvad-cli/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ name = "mullvad"
1515
path = "src/main.rs"
1616

1717
[dependencies]
18-
anyhow = "1.0"
18+
anyhow = { workspace = true }
1919
chrono = { workspace = true }
2020
clap = { workspace = true }
2121
thiserror = { workspace = true }

mullvad-daemon/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ workspace = true
1515
api-override = ["mullvad-api/api-override"]
1616

1717
[dependencies]
18+
anyhow = { workspace = true }
1819
chrono = { workspace = true }
1920
thiserror = { workspace = true }
2021
either = "1.11"
@@ -27,6 +28,7 @@ serde = { workspace = true, features = ["derive"] }
2728
serde_json = { workspace = true }
2829
tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] }
2930
tokio-stream = "0.1"
31+
socket2 = { workspace = true }
3032

3133
mullvad-relay-selector = { path = "../mullvad-relay-selector" }
3234
mullvad-types = { path = "../mullvad-types" }
@@ -35,11 +37,13 @@ mullvad-encrypted-dns-proxy = { path = "../mullvad-encrypted-dns-proxy" }
3537
mullvad-fs = { path = "../mullvad-fs" }
3638
mullvad-paths = { path = "../mullvad-paths" }
3739
mullvad-version = { path = "../mullvad-version" }
40+
mullvad-leak-checker = { path = "../mullvad-leak-checker", default-features = false }
3841
talpid-core = { path = "../talpid-core" }
3942
talpid-future = { path = "../talpid-future" }
4043
talpid-platform-metadata = { path = "../talpid-platform-metadata" }
4144
talpid-time = { path = "../talpid-time" }
4245
talpid-types = { path = "../talpid-types" }
46+
talpid-routing = { path = "../talpid-routing" }
4347

4448
clap = { workspace = true }
4549
log-panics = "2.0.0"
+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
use futures::{select, FutureExt};
2+
pub use mullvad_leak_checker::LeakInfo;
3+
use std::time::Duration;
4+
use talpid_routing::RouteManagerHandle;
5+
use talpid_types::{net::Endpoint, tunnel::TunnelStateTransition};
6+
use tokio::sync::mpsc;
7+
8+
/// An actor that tries to leak traffic outside the tunnel while we are connected.
9+
pub struct LeakChecker {
10+
task_event_tx: mpsc::UnboundedSender<TaskEvent>,
11+
}
12+
13+
/// [LeakChecker] internal task state.
14+
struct Task {
15+
events_rx: mpsc::UnboundedReceiver<TaskEvent>,
16+
route_manager: RouteManagerHandle,
17+
callbacks: Vec<Box<dyn LeakCheckerCallback>>,
18+
}
19+
20+
enum TaskEvent {
21+
NewTunnelState(TunnelStateTransition),
22+
AddCallback(Box<dyn LeakCheckerCallback>),
23+
}
24+
25+
#[derive(PartialEq, Eq)]
26+
pub enum CallbackResult {
27+
/// Callback completed successfully
28+
Ok,
29+
30+
/// Callback is no longer valid and should be dropped.
31+
Drop,
32+
}
33+
34+
pub trait LeakCheckerCallback: Send + 'static {
35+
fn on_leak(&mut self, info: LeakInfo) -> CallbackResult;
36+
}
37+
38+
impl LeakChecker {
39+
pub fn new(route_manager: RouteManagerHandle) -> Self {
40+
let (task_event_tx, events_rx) = mpsc::unbounded_channel();
41+
42+
let task = Task {
43+
events_rx,
44+
route_manager,
45+
callbacks: vec![],
46+
};
47+
48+
tokio::task::spawn(task.run());
49+
50+
LeakChecker { task_event_tx }
51+
}
52+
53+
/// Call when we transition to a new tunnel state.
54+
pub fn on_tunnel_state_transition(&mut self, tunnel_state: TunnelStateTransition) {
55+
self.send(TaskEvent::NewTunnelState(tunnel_state))
56+
}
57+
58+
/// Call `callback` if a leak is detected.
59+
pub fn add_leak_callback(&mut self, callback: impl LeakCheckerCallback) {
60+
self.send(TaskEvent::AddCallback(Box::new(callback)))
61+
}
62+
63+
/// Send a [TaskEvent] to the running [Task];
64+
fn send(&mut self, event: TaskEvent) {
65+
if self.task_event_tx.send(event).is_err() {
66+
panic!("LeakChecker unexpectedly closed");
67+
}
68+
}
69+
}
70+
71+
impl Task {
72+
async fn run(mut self) {
73+
loop {
74+
let Some(event) = self.events_rx.recv().await else {
75+
break; // All LeakChecker handles dropped.
76+
};
77+
78+
match event {
79+
TaskEvent::NewTunnelState(s) => self.on_new_tunnel_state(s).await,
80+
TaskEvent::AddCallback(c) => self.on_add_callback(c),
81+
}
82+
}
83+
}
84+
85+
fn on_add_callback(&mut self, c: Box<dyn LeakCheckerCallback>) {
86+
self.callbacks.push(c);
87+
}
88+
89+
async fn on_new_tunnel_state(&mut self, mut tunnel_state: TunnelStateTransition) {
90+
'leak_test: loop {
91+
let TunnelStateTransition::Connected(tunnel) = &tunnel_state else {
92+
break 'leak_test;
93+
};
94+
95+
let ping_destination = tunnel.endpoint;
96+
let route_manager = self.route_manager.clone();
97+
let leak_test = async {
98+
// Give the connection a little time to settle before starting the test.
99+
tokio::time::sleep(Duration::from_millis(5000)).await;
100+
101+
check_for_leaks(&route_manager, ping_destination).await
102+
};
103+
104+
// Make sure the tunnel state doesn't change while we're doing the leak test.
105+
// If that happens, then our results might be invalid.
106+
let another_tunnel_state = async {
107+
'listen_for_events: while let Some(event) = self.events_rx.recv().await {
108+
let new_state = match event {
109+
TaskEvent::NewTunnelState(tunnel_state) => tunnel_state,
110+
TaskEvent::AddCallback(c) => {
111+
self.on_add_callback(c);
112+
continue 'listen_for_events;
113+
}
114+
};
115+
116+
if let TunnelStateTransition::Connected(..) = new_state {
117+
// Still connected, all is well...
118+
} else {
119+
// Tunnel state changed! We have to discard the leak test and try again.
120+
tunnel_state = new_state;
121+
break 'listen_for_events;
122+
}
123+
}
124+
};
125+
126+
let leak_result = select! {
127+
// If tunnel state changes, restart the test.
128+
_ = another_tunnel_state.fuse() => continue 'leak_test,
129+
130+
leak_result = leak_test.fuse() => leak_result,
131+
};
132+
133+
let leak_info = match leak_result {
134+
Ok(Some(leak_info)) => leak_info,
135+
Ok(None) => {
136+
log::debug!("No leak detected");
137+
break 'leak_test;
138+
}
139+
Err(e) => {
140+
log::debug!("Leak check errored: {e:#?}");
141+
break 'leak_test;
142+
}
143+
};
144+
145+
log::debug!("Leak detected: {leak_info:?}");
146+
147+
self.callbacks
148+
.retain_mut(|callback| callback.on_leak(leak_info.clone()) == CallbackResult::Ok);
149+
150+
break 'leak_test;
151+
}
152+
}
153+
}
154+
155+
#[cfg(target_os = "android")]
156+
#[allow(clippy::unused_async)]
157+
async fn check_for_leaks(
158+
_route_manager: &RouteManagerHandle,
159+
_destination: Endpoint,
160+
) -> anyhow::Result<Option<LeakInfo>> {
161+
// TODO: We currently don't have a way to get the non-tunnel interface on Android.
162+
Ok(None)
163+
}
164+
165+
#[cfg(not(target_os = "android"))]
166+
async fn check_for_leaks(
167+
route_manager: &RouteManagerHandle,
168+
destination: Endpoint,
169+
) -> anyhow::Result<Option<LeakInfo>> {
170+
use anyhow::{anyhow, Context};
171+
use mullvad_leak_checker::{traceroute::TracerouteOpt, LeakStatus};
172+
173+
#[cfg(target_os = "linux")]
174+
let interface = {
175+
// By setting FWMARK, we are effectively getting the same route as when using split tunneling.
176+
let route = route_manager
177+
.get_destination_route(destination.address.ip(), Some(mullvad_types::TUNNEL_FWMARK))
178+
.await
179+
.context("Failed to get route to relay")?
180+
.ok_or(anyhow!("No route to relay"))?;
181+
182+
route
183+
.get_node()
184+
.get_device()
185+
.context("No device for default route")?
186+
.to_string()
187+
.into()
188+
};
189+
190+
#[cfg(target_os = "macos")]
191+
let interface = {
192+
let (v4_route, v6_route) = route_manager
193+
.get_default_routes()
194+
.await
195+
.context("Failed to get default interface")?;
196+
let index = if destination.address.is_ipv4() {
197+
let v4_route = v4_route.context("Missing IPv4 default interface")?;
198+
v4_route.interface_index
199+
} else {
200+
let v6_route = v6_route.context("Missing IPv6 default interface")?;
201+
v6_route.interface_index
202+
};
203+
204+
let index =
205+
std::num::NonZeroU32::try_from(u32::from(index)).context("Interface index was 0")?;
206+
mullvad_leak_checker::Interface::Index(index)
207+
};
208+
209+
#[cfg(target_os = "windows")]
210+
let interface = {
211+
use std::net::IpAddr;
212+
use talpid_windows::net::AddressFamily;
213+
214+
let _ = route_manager; // don't need this on windows
215+
216+
let family = match destination.address.ip() {
217+
IpAddr::V4(..) => AddressFamily::Ipv4,
218+
IpAddr::V6(..) => AddressFamily::Ipv6,
219+
};
220+
221+
let route = talpid_routing::get_best_default_route(family)
222+
.context("Failed to get best default route")?
223+
.ok_or_else(|| anyhow!("No default route found"))?;
224+
225+
mullvad_leak_checker::Interface::Luid(route.iface)
226+
};
227+
228+
log::debug!("Attempting to leak traffic on interface {interface:?} to {destination}");
229+
230+
mullvad_leak_checker::traceroute::try_run_leak_test(&TracerouteOpt {
231+
interface,
232+
destination: destination.address.ip(),
233+
234+
#[cfg(unix)]
235+
port: None,
236+
#[cfg(unix)]
237+
exclude_port: None,
238+
#[cfg(unix)]
239+
icmp: true,
240+
})
241+
.await
242+
.map_err(|e| anyhow!("{e:#}"))
243+
.map(|status| match status {
244+
LeakStatus::NoLeak => None,
245+
LeakStatus::LeakDetected(info) => Some(info),
246+
})
247+
}
248+
249+
impl<T> LeakCheckerCallback for T
250+
where
251+
T: FnMut(LeakInfo) -> bool + Send + 'static,
252+
{
253+
fn on_leak(&mut self, info: LeakInfo) -> CallbackResult {
254+
if self(info) {
255+
CallbackResult::Ok
256+
} else {
257+
CallbackResult::Drop
258+
}
259+
}
260+
}

0 commit comments

Comments
 (0)