Skip to content

Commit

Permalink
feat: Implement systemd-notify directly
Browse files Browse the repository at this point in the history
Dependency on sd-notify removed.
sd-notify is wrapping libsystemd, which requires installing
libsystemd-dev for building, and loads libsystemd.so at runtime.
Since we only need the sd_notify part of the lib, and the protocol
is trivial to implement, do so.

We also now bail out early in case a watchdog timeout is set and
the notification module could not be loaded, as in this case
systemd would constantly kill and restart zigbee2mqtt, resulting
in unstable operation that might not be obvious at first sight.
  • Loading branch information
srett committed Feb 22, 2025
1 parent 67e1a02 commit 102bd14
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 45 deletions.
21 changes: 8 additions & 13 deletions lib/controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type {IClientPublishOptions} from 'mqtt';
import type * as SdNotify from 'sd-notify';

import type {Zigbee2MQTTAPI} from './types/api';

Expand Down Expand Up @@ -30,12 +29,11 @@ import ExtensionReceive from './extension/receive';
import MQTT from './mqtt';
import State from './state';
import logger from './util/logger';
import * as sdNotify from './util/sd-notify';
import * as settings from './util/settings';
import utils from './util/utils';
import Zigbee from './zigbee';

type SdNotifyType = typeof SdNotify;

