diff --git a/packages/SwingSet/src/controller/controller.js b/packages/SwingSet/src/controller/controller.js index de632ec9630..c4d09d5d455 100644 --- a/packages/SwingSet/src/controller/controller.js +++ b/packages/SwingSet/src/controller/controller.js @@ -216,7 +216,7 @@ export async function makeSwingsetController( const sloggingKernelConsole = makeLimitedConsole(level => { return (...args) => { kernelConsole[level](...args); - writeSlogObject({ type: 'console', source: 'kernel', args }); + writeSlogObject({ type: 'console', source: 'kernel', level, args }); }; }); writeSlogObject({ type: 'import-kernel-start' }); diff --git a/packages/SwingSet/src/kernel/slogger.js b/packages/SwingSet/src/kernel/slogger.js index 5ddb274ad97..f8037b7ac6a 100644 --- a/packages/SwingSet/src/kernel/slogger.js +++ b/packages/SwingSet/src/kernel/slogger.js @@ -1,6 +1,8 @@ import { q } from '@endo/errors'; +import { objectMap } from '@agoric/internal'; import { makeLimitedConsole } from '@agoric/internal/src/ses-utils.js'; +/** @import {Callable} from '@agoric/internal'; */ /** @import {LimitedConsole} from '@agoric/internal/src/js-utils.js'; */ const IDLE = 'idle'; @@ -13,7 +15,7 @@ const noopFinisher = harden(() => {}); /** @typedef {Partial, (methodName: string, args: unknown[], finisher: AnyFinisher) => unknown>>} SlogWrappers */ /** - * Support composition of asynchronous callbacks that are invoked at the start + * Support asynchronous slog callbacks that are invoked at the start * of an operation and return either a non-function result or a "finisher" * function to be invoked upon operation completion. * This maker accepts a collection of wrapper functions that receive the same @@ -21,61 +23,54 @@ const noopFinisher = harden(() => {}); * (e.g., its finisher), and are expected to return a finisher of their own that * will invoke that wrapped finisher. * - * @param {SlogWrappers} wrappers + * @template {Record} Methods + * @param {SlogWrappers} slogCallbacks + * @param {string} unusedMsgPrefix prefix for warn-level logging about unused callbacks + * @param {Methods} methods to wrap + * @returns {Methods} */ -function makeFinishersKit(wrappers) { - const unused = new Set(Object.keys(wrappers)); - return harden({ - /** - * Robustly wrap a method if a wrapper is defined. - * - * @template {(...args: unknown[]) => (Finisher | unknown)} F - * @template {AnyFinisher} [Finisher=AnyFinisher] - * @param {string} method name - * @param {F} impl the original implementation - * @returns {F} the wrapped method - */ - wrap(method, impl) { - unused.delete(method); - const wrapper = wrappers[method]; +function addSlogCallbacks(slogCallbacks, unusedMsgPrefix, methods) { + const unused = new Set(Object.keys(slogCallbacks)); + const wrappedMethods = /** @type {Methods} */ ( + objectMap(methods, (impl, methodKey) => { + const methodName = /** @type {keyof typeof slogCallbacks} */ (methodKey); + unused.delete(methodName); + const wrapper = slogCallbacks[methodName]; // If there is no registered wrapper, return the implementation directly. if (!wrapper) return impl; const wrapped = (...args) => { - const maybeFinisher = /** @type {Finisher} */ (impl(...args)); + const maybeFinisher = /** @type {AnyFinisher} */ (impl(...args)); try { // Allow the callback to observe the call synchronously, and replace // the implementation's finisher function, but not to throw an exception. - const wrapperFinisher = wrapper(method, args, maybeFinisher); + const wrapperFinisher = wrapper(methodName, args, maybeFinisher); if (typeof maybeFinisher !== 'function') return wrapperFinisher; // We wrap the finisher in the callback's return value. return (...finishArgs) => { try { - return /** @type {Finisher} */ (wrapperFinisher)(...finishArgs); + return /** @type {AnyFinisher} */ (wrapperFinisher)( + ...finishArgs, + ); } catch (e) { - console.error(`${method} wrapper finisher failed:`, e); + console.error(`${methodName} wrapper finisher failed:`, e); return maybeFinisher(...finishArgs); } }; } catch (e) { - console.error(`${method} wrapper failed:`, e); + console.error(`${methodName} wrapper failed:`, e); return maybeFinisher; } }; - return /** @type {F} */ (wrapped); - }, - /** - * Declare that all wrapping is done. - * - * @param {string} msg message to display if there are unused wrappers - */ - done(msg = 'Unused wrappers') { - if (!unused.size) return; - console.warn(msg, ...[...unused.keys()].sort().map(q)); - }, - }); + return /** @type {typeof impl} */ (/** @type {unknown} */ (wrapped)); + }) + ); + if (unused.size) { + console.warn(unusedMsgPrefix, ...[...unused.keys()].sort().map(q)); + } + return wrappedMethods; } export const badConsole = makeLimitedConsole(level => () => { @@ -89,22 +84,19 @@ export const noopConsole = makeLimitedConsole(_level => () => {}); * @returns {KernelSlog} */ export function makeDummySlogger(slogCallbacks, dummyConsole = badConsole) { - const { wrap, done } = makeFinishersKit(slogCallbacks); - const dummySlogger = harden({ - provideVatSlogger: wrap('provideVatSlogger', () => { - return harden({ vatSlog: { delivery: () => noopFinisher } }); - }), - vatConsole: wrap('vatConsole', () => dummyConsole), - startup: wrap('startup', () => noopFinisher), - replayVatTranscript: wrap('replayVatTranscript', () => noopFinisher), - delivery: wrap('delivery', () => noopFinisher), - syscall: wrap('syscall', () => noopFinisher), - changeCList: wrap('changeCList', () => noopFinisher), - terminateVat: wrap('terminateVat', () => noopFinisher), - write: noopFinisher, + const unusedWrapperPrefix = + 'Unused methods in makeDummySlogger slogCallbacks'; + const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, { + provideVatSlogger: () => + harden({ vatSlog: { delivery: () => noopFinisher } }), + vatConsole: () => dummyConsole, + startup: () => noopFinisher, + delivery: () => noopFinisher, + syscall: () => noopFinisher, + changeCList: () => noopFinisher, + terminateVat: () => noopFinisher, }); - done('Unused makeDummySlogger slogCallbacks method names'); - return dummySlogger; + return harden({ ...wrappedMethods, write: noopFinisher }); } /** @@ -250,43 +242,21 @@ export function makeSlogger(slogCallbacks, writeObj) { return { vatSlog, starting: true }; } - function replayVatTranscript(vatID) { - safeWrite({ type: 'replay-transcript-start', vatID }); - function finish() { - safeWrite({ type: 'replay-transcript-finish', vatID }); - } - return harden(finish); - } - - // function annotateVat(vatID, data) { - // safeWrite({ type: 'annotate-vat', vatID, data }); - // } - - const { wrap, done } = makeFinishersKit(slogCallbacks); - const slogger = harden({ - provideVatSlogger: wrap('provideVatSlogger', provideVatSlogger), - vatConsole: wrap('vatConsole', (vatID, ...args) => + const unusedWrapperPrefix = 'Unused methods in makeSlogger slogCallbacks'; + const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, { + provideVatSlogger, + vatConsole: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.vatConsole(...args), - ), - startup: wrap('startup', (vatID, ...args) => + startup: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.startup(...args), - ), - // TODO: Remove this seemingly dead code. - replayVatTranscript, - delivery: wrap('delivery', (vatID, ...args) => + delivery: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.delivery(...args), - ), - syscall: wrap('syscall', (vatID, ...args) => + syscall: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.syscall(...args), - ), - changeCList: wrap('changeCList', (vatID, ...args) => + changeCList: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.changeCList(...args), - ), - terminateVat: wrap('terminateVat', (vatID, ...args) => + terminateVat: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.terminateVat(...args), - ), - write: safeWrite, }); - done('Unused makeSlogger slogCallbacks method names'); - return slogger; + return harden({ ...wrappedMethods, write: safeWrite }); } diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 0b163fa6200..31d1b260e03 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -439,7 +439,8 @@ export const makeLaunchChain = ( serviceName: TELEMETRY_SERVICE_NAME, }); - const slogSender = await (testingOverrides.slogSender || + const providedSlogSender = await testingOverrides.slogSender; + const slogSender = await (providedSlogSender || makeSlogSender({ stateDir: stateDBDir, env, @@ -552,7 +553,12 @@ export const makeLaunchChain = ( swingsetConfig, }); savedChainSends = s.savedChainSends; - return s; + const shutdown = async () => { + await s.shutdown?.(); + if (providedSlogSender) return; + await slogSender?.shutdown?.(); + }; + return { ...s, shutdown }; }; return launchChain; diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index 3bb3fd4b9f5..7f829c7582c 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -1393,7 +1393,8 @@ export async function launch({ } async function shutdown() { - return controller.shutdown(); + await controller.shutdown(); + await afterCommitWorkDone; } function writeSlogObject(obj) { diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index 5d3da5edafe..e7f3eeb38dc 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -1,6 +1,6 @@ -import { createWriteStream } from 'node:fs'; -import process from 'node:process'; import { open } from 'node:fs/promises'; +import process from 'node:process'; +import { promisify } from 'node:util'; /** * @param {import('fs').ReadStream @@ -40,10 +40,6 @@ export const fsStreamReady = stream => stream.on('error', onError); }); -const noPath = /** @type {import('fs').PathLike} */ ( - /** @type {unknown} */ (undefined) -); - /** @typedef {NonNullable>>} FsStreamWriter */ /** @param {string | undefined | null} filePath */ export const makeFsStreamWriter = async filePath => { @@ -51,12 +47,22 @@ export const makeFsStreamWriter = async filePath => { return undefined; } - const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined); - - const stream = handle - ? createWriteStream(noPath, { fd: handle.fd }) - : process.stdout; + const useStdout = filePath === '-'; + const { handle, stream } = await (async () => { + if (useStdout) { + return { handle: undefined, stream: process.stdout }; + } + const fh = await open(filePath, 'a'); + return { handle: fh, stream: fh.createWriteStream({ flush: true }) }; + })(); await fsStreamReady(stream); + const writeAsync = promisify(stream.write.bind(stream)); + const closeAsync = + useStdout || !(/** @type {any} */ (stream).close) + ? undefined + : promisify( + /** @type {import('fs').WriteStream} */ (stream).close.bind(stream), + ); let flushed = Promise.resolve(); let closed = false; @@ -77,20 +83,14 @@ export const makeFsStreamWriter = async filePath => { }; const write = async data => { - /** @type {Promise} */ const written = closed ? Promise.reject(Error('Stream closed')) - : new Promise((resolve, reject) => { - stream.write(data, err => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); + : writeAsync(data); updateFlushed(written); - return written; + const waitForDrain = await written; + if (waitForDrain) { + await new Promise(resolve => stream.once('drain', resolve)); + } }; const flush = async () => { @@ -107,8 +107,7 @@ export const makeFsStreamWriter = async filePath => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - // @ts-expect-error calling a possibly missing method - stream.close?.(); + await closeAsync?.(); }; stream.on('error', err => updateFlushed(Promise.reject(err))); diff --git a/packages/telemetry/src/flight-recorder.js b/packages/telemetry/src/flight-recorder.js index b88da377215..afd04222e3c 100644 --- a/packages/telemetry/src/flight-recorder.js +++ b/packages/telemetry/src/flight-recorder.js @@ -76,7 +76,7 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => { * @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord * @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise} writeRecord */ -function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) { +function makeCircBufMethods(arenaSize, header, readRecord, writeRecord) { const readCircBuf = (outbuf, offset = 0) => { offset + outbuf.byteLength <= arenaSize || Fail`Reading past end of circular buffer`; @@ -269,14 +269,17 @@ export const makeSimpleCircularBuffer = async ({ await file.write(headerBuffer, undefined, undefined, 0); }; - return finishCircularBuffer(arenaSize, header, readRecord, writeRecord); + return { + fileHandle: file, + ...makeCircBufMethods(arenaSize, header, readRecord, writeRecord), + }; }; /** * - * @param {Pick, 'writeCircBuf'>} circBuf + * @param {Pick} circBuf */ -export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { +export const makeSlogSenderFromBuffer = ({ fileHandle, writeCircBuf }) => { /** @type {Promise} */ let toWrite = Promise.resolve(); const writeJSON = (obj, serialized = serializeSlogObj(obj)) => { @@ -289,6 +292,10 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { forceFlush: async () => { await toWrite; }, + shutdown: async () => { + await toWrite; + await fileHandle.close(); + }, usesJsonObject: true, }); }; @@ -299,6 +306,6 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { * @type {import('./index.js').MakeSlogSender} */ export const makeSlogSender = async opts => { - const { writeCircBuf } = await makeSimpleCircularBuffer(opts); - return makeSlogSenderFromBuffer({ writeCircBuf }); + const { fileHandle, writeCircBuf } = await makeSimpleCircularBuffer(opts); + return makeSlogSenderFromBuffer({ fileHandle, writeCircBuf }); }; diff --git a/packages/telemetry/src/index.js b/packages/telemetry/src/index.js index 13d69ad05a1..c0ef7b4a828 100644 --- a/packages/telemetry/src/index.js +++ b/packages/telemetry/src/index.js @@ -34,7 +34,10 @@ export const tryFlushSlogSender = async ( slogSender, { env = {}, log } = {}, ) => { - await Promise.resolve(slogSender?.forceFlush?.()).catch(err => { + await null; + try { + await slogSender?.forceFlush?.(); + } catch (err) { log?.('Failed to flush slog sender', err); if (err.errors) { for (const error of err.errors) { @@ -44,7 +47,7 @@ export const tryFlushSlogSender = async ( if (env.SLOGSENDER_FAIL_ON_ERROR) { throw err; } - }); + } }; export const getResourceAttributes = ({ diff --git a/packages/telemetry/src/make-slog-sender.js b/packages/telemetry/src/make-slog-sender.js index 94693661087..023d33aae0f 100644 --- a/packages/telemetry/src/make-slog-sender.js +++ b/packages/telemetry/src/make-slog-sender.js @@ -158,15 +158,15 @@ export const makeSlogSender = async (opts = {}) => { } }; return Object.assign(slogSender, { - forceFlush: async () => - PromiseAllOrErrors([ + forceFlush: async () => { + await PromiseAllOrErrors([ ...senders.map(sender => sender.forceFlush?.()), ...sendErrors.splice(0).map(err => Promise.reject(err)), - ]).then(() => {}), - shutdown: async () => - PromiseAllOrErrors(senders.map(sender => sender.shutdown?.())).then( - () => {}, - ), + ]); + }, + shutdown: async () => { + await PromiseAllOrErrors(senders.map(sender => sender.shutdown?.())); + }, usesJsonObject: hasSenderUsingJsonObj, }); } diff --git a/packages/telemetry/test/flight-recorder.test.js b/packages/telemetry/test/flight-recorder.test.js index 27cfada2c3a..9db83830c9f 100644 --- a/packages/telemetry/test/flight-recorder.test.js +++ b/packages/telemetry/test/flight-recorder.test.js @@ -1,4 +1,5 @@ import fs from 'node:fs'; +import { promisify } from 'node:util'; import tmp from 'tmp'; import { test } from './prepare-test-env-ava.js'; @@ -17,12 +18,21 @@ const bufferTests = test.macro( async (t, input) => { const BUFFER_SIZE = 512; - const { name: tmpFile, removeCallback } = tmp.fileSync(); + const { + name: tmpFile, + fd, + removeCallback, + } = tmp.fileSync({ detachDescriptor: true }); + t.teardown(removeCallback); + const fileHandle = /** @type {import('fs/promises').FileHandle} */ ({ + close: promisify(fs.close.bind(fs, fd)), + }); const { readCircBuf, writeCircBuf } = await input.makeBuffer({ circularBufferSize: BUFFER_SIZE, circularBufferFilename: tmpFile, }); - const slogSender = makeSlogSenderFromBuffer({ writeCircBuf }); + const slogSender = makeSlogSenderFromBuffer({ fileHandle, writeCircBuf }); + t.teardown(slogSender.shutdown); slogSender({ type: 'start' }); await slogSender.forceFlush(); t.is(fs.readFileSync(tmpFile, { encoding: 'utf8' }).length, BUFFER_SIZE); @@ -73,8 +83,6 @@ const bufferTests = test.macro( slogSender(null, 'PRE-SERIALIZED'); await slogSender.forceFlush(); t.truthy(fs.readFileSync(tmpFile).includes('PRE-SERIALIZED')); - // console.log({ tmpFile }); - removeCallback(); }, );