From ec3c1a2d8817f6b261c5e644e1be839afd7659d9 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 10:28:16 -0500 Subject: [PATCH 1/9] chore(SwingSet): Remove unused code and comments --- packages/SwingSet/src/kernel/slogger.js | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/packages/SwingSet/src/kernel/slogger.js b/packages/SwingSet/src/kernel/slogger.js index 5ddb274ad97..488798384a0 100644 --- a/packages/SwingSet/src/kernel/slogger.js +++ b/packages/SwingSet/src/kernel/slogger.js @@ -96,7 +96,6 @@ export function makeDummySlogger(slogCallbacks, dummyConsole = badConsole) { }), vatConsole: wrap('vatConsole', () => dummyConsole), startup: wrap('startup', () => noopFinisher), - replayVatTranscript: wrap('replayVatTranscript', () => noopFinisher), delivery: wrap('delivery', () => noopFinisher), syscall: wrap('syscall', () => noopFinisher), changeCList: wrap('changeCList', () => noopFinisher), @@ -250,18 +249,6 @@ export function makeSlogger(slogCallbacks, writeObj) { return { vatSlog, starting: true }; } - function replayVatTranscript(vatID) { - safeWrite({ type: 'replay-transcript-start', vatID }); - function finish() { - safeWrite({ type: 'replay-transcript-finish', vatID }); - } - return harden(finish); - } - - // function annotateVat(vatID, data) { - // safeWrite({ type: 'annotate-vat', vatID, data }); - // } - const { wrap, done } = makeFinishersKit(slogCallbacks); const slogger = harden({ provideVatSlogger: wrap('provideVatSlogger', provideVatSlogger), @@ -271,8 +258,6 @@ export function makeSlogger(slogCallbacks, writeObj) { startup: wrap('startup', (vatID, ...args) => provideVatSlogger(vatID).vatSlog.startup(...args), ), - // TODO: Remove this seemingly dead code. - replayVatTranscript, delivery: wrap('delivery', (vatID, ...args) => provideVatSlogger(vatID).vatSlog.delivery(...args), ), From 9ab1630aa6f68b809baa9d164e8c096ded40097f Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 10:34:24 -0500 Subject: [PATCH 2/9] fix(SwingSet): Include level in kernel console slog output Ref #10925 --- packages/SwingSet/src/controller/controller.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/SwingSet/src/controller/controller.js b/packages/SwingSet/src/controller/controller.js index de632ec9630..c4d09d5d455 100644 --- a/packages/SwingSet/src/controller/controller.js +++ b/packages/SwingSet/src/controller/controller.js @@ -216,7 +216,7 @@ export async function makeSwingsetController( const sloggingKernelConsole = makeLimitedConsole(level => { return (...args) => { kernelConsole[level](...args); - writeSlogObject({ type: 'console', source: 'kernel', args }); + writeSlogObject({ type: 'console', source: 'kernel', level, args }); }; }); writeSlogObject({ type: 'import-kernel-start' }); From fcfb9449640e04e3d4e7951c5ed7b6d7e4e448ef Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 11:27:08 -0500 Subject: [PATCH 3/9] refactor(SwingSet): Simplify makeFinishersKit into addSlogCallbacks --- packages/SwingSet/src/kernel/slogger.js | 102 ++++++++++-------------- 1 file changed, 43 insertions(+), 59 deletions(-) diff --git a/packages/SwingSet/src/kernel/slogger.js b/packages/SwingSet/src/kernel/slogger.js index 488798384a0..609fb5398ef 100644 --- a/packages/SwingSet/src/kernel/slogger.js +++ b/packages/SwingSet/src/kernel/slogger.js @@ -1,4 +1,5 @@ import { q } from '@endo/errors'; +import { objectMap } from '@agoric/internal'; import { makeLimitedConsole } from '@agoric/internal/src/ses-utils.js'; /** @import {LimitedConsole} from '@agoric/internal/src/js-utils.js'; */ @@ -13,7 +14,7 @@ const noopFinisher = harden(() => {}); /** @typedef {Partial, (methodName: string, args: unknown[], finisher: AnyFinisher) => unknown>>} SlogWrappers */ /** - * Support composition of asynchronous callbacks that are invoked at the start + * Support asynchronous slog callbacks that are invoked at the start * of an operation and return either a non-function result or a "finisher" * function to be invoked upon operation completion. * This maker accepts a collection of wrapper functions that receive the same @@ -21,21 +22,17 @@ const noopFinisher = harden(() => {}); * (e.g., its finisher), and are expected to return a finisher of their own that * will invoke that wrapped finisher. * + * @template {Record} Methods * @param {SlogWrappers} wrappers + * @param {string} msg prefix for warn-level logging about unused callbacks + * @param {Methods} methods to wrap + * @returns {Methods} */ -function makeFinishersKit(wrappers) { +function addSlogCallbacks(wrappers, msg, methods) { const unused = new Set(Object.keys(wrappers)); - return harden({ - /** - * Robustly wrap a method if a wrapper is defined. - * - * @template {(...args: unknown[]) => (Finisher | unknown)} F - * @template {AnyFinisher} [Finisher=AnyFinisher] - * @param {string} method name - * @param {F} impl the original implementation - * @returns {F} the wrapped method - */ - wrap(method, impl) { + const wrappedMethods = /** @type {typeof methods} */ ( + objectMap(methods, (impl, methodKey) => { + const method = /** @type {keyof typeof wrappers} */ (methodKey); unused.delete(method); const wrapper = wrappers[method]; @@ -43,7 +40,7 @@ function makeFinishersKit(wrappers) { if (!wrapper) return impl; const wrapped = (...args) => { - const maybeFinisher = /** @type {Finisher} */ (impl(...args)); + const maybeFinisher = /** @type {AnyFinisher} */ (impl(...args)); try { // Allow the callback to observe the call synchronously, and replace // the implementation's finisher function, but not to throw an exception. @@ -53,7 +50,9 @@ function makeFinishersKit(wrappers) { // We wrap the finisher in the callback's return value. return (...finishArgs) => { try { - return /** @type {Finisher} */ (wrapperFinisher)(...finishArgs); + return /** @type {AnyFinisher} */ (wrapperFinisher)( + ...finishArgs, + ); } catch (e) { console.error(`${method} wrapper finisher failed:`, e); return maybeFinisher(...finishArgs); @@ -64,18 +63,13 @@ function makeFinishersKit(wrappers) { return maybeFinisher; } }; - return /** @type {F} */ (wrapped); - }, - /** - * Declare that all wrapping is done. - * - * @param {string} msg message to display if there are unused wrappers - */ - done(msg = 'Unused wrappers') { - if (!unused.size) return; - console.warn(msg, ...[...unused.keys()].sort().map(q)); - }, - }); + return /** @type {typeof impl} */ (/** @type {unknown} */ (wrapped)); + }) + ); + if (unused.size) { + console.warn(msg, ...[...unused.keys()].sort().map(q)); + } + return wrappedMethods; } export const badConsole = makeLimitedConsole(level => () => { @@ -89,21 +83,19 @@ export const noopConsole = makeLimitedConsole(_level => () => {}); * @returns {KernelSlog} */ export function makeDummySlogger(slogCallbacks, dummyConsole = badConsole) { - const { wrap, done } = makeFinishersKit(slogCallbacks); - const dummySlogger = harden({ - provideVatSlogger: wrap('provideVatSlogger', () => { - return harden({ vatSlog: { delivery: () => noopFinisher } }); - }), - vatConsole: wrap('vatConsole', () => dummyConsole), - startup: wrap('startup', () => noopFinisher), - delivery: wrap('delivery', () => noopFinisher), - syscall: wrap('syscall', () => noopFinisher), - changeCList: wrap('changeCList', () => noopFinisher), - terminateVat: wrap('terminateVat', () => noopFinisher), - write: noopFinisher, + const unusedWrapperPrefix = + 'Unused methods in makeDummySlogger slogCallbacks'; + const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, { + provideVatSlogger: () => + harden({ vatSlog: { delivery: () => noopFinisher } }), + vatConsole: () => dummyConsole, + startup: () => noopFinisher, + delivery: () => noopFinisher, + syscall: () => noopFinisher, + changeCList: () => noopFinisher, + terminateVat: () => noopFinisher, }); - done('Unused makeDummySlogger slogCallbacks method names'); - return dummySlogger; + return harden({ ...wrappedMethods, write: noopFinisher }); } /** @@ -249,29 +241,21 @@ export function makeSlogger(slogCallbacks, writeObj) { return { vatSlog, starting: true }; } - const { wrap, done } = makeFinishersKit(slogCallbacks); - const slogger = harden({ - provideVatSlogger: wrap('provideVatSlogger', provideVatSlogger), - vatConsole: wrap('vatConsole', (vatID, ...args) => + const unusedWrapperPrefix = 'Unused methods in makeSlogger slogCallbacks'; + const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, { + provideVatSlogger, + vatConsole: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.vatConsole(...args), - ), - startup: wrap('startup', (vatID, ...args) => + startup: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.startup(...args), - ), - delivery: wrap('delivery', (vatID, ...args) => + delivery: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.delivery(...args), - ), - syscall: wrap('syscall', (vatID, ...args) => + syscall: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.syscall(...args), - ), - changeCList: wrap('changeCList', (vatID, ...args) => + changeCList: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.changeCList(...args), - ), - terminateVat: wrap('terminateVat', (vatID, ...args) => + terminateVat: (vatID, ...args) => provideVatSlogger(vatID).vatSlog.terminateVat(...args), - ), - write: safeWrite, }); - done('Unused makeSlogger slogCallbacks method names'); - return slogger; + return harden({ ...wrappedMethods, write: safeWrite }); } From d8a59477c89da0d4a3879ec925eac1b2393fee8a Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 11:27:08 -0500 Subject: [PATCH 4/9] refactor(SwingSet): Improve addSlogCallbacks parameter/variable names --- packages/SwingSet/src/kernel/slogger.js | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/SwingSet/src/kernel/slogger.js b/packages/SwingSet/src/kernel/slogger.js index 609fb5398ef..404df3505d6 100644 --- a/packages/SwingSet/src/kernel/slogger.js +++ b/packages/SwingSet/src/kernel/slogger.js @@ -23,18 +23,18 @@ const noopFinisher = harden(() => {}); * will invoke that wrapped finisher. * * @template {Record} Methods - * @param {SlogWrappers} wrappers - * @param {string} msg prefix for warn-level logging about unused callbacks + * @param {SlogWrappers} slogCallbacks + * @param {string} unusedMsgPrefix prefix for warn-level logging about unused callbacks * @param {Methods} methods to wrap * @returns {Methods} */ -function addSlogCallbacks(wrappers, msg, methods) { - const unused = new Set(Object.keys(wrappers)); - const wrappedMethods = /** @type {typeof methods} */ ( +function addSlogCallbacks(slogCallbacks, unusedMsgPrefix, methods) { + const unused = new Set(Object.keys(slogCallbacks)); + const wrappedMethods = /** @type {Methods} */ ( objectMap(methods, (impl, methodKey) => { - const method = /** @type {keyof typeof wrappers} */ (methodKey); - unused.delete(method); - const wrapper = wrappers[method]; + const methodName = /** @type {keyof typeof slogCallbacks} */ (methodKey); + unused.delete(methodName); + const wrapper = slogCallbacks[methodName]; // If there is no registered wrapper, return the implementation directly. if (!wrapper) return impl; @@ -44,7 +44,7 @@ function addSlogCallbacks(wrappers, msg, methods) { try { // Allow the callback to observe the call synchronously, and replace // the implementation's finisher function, but not to throw an exception. - const wrapperFinisher = wrapper(method, args, maybeFinisher); + const wrapperFinisher = wrapper(methodName, args, maybeFinisher); if (typeof maybeFinisher !== 'function') return wrapperFinisher; // We wrap the finisher in the callback's return value. @@ -54,12 +54,12 @@ function addSlogCallbacks(wrappers, msg, methods) { ...finishArgs, ); } catch (e) { - console.error(`${method} wrapper finisher failed:`, e); + console.error(`${methodName} wrapper finisher failed:`, e); return maybeFinisher(...finishArgs); } }; } catch (e) { - console.error(`${method} wrapper failed:`, e); + console.error(`${methodName} wrapper failed:`, e); return maybeFinisher; } }; @@ -67,7 +67,7 @@ function addSlogCallbacks(wrappers, msg, methods) { }) ); if (unused.size) { - console.warn(msg, ...[...unused.keys()].sort().map(q)); + console.warn(unusedMsgPrefix, ...[...unused.keys()].sort().map(q)); } return wrappedMethods; } From f83c01d89d80798e0922acdb498fcc7250560977 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 17:57:02 -0500 Subject: [PATCH 5/9] fix: Properly synchronize slog sender termination --- packages/cosmic-swingset/src/chain-main.js | 10 ++++-- packages/cosmic-swingset/src/launch-chain.js | 3 +- packages/internal/src/node/fs-stream.js | 32 +++++++++---------- packages/telemetry/src/flight-recorder.js | 19 +++++++---- .../telemetry/test/flight-recorder.test.js | 16 +++++++--- 5 files changed, 51 insertions(+), 29 deletions(-) diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 117ed8ecbe2..3fda83719ae 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -443,7 +443,8 @@ export const makeLaunchChain = ( serviceName: TELEMETRY_SERVICE_NAME, }); - const slogSender = await (testingOverrides.slogSender || + const providedSlogSender = await testingOverrides.slogSender; + const slogSender = await (providedSlogSender || makeSlogSender({ stateDir: stateDBDir, env, @@ -559,7 +560,12 @@ export const makeLaunchChain = ( swingsetConfig, }); savedChainSends = s.savedChainSends; - return s; + const shutdown = async () => { + await s.shutdown?.(); + if (providedSlogSender) return; + await slogSender?.shutdown?.(); + }; + return { ...s, shutdown }; }; return launchChain; diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index f37951bfe0d..1eb9f568480 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -1393,7 +1393,8 @@ export async function launchAndShareInternals({ } async function shutdown() { - return controller.shutdown(); + await controller.shutdown(); + await afterCommitWorkDone; } function writeSlogObject(obj) { diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index 5d3da5edafe..d7e0a8c3014 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -1,6 +1,5 @@ -import { createWriteStream } from 'node:fs'; -import process from 'node:process'; import { open } from 'node:fs/promises'; +import process from 'node:process'; /** * @param {import('fs').ReadStream @@ -40,10 +39,6 @@ export const fsStreamReady = stream => stream.on('error', onError); }); -const noPath = /** @type {import('fs').PathLike} */ ( - /** @type {unknown} */ (undefined) -); - /** @typedef {NonNullable>>} FsStreamWriter */ /** @param {string | undefined | null} filePath */ export const makeFsStreamWriter = async filePath => { @@ -51,11 +46,13 @@ export const makeFsStreamWriter = async filePath => { return undefined; } - const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined); - - const stream = handle - ? createWriteStream(noPath, { fd: handle.fd }) - : process.stdout; + const { handle, stream } = await (async () => { + if (filePath === '-') { + return { handle: undefined, stream: process.stdout }; + } + const fh = await open(filePath, 'a'); + return { handle: fh, stream: fh.createWriteStream({ flush: true }) }; + })(); await fsStreamReady(stream); let flushed = Promise.resolve(); @@ -77,7 +74,6 @@ export const makeFsStreamWriter = async filePath => { }; const write = async data => { - /** @type {Promise} */ const written = closed ? Promise.reject(Error('Stream closed')) : new Promise((resolve, reject) => { @@ -85,12 +81,15 @@ export const makeFsStreamWriter = async filePath => { if (err) { reject(err); } else { - resolve(); + resolve(undefined); } }); }); updateFlushed(written); - return written; + const waitForDrain = await written; + if (waitForDrain) { + await new Promise(resolve => stream.once('drain', resolve)); + } }; const flush = async () => { @@ -107,8 +106,9 @@ export const makeFsStreamWriter = async filePath => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - // @ts-expect-error calling a possibly missing method - stream.close?.(); + if (stream.end) { + await new Promise(resolve => stream.end(() => resolve(undefined))); + } }; stream.on('error', err => updateFlushed(Promise.reject(err))); diff --git a/packages/telemetry/src/flight-recorder.js b/packages/telemetry/src/flight-recorder.js index b88da377215..afd04222e3c 100644 --- a/packages/telemetry/src/flight-recorder.js +++ b/packages/telemetry/src/flight-recorder.js @@ -76,7 +76,7 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => { * @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord * @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise} writeRecord */ -function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) { +function makeCircBufMethods(arenaSize, header, readRecord, writeRecord) { const readCircBuf = (outbuf, offset = 0) => { offset + outbuf.byteLength <= arenaSize || Fail`Reading past end of circular buffer`; @@ -269,14 +269,17 @@ export const makeSimpleCircularBuffer = async ({ await file.write(headerBuffer, undefined, undefined, 0); }; - return finishCircularBuffer(arenaSize, header, readRecord, writeRecord); + return { + fileHandle: file, + ...makeCircBufMethods(arenaSize, header, readRecord, writeRecord), + }; }; /** * - * @param {Pick, 'writeCircBuf'>} circBuf + * @param {Pick} circBuf */ -export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { +export const makeSlogSenderFromBuffer = ({ fileHandle, writeCircBuf }) => { /** @type {Promise} */ let toWrite = Promise.resolve(); const writeJSON = (obj, serialized = serializeSlogObj(obj)) => { @@ -289,6 +292,10 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { forceFlush: async () => { await toWrite; }, + shutdown: async () => { + await toWrite; + await fileHandle.close(); + }, usesJsonObject: true, }); }; @@ -299,6 +306,6 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => { * @type {import('./index.js').MakeSlogSender} */ export const makeSlogSender = async opts => { - const { writeCircBuf } = await makeSimpleCircularBuffer(opts); - return makeSlogSenderFromBuffer({ writeCircBuf }); + const { fileHandle, writeCircBuf } = await makeSimpleCircularBuffer(opts); + return makeSlogSenderFromBuffer({ fileHandle, writeCircBuf }); }; diff --git a/packages/telemetry/test/flight-recorder.test.js b/packages/telemetry/test/flight-recorder.test.js index 27cfada2c3a..9db83830c9f 100644 --- a/packages/telemetry/test/flight-recorder.test.js +++ b/packages/telemetry/test/flight-recorder.test.js @@ -1,4 +1,5 @@ import fs from 'node:fs'; +import { promisify } from 'node:util'; import tmp from 'tmp'; import { test } from './prepare-test-env-ava.js'; @@ -17,12 +18,21 @@ const bufferTests = test.macro( async (t, input) => { const BUFFER_SIZE = 512; - const { name: tmpFile, removeCallback } = tmp.fileSync(); + const { + name: tmpFile, + fd, + removeCallback, + } = tmp.fileSync({ detachDescriptor: true }); + t.teardown(removeCallback); + const fileHandle = /** @type {import('fs/promises').FileHandle} */ ({ + close: promisify(fs.close.bind(fs, fd)), + }); const { readCircBuf, writeCircBuf } = await input.makeBuffer({ circularBufferSize: BUFFER_SIZE, circularBufferFilename: tmpFile, }); - const slogSender = makeSlogSenderFromBuffer({ writeCircBuf }); + const slogSender = makeSlogSenderFromBuffer({ fileHandle, writeCircBuf }); + t.teardown(slogSender.shutdown); slogSender({ type: 'start' }); await slogSender.forceFlush(); t.is(fs.readFileSync(tmpFile, { encoding: 'utf8' }).length, BUFFER_SIZE); @@ -73,8 +83,6 @@ const bufferTests = test.macro( slogSender(null, 'PRE-SERIALIZED'); await slogSender.forceFlush(); t.truthy(fs.readFileSync(tmpFile).includes('PRE-SERIALIZED')); - // console.log({ tmpFile }); - removeCallback(); }, ); From 318269e300dc4730750e8446aaefe100b148f73f Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Tue, 25 Feb 2025 18:28:21 -0500 Subject: [PATCH 6/9] chore(telemetry): Use more readable async patterns --- packages/internal/src/node/fs-stream.js | 17 +++++------------ packages/telemetry/src/index.js | 7 +++++-- packages/telemetry/src/make-slog-sender.js | 14 +++++++------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index d7e0a8c3014..95866ca7a9c 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -1,5 +1,6 @@ import { open } from 'node:fs/promises'; import process from 'node:process'; +import { promisify } from 'node:util'; /** * @param {import('fs').ReadStream @@ -54,6 +55,8 @@ export const makeFsStreamWriter = async filePath => { return { handle: fh, stream: fh.createWriteStream({ flush: true }) }; })(); await fsStreamReady(stream); + const writeAsync = promisify(stream.write.bind(stream)); + const endAsync = stream.end && promisify(stream.end.bind(stream)); let flushed = Promise.resolve(); let closed = false; @@ -76,15 +79,7 @@ export const makeFsStreamWriter = async filePath => { const write = async data => { const written = closed ? Promise.reject(Error('Stream closed')) - : new Promise((resolve, reject) => { - stream.write(data, err => { - if (err) { - reject(err); - } else { - resolve(undefined); - } - }); - }); + : writeAsync(data); updateFlushed(written); const waitForDrain = await written; if (waitForDrain) { @@ -106,9 +101,7 @@ export const makeFsStreamWriter = async filePath => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - if (stream.end) { - await new Promise(resolve => stream.end(() => resolve(undefined))); - } + await endAsync?.(); }; stream.on('error', err => updateFlushed(Promise.reject(err))); diff --git a/packages/telemetry/src/index.js b/packages/telemetry/src/index.js index 13d69ad05a1..c0ef7b4a828 100644 --- a/packages/telemetry/src/index.js +++ b/packages/telemetry/src/index.js @@ -34,7 +34,10 @@ export const tryFlushSlogSender = async ( slogSender, { env = {}, log } = {}, ) => { - await Promise.resolve(slogSender?.forceFlush?.()).catch(err => { + await null; + try { + await slogSender?.forceFlush?.(); + } catch (err) { log?.('Failed to flush slog sender', err); if (err.errors) { for (const error of err.errors) { @@ -44,7 +47,7 @@ export const tryFlushSlogSender = async ( if (env.SLOGSENDER_FAIL_ON_ERROR) { throw err; } - }); + } }; export const getResourceAttributes = ({ diff --git a/packages/telemetry/src/make-slog-sender.js b/packages/telemetry/src/make-slog-sender.js index 94693661087..023d33aae0f 100644 --- a/packages/telemetry/src/make-slog-sender.js +++ b/packages/telemetry/src/make-slog-sender.js @@ -158,15 +158,15 @@ export const makeSlogSender = async (opts = {}) => { } }; return Object.assign(slogSender, { - forceFlush: async () => - PromiseAllOrErrors([ + forceFlush: async () => { + await PromiseAllOrErrors([ ...senders.map(sender => sender.forceFlush?.()), ...sendErrors.splice(0).map(err => Promise.reject(err)), - ]).then(() => {}), - shutdown: async () => - PromiseAllOrErrors(senders.map(sender => sender.shutdown?.())).then( - () => {}, - ), + ]); + }, + shutdown: async () => { + await PromiseAllOrErrors(senders.map(sender => sender.shutdown?.())); + }, usesJsonObject: hasSenderUsingJsonObj, }); } From fa1f04a29d3210d4290e3d3ec1b0b61d4c0ac560 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Fri, 28 Feb 2025 13:09:12 -0500 Subject: [PATCH 7/9] chore(SwingSet): Prefer type Callable over Function --- packages/SwingSet/src/kernel/slogger.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/SwingSet/src/kernel/slogger.js b/packages/SwingSet/src/kernel/slogger.js index 404df3505d6..f8037b7ac6a 100644 --- a/packages/SwingSet/src/kernel/slogger.js +++ b/packages/SwingSet/src/kernel/slogger.js @@ -2,6 +2,7 @@ import { q } from '@endo/errors'; import { objectMap } from '@agoric/internal'; import { makeLimitedConsole } from '@agoric/internal/src/ses-utils.js'; +/** @import {Callable} from '@agoric/internal'; */ /** @import {LimitedConsole} from '@agoric/internal/src/js-utils.js'; */ const IDLE = 'idle'; @@ -22,7 +23,7 @@ const noopFinisher = harden(() => {}); * (e.g., its finisher), and are expected to return a finisher of their own that * will invoke that wrapped finisher. * - * @template {Record} Methods + * @template {Record} Methods * @param {SlogWrappers} slogCallbacks * @param {string} unusedMsgPrefix prefix for warn-level logging about unused callbacks * @param {Methods} methods to wrap From 117c766c38e70f76a683c2f070cddaa8287b7619 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Fri, 28 Feb 2025 13:17:25 -0500 Subject: [PATCH 8/9] fix(internal): Exempt process.stdout from being closed by makeFsStreamWriter --- packages/internal/src/node/fs-stream.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index 95866ca7a9c..7cf0dadf317 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -47,8 +47,9 @@ export const makeFsStreamWriter = async filePath => { return undefined; } + const useStdout = filePath === '-'; const { handle, stream } = await (async () => { - if (filePath === '-') { + if (useStdout) { return { handle: undefined, stream: process.stdout }; } const fh = await open(filePath, 'a'); @@ -56,7 +57,9 @@ export const makeFsStreamWriter = async filePath => { })(); await fsStreamReady(stream); const writeAsync = promisify(stream.write.bind(stream)); - const endAsync = stream.end && promisify(stream.end.bind(stream)); + const endAsync = useStdout + ? undefined + : stream.end && promisify(stream.end.bind(stream)); let flushed = Promise.resolve(); let closed = false; From 6c6fba4d2f4bcfbc49f51fd46d2fffa460ea4d78 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Sat, 1 Mar 2025 01:03:02 -0500 Subject: [PATCH 9/9] refactor(internal): Prefer fs.WriteStream close() over stream.Writable end() --- packages/internal/src/node/fs-stream.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index 7cf0dadf317..e7f3eeb38dc 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -57,9 +57,12 @@ export const makeFsStreamWriter = async filePath => { })(); await fsStreamReady(stream); const writeAsync = promisify(stream.write.bind(stream)); - const endAsync = useStdout - ? undefined - : stream.end && promisify(stream.end.bind(stream)); + const closeAsync = + useStdout || !(/** @type {any} */ (stream).close) + ? undefined + : promisify( + /** @type {import('fs').WriteStream} */ (stream).close.bind(stream), + ); let flushed = Promise.resolve(); let closed = false; @@ -104,7 +107,7 @@ export const makeFsStreamWriter = async filePath => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - await endAsync?.(); + await closeAsync?.(); }; stream.on('error', err => updateFlushed(Promise.reject(err)));