const AllExtensions = [
ExtensionPublish,
ExtensionReceive,
Expand Down Expand Up @@ -73,7 +71,6 @@ export class Controller {
private exitCallback: (code: number, restart: boolean) => Promise<void>;
private extensions: Extension[];
private extensionArgs: ExtensionArgs;
private sdNotify: SdNotifyType | undefined;

constructor(restartCallback: () => Promise<void>, exitCallback: (code: number, restart: boolean) => Promise<void>) {
logger.init();
Expand Down Expand Up @@ -129,11 +126,13 @@ export class Controller {
logger.info(`Starting Zigbee2MQTT version ${info.version} (commit #${info.commitHash})`);

try {
this.sdNotify = process.env.NOTIFY_SOCKET ? await import('sd-notify') : undefined;
await sdNotify.init();
logger.debug('sd-notify loaded');
/* v8 ignore start */
} catch {
logger.debug('sd-notify is not installed');
logger.error('sd-notify is not available, but service was started with Type=notify');
logger.error('Either make sure sd-notify is available, or switch service to Type=simple');
await this.exit(1);
}
/* v8 ignore stop */

Expand Down Expand Up @@ -198,11 +197,7 @@ export class Controller {

logger.info(`Zigbee2MQTT started!`);

const watchdogInterval = this.sdNotify?.watchdogInterval() || 0;
if (watchdogInterval > 0) {
this.sdNotify?.startWatchdogMode(Math.floor(watchdogInterval / 2));
}
this.sdNotify?.ready();
sdNotify.started();
}

@bind async enableDisableExtension(enable: boolean, name: string): Promise<void> {
Expand All @@ -227,7 +222,7 @@ export class Controller {
}

async stop(restart = false): Promise<void> {
this.sdNotify?.stopping(process.pid);
sdNotify.stopping();

// Call extensions
await this.callExtensions('stop', this.extensions);
Expand All @@ -246,7 +241,7 @@ export class Controller {
code = 1;
}

this.sdNotify?.stopWatchdogMode();
sdNotify.stopped();
return await this.exit(code, restart);
}

Expand Down
14 changes: 14 additions & 0 deletions lib/types/unix-dgram.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
declare module 'unix-dgram' {
import {EventEmitter} from 'events';
import {Buffer} from 'buffer';

export class UnixDgramSocket extends EventEmitter {
send(buf: Buffer, callback?: (err?: Error) => void): void;
send(buf: Buffer, offset: number, length: number, path: string, callback?: (err?: Error) => void): void;
bind(path: string): void;
connect(remotePath: string): void;
close(): void;
}

export function createSocket(type: 'unix_dgram', listener?: (msg: Buffer) => void): UnixDgramSocket;
}
60 changes: 60 additions & 0 deletions lib/util/sd-notify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type {UnixDgramSocket} from 'unix-dgram';

import logger from './logger';

// Handle sd_notify protocol, see https://www.freedesktop.org/software/systemd/man/latest/sd_notify.html
// All methods in here will be no-ops if run on unsupported platforms or without Type=notify

let socket: UnixDgramSocket | undefined;
let watchdog: NodeJS.Timeout | undefined;

function sendToSystemd(msg: string): void {
if (!socket) return;
const buffer = Buffer.from(msg);
socket.send(buffer, 0, buffer.byteLength, process.env.NOTIFY_SOCKET!, (err: Error | undefined) => {
/* v8 ignore start */
if (err) {
logger.warning(`Failed to send "${msg}" to systemd: ${err.message}`);
}
/* v8 ignore stop */
});
}

export async function init(): Promise<void> {
if (!process.env.NOTIFY_SOCKET) return;
try {
const {createSocket} = await import('unix-dgram');
socket = createSocket('unix_dgram');
/* v8 ignore start */
} catch (error) {
// Ignore error on Windows if not running on WSL, as UNIX sockets don't exist
// on Windows. Unlikely that NOTIFY_SOCKET is set anyways but better be safe.
if (process.platform === 'win32' && !process.env.WSL_DISTRO_NAME) return;
// Otherwise, pass on exception, so main process can bail out immediately
throw error;
}
/* v8 ignore start */
}

export function started(): void {
sendToSystemd('READY=1');
if (!socket || !process.env.WATCHDOG_USEC || watchdog) return;
const num = Math.max(0, parseInt(process.env.WATCHDOG_USEC, 10));
if (!num) {
logger.warning(`WATCHDOG_USEC invalid: "${process.env.WATCHDOG_USEC}", parsed to "${num}"`);
return;
}
// Convert us to ms, send twice as frequently as the timeout
const interval = num / 1000 / 2;
watchdog = setInterval(() => sendToSystemd('WATCHDOG=1'), interval);
}

export function stopping(): void {
sendToSystemd('STOPPING=1');
}

export function stopped(): void {
if (!watchdog) return;
clearInterval(watchdog);
watchdog = undefined;
}
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
"@types/node": "^22.13.4",
"@types/object-assign-deep": "^0.4.3",
"@types/readable-stream": "4.0.18",
"@types/sd-notify": "^2.8.2",
"@types/serve-static": "^1.15.7",
"@types/ws": "8.5.14",
"@vitest/coverage-v8": "^3.0.5",
Expand All @@ -98,6 +97,6 @@
"zigbee2mqtt": "cli.js"
},
"optionalDependencies": {
"sd-notify": "^2.8.0"
"unix-dgram": "^2.0.6"
}
}
24 changes: 3 additions & 21 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions test/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,8 @@ import {Controller as ZHController} from 'zigbee-herdsman';
import {Controller} from '../lib/controller';
import * as settings from '../lib/util/settings';

process.env.NOTIFY_SOCKET = 'mocked';
const LOG_MQTT_NS = 'z2m:mqtt';

vi.mock('sd-notify', () => ({
watchdogInterval: vi.fn(() => 3000),
startWatchdogMode: vi.fn(),
stopWatchdogMode: vi.fn(),
ready: vi.fn(),
stopping: vi.fn(),
}));

const mocksClear = [
mockZHController.stop,
mockMQTTEndAsync,
Expand Down
79 changes: 79 additions & 0 deletions test/sd-notify.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {mockLogger} from './mocks/logger';

function expectMessage(nth: number, message: string): void {
expect(sendMock).toHaveBeenNthCalledWith(nth, Buffer.from(message), 0, expect.any(Number), 'mocked', expect.any(Function));
}

const sendMock = vi.fn();

vi.mock('unix-dgram', () => {
const mockUnixDgramSocket = {
send: sendMock,
};
return {
createSocket: vi.fn(() => mockUnixDgramSocket),
};
});

async function runTest(): Promise<void> {
const sd = await import('../lib/util/sd-notify');
await sd.init();
sd.started();
vi.advanceTimersByTime(6000);
sd.stopping();
sd.stopped();
}

const mocksClear = [mockLogger.log, mockLogger.debug, mockLogger.info, mockLogger.warning, mockLogger.error, sendMock];

describe('sd-notify', () => {
beforeAll(async () => {
vi.useFakeTimers();
});

afterAll(async () => {
vi.useRealTimers();
delete process.env.NOTIFY_SOCKET;
delete process.env.WATCHDOG_USEC;
});

beforeEach(() => {
vi.resetModules();
mocksClear.forEach((m) => m.mockClear());
});

it('No socket', async () => {
delete process.env.NOTIFY_SOCKET;
delete process.env.WATCHDOG_USEC;
await runTest();
expect(sendMock).toHaveBeenCalledTimes(0);
});

it('Socket only', async () => {
process.env.NOTIFY_SOCKET = 'mocked';
delete process.env.WATCHDOG_USEC;
await runTest();
expect(sendMock).toHaveBeenCalledTimes(2);
expectMessage(1, 'READY=1');
expectMessage(2, 'STOPPING=1');
});

it('Socket and watchdog', async () => {
process.env.NOTIFY_SOCKET = 'mocked';
process.env.WATCHDOG_USEC = '10000000';
await runTest();
expect(sendMock).toHaveBeenCalledTimes(3);
expectMessage(1, 'READY=1');
expectMessage(2, 'WATCHDOG=1');
expectMessage(3, 'STOPPING=1');
});

it('Invalid watchdog timeout', async () => {
process.env.NOTIFY_SOCKET = 'mocked';
process.env.WATCHDOG_USEC = 'mocked';
await runTest();
expect(sendMock).toHaveBeenCalledTimes(2);
expectMessage(1, 'READY=1');
expectMessage(2, 'STOPPING=1');
});
});

0 comments on commit 102bd14

Please sign in to comment.