Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Properly synchronize slog sender termination #11052

Merged
merged 9 commits into from
Mar 1, 2025
2 changes: 1 addition & 1 deletion packages/SwingSet/src/controller/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export async function makeSwingsetController(
const sloggingKernelConsole = makeLimitedConsole(level => {
return (...args) => {
kernelConsole[level](...args);
writeSlogObject({ type: 'console', source: 'kernel', args });
writeSlogObject({ type: 'console', source: 'kernel', level, args });
};
});
writeSlogObject({ type: 'import-kernel-start' });
Expand Down
132 changes: 51 additions & 81 deletions packages/SwingSet/src/kernel/slogger.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { q } from '@endo/errors';
import { objectMap } from '@agoric/internal';
import { makeLimitedConsole } from '@agoric/internal/src/ses-utils.js';

/** @import {Callable} from '@agoric/internal'; */
/** @import {LimitedConsole} from '@agoric/internal/src/js-utils.js'; */

const IDLE = 'idle';
Expand All @@ -13,69 +15,62 @@ const noopFinisher = harden(() => {});
/** @typedef {Partial<Record<Exclude<keyof KernelSlog, 'write'>, (methodName: string, args: unknown[], finisher: AnyFinisher) => unknown>>} SlogWrappers */

/**
* Support composition of asynchronous callbacks that are invoked at the start
* Support asynchronous slog callbacks that are invoked at the start
* of an operation and return either a non-function result or a "finisher"
* function to be invoked upon operation completion.
* This maker accepts a collection of wrapper functions that receive the same
* arguments as the method they wrap, along with the result of that method
* (e.g., its finisher), and are expected to return a finisher of their own that
* will invoke that wrapped finisher.
*
* @param {SlogWrappers} wrappers
* @template {Record<string, Callable>} Methods
* @param {SlogWrappers} slogCallbacks
* @param {string} unusedMsgPrefix prefix for warn-level logging about unused callbacks
* @param {Methods} methods to wrap
* @returns {Methods}
*/
function makeFinishersKit(wrappers) {
const unused = new Set(Object.keys(wrappers));
return harden({
/**
* Robustly wrap a method if a wrapper is defined.
*
* @template {(...args: unknown[]) => (Finisher | unknown)} F
* @template {AnyFinisher} [Finisher=AnyFinisher]
* @param {string} method name
* @param {F} impl the original implementation
* @returns {F} the wrapped method
*/
wrap(method, impl) {
unused.delete(method);
const wrapper = wrappers[method];
function addSlogCallbacks(slogCallbacks, unusedMsgPrefix, methods) {
const unused = new Set(Object.keys(slogCallbacks));
const wrappedMethods = /** @type {Methods} */ (
objectMap(methods, (impl, methodKey) => {
const methodName = /** @type {keyof typeof slogCallbacks} */ (methodKey);
unused.delete(methodName);
const wrapper = slogCallbacks[methodName];

// If there is no registered wrapper, return the implementation directly.
if (!wrapper) return impl;

const wrapped = (...args) => {
const maybeFinisher = /** @type {Finisher} */ (impl(...args));
const maybeFinisher = /** @type {AnyFinisher} */ (impl(...args));
try {
// Allow the callback to observe the call synchronously, and replace
// the implementation's finisher function, but not to throw an exception.
const wrapperFinisher = wrapper(method, args, maybeFinisher);
const wrapperFinisher = wrapper(methodName, args, maybeFinisher);
if (typeof maybeFinisher !== 'function') return wrapperFinisher;

// We wrap the finisher in the callback's return value.
return (...finishArgs) => {
try {
return /** @type {Finisher} */ (wrapperFinisher)(...finishArgs);
return /** @type {AnyFinisher} */ (wrapperFinisher)(
...finishArgs,
);
} catch (e) {
console.error(`${method} wrapper finisher failed:`, e);
console.error(`${methodName} wrapper finisher failed:`, e);
return maybeFinisher(...finishArgs);
}
};
} catch (e) {
console.error(`${method} wrapper failed:`, e);
console.error(`${methodName} wrapper failed:`, e);
return maybeFinisher;
}
};
return /** @type {F} */ (wrapped);
},
/**
* Declare that all wrapping is done.
*
* @param {string} msg message to display if there are unused wrappers
*/
done(msg = 'Unused wrappers') {
if (!unused.size) return;
console.warn(msg, ...[...unused.keys()].sort().map(q));
},
});
return /** @type {typeof impl} */ (/** @type {unknown} */ (wrapped));
})
);
if (unused.size) {
console.warn(unusedMsgPrefix, ...[...unused.keys()].sort().map(q));
}
return wrappedMethods;
}

export const badConsole = makeLimitedConsole(level => () => {
Expand All @@ -89,22 +84,19 @@ export const noopConsole = makeLimitedConsole(_level => () => {});
* @returns {KernelSlog}
*/
export function makeDummySlogger(slogCallbacks, dummyConsole = badConsole) {
const { wrap, done } = makeFinishersKit(slogCallbacks);
const dummySlogger = harden({
provideVatSlogger: wrap('provideVatSlogger', () => {
return harden({ vatSlog: { delivery: () => noopFinisher } });
}),
vatConsole: wrap('vatConsole', () => dummyConsole),
startup: wrap('startup', () => noopFinisher),
replayVatTranscript: wrap('replayVatTranscript', () => noopFinisher),
delivery: wrap('delivery', () => noopFinisher),
syscall: wrap('syscall', () => noopFinisher),
changeCList: wrap('changeCList', () => noopFinisher),
terminateVat: wrap('terminateVat', () => noopFinisher),
write: noopFinisher,
const unusedWrapperPrefix =
'Unused methods in makeDummySlogger slogCallbacks';
const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, {
provideVatSlogger: () =>
harden({ vatSlog: { delivery: () => noopFinisher } }),
vatConsole: () => dummyConsole,
startup: () => noopFinisher,
delivery: () => noopFinisher,
syscall: () => noopFinisher,
changeCList: () => noopFinisher,
terminateVat: () => noopFinisher,
});
done('Unused makeDummySlogger slogCallbacks method names');
return dummySlogger;
return harden({ ...wrappedMethods, write: noopFinisher });
}

/**
Expand Down Expand Up @@ -250,43 +242,21 @@ export function makeSlogger(slogCallbacks, writeObj) {
return { vatSlog, starting: true };
}

function replayVatTranscript(vatID) {
safeWrite({ type: 'replay-transcript-start', vatID });
function finish() {
safeWrite({ type: 'replay-transcript-finish', vatID });
}
return harden(finish);
}

// function annotateVat(vatID, data) {
// safeWrite({ type: 'annotate-vat', vatID, data });
// }

const { wrap, done } = makeFinishersKit(slogCallbacks);
const slogger = harden({
provideVatSlogger: wrap('provideVatSlogger', provideVatSlogger),
vatConsole: wrap('vatConsole', (vatID, ...args) =>
const unusedWrapperPrefix = 'Unused methods in makeSlogger slogCallbacks';
const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, {
provideVatSlogger,
vatConsole: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.vatConsole(...args),
),
startup: wrap('startup', (vatID, ...args) =>
startup: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.startup(...args),
),
// TODO: Remove this seemingly dead code.
replayVatTranscript,
delivery: wrap('delivery', (vatID, ...args) =>
delivery: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.delivery(...args),
),
syscall: wrap('syscall', (vatID, ...args) =>
syscall: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.syscall(...args),
),
changeCList: wrap('changeCList', (vatID, ...args) =>
changeCList: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.changeCList(...args),
),
terminateVat: wrap('terminateVat', (vatID, ...args) =>
terminateVat: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.terminateVat(...args),
),
write: safeWrite,
});
done('Unused makeSlogger slogCallbacks method names');
return slogger;
return harden({ ...wrappedMethods, write: safeWrite });
}
10 changes: 8 additions & 2 deletions packages/cosmic-swingset/src/chain-main.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ export const makeLaunchChain = (
serviceName: TELEMETRY_SERVICE_NAME,
});

const slogSender = await (testingOverrides.slogSender ||
const providedSlogSender = await testingOverrides.slogSender;
const slogSender = await (providedSlogSender ||
makeSlogSender({
stateDir: stateDBDir,
env,
Expand Down Expand Up @@ -559,7 +560,12 @@ export const makeLaunchChain = (
swingsetConfig,
});
savedChainSends = s.savedChainSends;
return s;
const shutdown = async () => {
await s.shutdown?.();
if (providedSlogSender) return;
await slogSender?.shutdown?.();
};
return { ...s, shutdown };
};

return launchChain;
Expand Down
3 changes: 2 additions & 1 deletion packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,8 @@ export async function launchAndShareInternals({
}

async function shutdown() {
return controller.shutdown();
await controller.shutdown();
await afterCommitWorkDone;
}

function writeSlogObject(obj) {
Expand Down
47 changes: 23 additions & 24 deletions packages/internal/src/node/fs-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createWriteStream } from 'node:fs';
import process from 'node:process';
import { open } from 'node:fs/promises';
import process from 'node:process';
import { promisify } from 'node:util';

/**
* @param {import('fs').ReadStream
Expand Down Expand Up @@ -40,23 +40,29 @@ export const fsStreamReady = stream =>
stream.on('error', onError);
});

const noPath = /** @type {import('fs').PathLike} */ (
/** @type {unknown} */ (undefined)
);

/** @typedef {NonNullable<Awaited<ReturnType<typeof makeFsStreamWriter>>>} FsStreamWriter */
/** @param {string | undefined | null} filePath */
export const makeFsStreamWriter = async filePath => {
if (!filePath) {
return undefined;
}

const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined);

const stream = handle
? createWriteStream(noPath, { fd: handle.fd })
: process.stdout;
const useStdout = filePath === '-';
const { handle, stream } = await (async () => {
if (useStdout) {
return { handle: undefined, stream: process.stdout };
}
const fh = await open(filePath, 'a');
return { handle: fh, stream: fh.createWriteStream({ flush: true }) };
})();
await fsStreamReady(stream);
const writeAsync = promisify(stream.write.bind(stream));
const closeAsync =
useStdout || !(/** @type {any} */ (stream).close)
? undefined
: promisify(
/** @type {import('fs').WriteStream} */ (stream).close.bind(stream),
);

let flushed = Promise.resolve();
let closed = false;
Expand All @@ -77,20 +83,14 @@ export const makeFsStreamWriter = async filePath => {
};

const write = async data => {
/** @type {Promise<void>} */
const written = closed
? Promise.reject(Error('Stream closed'))
: new Promise((resolve, reject) => {
stream.write(data, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
: writeAsync(data);
updateFlushed(written);
return written;
const waitForDrain = await written;
if (waitForDrain) {
await new Promise(resolve => stream.once('drain', resolve));
}
};
Comment on lines +90 to +93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a little more respectful of the drain signal.


const flush = async () => {
Expand All @@ -107,8 +107,7 @@ export const makeFsStreamWriter = async filePath => {
// TODO: Consider creating a single Error here to use a write rejection
closed = true;
await flush();
// @ts-expect-error calling a possibly missing method
stream.close?.();
await closeAsync?.();
};

stream.on('error', err => updateFlushed(Promise.reject(err)));
Expand Down
19 changes: 13 additions & 6 deletions packages/telemetry/src/flight-recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
* @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord
* @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise<void>} writeRecord
*/
function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) {
function makeCircBufMethods(arenaSize, header, readRecord, writeRecord) {
const readCircBuf = (outbuf, offset = 0) => {
offset + outbuf.byteLength <= arenaSize ||
Fail`Reading past end of circular buffer`;
Expand Down Expand Up @@ -269,14 +269,17 @@ export const makeSimpleCircularBuffer = async ({
await file.write(headerBuffer, undefined, undefined, 0);
};

return finishCircularBuffer(arenaSize, header, readRecord, writeRecord);
return {
fileHandle: file,
...makeCircBufMethods(arenaSize, header, readRecord, writeRecord),
};
};

/**
*
* @param {Pick<EReturn<typeof makeSimpleCircularBuffer>, 'writeCircBuf'>} circBuf
* @param {Pick<CircularBuffer, 'fileHandle' | 'writeCircBuf'>} circBuf
*/
export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
export const makeSlogSenderFromBuffer = ({ fileHandle, writeCircBuf }) => {
/** @type {Promise<void>} */
let toWrite = Promise.resolve();
const writeJSON = (obj, serialized = serializeSlogObj(obj)) => {
Expand All @@ -289,6 +292,10 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
forceFlush: async () => {
await toWrite;
},
shutdown: async () => {
await toWrite;
await fileHandle.close();
},
usesJsonObject: true,
});
};
Expand All @@ -299,6 +306,6 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
* @type {import('./index.js').MakeSlogSender}
*/
export const makeSlogSender = async opts => {
const { writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ writeCircBuf });
const { fileHandle, writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ fileHandle, writeCircBuf });
};
7 changes: 5 additions & 2 deletions packages/telemetry/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export const tryFlushSlogSender = async (
slogSender,
{ env = {}, log } = {},
) => {
await Promise.resolve(slogSender?.forceFlush?.()).catch(err => {
await null;
try {
await slogSender?.forceFlush?.();
} catch (err) {
log?.('Failed to flush slog sender', err);
if (err.errors) {
for (const error of err.errors) {
Expand All @@ -44,7 +47,7 @@ export const tryFlushSlogSender = async (
if (env.SLOGSENDER_FAIL_ON_ERROR) {
throw err;
}
});
}
};

export const getResourceAttributes = ({
Expand Down
Loading
Loading