Skip to content

Commit 018bc45

Browse files
Merge branch 'graceful-grpc-server-shutdown'
2 parents 79f4d12 + 4df215f commit 018bc45

File tree

7 files changed

+144
-147
lines changed

7 files changed

+144
-147
lines changed

mullvad-daemon/src/access_method.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{api, settings, Daemon, EventListener};
1+
use crate::{api, settings, Daemon};
22
use mullvad_api::{proxy::ApiConnectionMode, rest, ApiProxy};
33
use mullvad_types::{
44
access_method::{self, AccessMethod, AccessMethodSetting},
@@ -28,10 +28,7 @@ pub enum Error {
2828
Settings(#[from] settings::Error),
2929
}
3030

31-
impl<L> Daemon<L>
32-
where
33-
L: EventListener,
34-
{
31+
impl Daemon {
3532
/// Add a [`AccessMethod`] to the daemon's settings.
3633
///
3734
/// If the daemon settings are successfully updated, the

mullvad-daemon/src/custom_list.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
use crate::{new_selector_config, Daemon, Error, EventListener};
1+
use crate::{new_selector_config, Daemon, Error};
22
use mullvad_types::{
33
constraints::Constraint,
44
custom_list::{CustomList, Id},
55
relay_constraints::{BridgeState, LocationConstraint, RelaySettings, ResolvedBridgeSettings},
66
};
77
use talpid_types::net::TunnelType;
88

9-
impl<L> Daemon<L>
10-
where
11-
L: EventListener,
12-
{
9+
impl Daemon {
1310
/// Create a new custom list.
1411
///
1512
/// Returns an error if the name is not unique.

mullvad-daemon/src/lib.rs

+40-50
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use futures::{
3737
StreamExt,
3838
};
3939
use geoip::GeoIpHandler;
40+
use management_interface::ManagementInterfaceServer;
4041
use mullvad_relay_selector::{
4142
AdditionalRelayConstraints, AdditionalWireguardConstraints, RelaySelector, SelectorConfig,
4243
};
@@ -111,6 +112,9 @@ pub enum Error {
111112
#[error("REST request failed")]
112113
RestError(#[source] mullvad_api::rest::Error),
113114

115+
#[error("Management interface error")]
116+
ManagementInterfaceError(#[source] management_interface::Error),
117+
114118
#[error("API availability check failed")]
115119
ApiCheckError(#[source] mullvad_api::availability::Error),
116120

@@ -549,40 +553,15 @@ where
549553
}
550554
}
551555

552-
/// Trait representing something that can broadcast daemon events.
553-
pub trait EventListener: Clone + Send + Sync + 'static {
554-
/// Notify that the tunnel state changed.
555-
fn notify_new_state(&self, new_state: TunnelState);
556-
557-
/// Notify that the settings changed.
558-
fn notify_settings(&self, settings: Settings);
559-
560-
/// Notify that the relay list changed.
561-
fn notify_relay_list(&self, relay_list: RelayList);
562-
563-
/// Notify that info about the latest available app version changed.
564-
/// Or some flag about the currently running version is changed.
565-
fn notify_app_version(&self, app_version_info: AppVersionInfo);
566-
567-
/// Notify that device changed (login, logout, or key rotation).
568-
fn notify_device_event(&self, event: DeviceEvent);
569-
570-
/// Notify that a device was revoked using `RemoveDevice`.
571-
fn notify_remove_device_event(&self, event: RemoveDeviceEvent);
572-
573-
/// Notify that the api access method changed.
574-
fn notify_new_access_method_event(&self, new_access_method: AccessMethodSetting);
575-
}
576-
577-
pub struct Daemon<L: EventListener> {
556+
pub struct Daemon {
578557
tunnel_state: TunnelState,
579558
target_state: PersistentTargetState,
580559
#[cfg(target_os = "linux")]
581560
exclude_pids: split_tunnel::PidManager,
582561
rx: mpsc::UnboundedReceiver<InternalDaemonEvent>,
583562
tx: DaemonEventSender,
584563
reconnection_job: Option<AbortHandle>,
585-
event_listener: L,
564+
management_interface: ManagementInterfaceServer,
586565
migration_complete: migrations::MigrationComplete,
587566
settings: SettingsPersister,
588567
account_history: account_history::AccountHistory,
@@ -602,26 +581,29 @@ pub struct Daemon<L: EventListener> {
602581
location_handler: GeoIpHandler,
603582
}
604583

605-
impl<L> Daemon<L>
606-
where
607-
L: EventListener,
608-
{
584+
impl Daemon {
585+
#[allow(clippy::too_many_arguments)]
609586
pub async fn start(
610587
log_dir: Option<PathBuf>,
611588
resource_dir: PathBuf,
612589
settings_dir: PathBuf,
613590
cache_dir: PathBuf,
614-
event_listener: L,
615-
command_channel: DaemonCommandChannel,
591+
rpc_socket_path: PathBuf,
616592
#[cfg(target_os = "android")] android_context: AndroidContext,
617593
) -> Result<Self, Error> {
618594
#[cfg(target_os = "macos")]
619595
macos::bump_filehandle_limit();
620596

621-
mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
597+
let command_channel = DaemonCommandChannel::new();
598+
let command_sender = command_channel.sender();
599+
600+
let management_interface =
601+
ManagementInterfaceServer::start(command_sender, rpc_socket_path)
602+
.map_err(Error::ManagementInterfaceError)?;
622603

623604
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
624605

606+
mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
625607
let api_runtime = mullvad_api::Runtime::with_cache(
626608
&cache_dir,
627609
true,
@@ -644,7 +626,7 @@ where
644626
None
645627
});
646628

647-
let settings_event_listener = event_listener.clone();
629+
let settings_event_listener = management_interface.notifier().clone();
648630
let mut settings = SettingsPersister::load(&settings_dir).await;
649631
settings.register_change_listener(move |settings| {
650632
// Notify management interface server of changes to the settings
@@ -804,7 +786,7 @@ where
804786

805787
api::forward_offline_state(api_availability.clone(), offline_state_rx);
806788

807-
let relay_list_listener = event_listener.clone();
789+
let relay_list_listener = management_interface.notifier().clone();
808790
let on_relay_list_update = move |relay_list: &RelayList| {
809791
relay_list_listener.notify_relay_list(relay_list.clone());
810792
};
@@ -844,7 +826,7 @@ where
844826
rx: internal_event_rx,
845827
tx: internal_event_tx,
846828
reconnection_job: None,
847-
event_listener,
829+
management_interface,
848830
migration_complete,
849831
settings,
850832
account_history,
@@ -915,7 +897,7 @@ where
915897
/// be destroyed, and executing shutdown tasks
916898
async fn finalize(self) {
917899
let Daemon {
918-
event_listener,
900+
management_interface,
919901
shutdown_tasks,
920902
api_runtime,
921903
tunnel_state_machine_handle,
@@ -932,8 +914,9 @@ where
932914
account_manager.shutdown().await;
933915

934916
tunnel_state_machine_handle.try_join().await;
917+
// Wait for the management interface server to shut down
918+
management_interface.stop().await;
935919

936-
drop(event_listener);
937920
drop(api_runtime);
938921
}
939922

@@ -1042,7 +1025,9 @@ where
10421025
}
10431026

10441027
self.tunnel_state = tunnel_state.clone();
1045-
self.event_listener.notify_new_state(tunnel_state);
1028+
self.management_interface
1029+
.notifier()
1030+
.notify_new_state(tunnel_state);
10461031
self.fetch_am_i_mullvad();
10471032
}
10481033

@@ -1110,7 +1095,8 @@ where
11101095
_ => return,
11111096
};
11121097

1113-
self.event_listener
1098+
self.management_interface
1099+
.notifier()
11141100
.notify_new_state(self.tunnel_state.clone());
11151101
}
11161102

@@ -1125,7 +1111,8 @@ where
11251111
// Make sure to update the daemon's actual tunnel state. Otherwise feature indicator changes won't be persisted.
11261112
self.tunnel_state
11271113
.set_feature_indicators(new_feature_indicators);
1128-
self.event_listener
1114+
self.management_interface
1115+
.notifier()
11291116
.notify_new_state(self.tunnel_state.clone());
11301117
}
11311118
}
@@ -1287,7 +1274,9 @@ where
12871274
}
12881275

12891276
fn handle_new_app_version_info(&mut self, app_version_info: AppVersionInfo) {
1290-
self.event_listener.notify_app_version(app_version_info);
1277+
self.management_interface
1278+
.notifier()
1279+
.notify_app_version(app_version_info);
12911280
}
12921281

12931282
async fn handle_device_event(&mut self, event: AccountEvent) {
@@ -1338,7 +1327,8 @@ where
13381327
_ => (),
13391328
}
13401329
if let AccountEvent::Device(event) = event {
1341-
self.event_listener
1330+
self.management_interface
1331+
.notifier()
13421332
.notify_device_event(DeviceEvent::from(event));
13431333
}
13441334
}
@@ -1367,14 +1357,14 @@ where
13671357
// currently active access method. The announcement should be
13681358
// made after the firewall policy has been updated, since the
13691359
// new access method will be useless before then.
1370-
let event_listener = self.event_listener.clone();
1360+
let notifier = self.management_interface.notifier().clone();
13711361
tokio::spawn(async move {
13721362
// Wait for the firewall policy to be updated.
13731363
let _ = completion_rx.await;
13741364
// Let the emitter of this event know that the firewall has been updated.
13751365
let _ = endpoint_active_tx.send(());
13761366
// Notify clients about the change if necessary.
1377-
event_listener.notify_new_access_method_event(setting);
1367+
notifier.notify_new_access_method_event(setting);
13781368
});
13791369
}
13801370
}
@@ -1385,7 +1375,7 @@ where
13851375
result: Result<PrivateAccountAndDevice, device::Error>,
13861376
) {
13871377
let account_manager = self.account_manager.clone();
1388-
let event_listener = self.event_listener.clone();
1378+
let notifier = self.management_interface.notifier().clone();
13891379
tokio::spawn(async move {
13901380
if let Ok(Some(_)) = account_manager
13911381
.data_after_login()
@@ -1414,7 +1404,7 @@ where
14141404
new_state: DeviceState::LoggedOut,
14151405
},
14161406
};
1417-
event_listener.notify_device_event(event);
1407+
notifier.notify_device_event(event);
14181408
}
14191409
});
14201410
}
@@ -1639,7 +1629,7 @@ where
16391629
device_id: DeviceId,
16401630
) {
16411631
let device_service = self.account_manager.device_service.clone();
1642-
let event_listener = self.event_listener.clone();
1632+
let notifier = self.management_interface.notifier().clone();
16431633

16441634
tokio::spawn(async move {
16451635
let result = device_service
@@ -1648,7 +1638,7 @@ where
16481638
.map(move |new_devices| {
16491639
// FIXME: We should be able to get away with only returning the removed ID,
16501640
// and not have to request the list from the API.
1651-
event_listener.notify_remove_device_event(RemoveDeviceEvent {
1641+
notifier.notify_remove_device_event(RemoveDeviceEvent {
16521642
account_token,
16531643
new_devices,
16541644
});

mullvad-daemon/src/main.rs

+5-35
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,8 @@
1+
use std::{path::PathBuf, thread, time::Duration};
2+
13
#[cfg(not(windows))]
24
use mullvad_daemon::cleanup_old_rpc_socket;
3-
use mullvad_daemon::{
4-
logging,
5-
management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer},
6-
rpc_uniqueness_check, runtime, version, Daemon, DaemonCommandChannel, DaemonCommandSender,
7-
};
8-
use std::{
9-
path::{Path, PathBuf},
10-
thread,
11-
time::Duration,
12-
};
5+
use mullvad_daemon::{logging, rpc_uniqueness_check, runtime, version, Daemon};
136
use talpid_types::ErrorExt;
147

158
mod cli;
@@ -196,48 +189,25 @@ async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
196189
Ok(())
197190
}
198191

199-
async fn create_daemon(
200-
log_dir: Option<PathBuf>,
201-
) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> {
192+
async fn create_daemon(log_dir: Option<PathBuf>) -> Result<Daemon, String> {
202193
let rpc_socket_path = mullvad_paths::get_rpc_socket_path();
203194
let resource_dir = mullvad_paths::get_resource_dir();
204195
let settings_dir = mullvad_paths::settings_dir()
205196
.map_err(|e| e.display_chain_with_msg("Unable to get settings dir"))?;
206197
let cache_dir = mullvad_paths::cache_dir()
207198
.map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?;
208199

209-
let command_channel = DaemonCommandChannel::new();
210-
let event_listener = spawn_management_interface(command_channel.sender(), rpc_socket_path)?;
211-
212200
Daemon::start(
213201
log_dir,
214202
resource_dir,
215203
settings_dir,
216204
cache_dir,
217-
event_listener,
218-
command_channel,
205+
rpc_socket_path,
219206
)
220207
.await
221208
.map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
222209
}
223210

224-
fn spawn_management_interface(
225-
command_sender: DaemonCommandSender,
226-
rpc_socket_path: impl AsRef<Path>,
227-
) -> Result<ManagementInterfaceEventBroadcaster, String> {
228-
let event_broadcaster = ManagementInterfaceServer::start(command_sender, &rpc_socket_path)
229-
.map_err(|error| {
230-
error.display_chain_with_msg("Unable to start management interface server")
231-
})?;
232-
233-
log::info!(
234-
"Management interface listening on {}",
235-
rpc_socket_path.as_ref().display()
236-
);
237-
238-
Ok(event_broadcaster)
239-
}
240-
241211
#[cfg(unix)]
242212
fn running_as_admin() -> bool {
243213
let uid = unsafe { libc::getuid() };

0 commit comments

Comments
 (0)