Skip to content

Commit

Permalink
fix: Properly synchronize slog sender termination (#11052)
Browse files Browse the repository at this point in the history
refs: #10925

## Description
This was prompted by #10925 (comment) (calling out missing `level` data for kernel console slog entries), but more fundamentally fixes the interaction between cosmic-swingset, the kernel, and the telemetry system (and performs a few cleanups along the way).

### Security Considerations
Security posture should not be affected.

### Scaling Considerations
None.

### Documentation Considerations
n/a

### Testing Considerations
Manual confirmation per #10776:

<details><summary>slog excerpt</summary>

```
{"type":"syscall","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":19,"replay":false,"ksc":["subscribe","v12","kp140"],"vsc":["subscribe","p+12"],"time":1740527497.795959,"monotime":64.620102064}
{"type":"syscall-result","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":19,"replay":false,"ksr":["ok",null],"vsr":["ok",null],"time":1740527497.79604,"monotime":64.620182996}
{"type":"clist","crankNum":407,"mode":"drop","vatID":"v12","kobj":"kp139","vobj":"p-61","time":1740527497.79643,"monotime":64.620573542}
{"type":"syscall","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":20,"replay":false,"ksc":["resolve","v12",[["kp139",false,{"body":"#\"$0.Alleged: IST payment\"","slots":["ko113"]}]]],"vsc":["resolve",[["p-61",false,{"body":"#\"$0.Alleged: IST payment\"","slots":["o-60"]}]]],"time":1740527497.796508,"monotime":64.620651781}
{"type":"syscall-result","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":20,"replay":false,"ksr":["ok",null],"vsr":["ok",null],"time":1740527497.796697,"monotime":64.62084073199999}
{"type":"syscall","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":21,"replay":false,"ksc":["vatstoreSet","v12","idCounters","{\"exportID\":23,\"collectionID\":14,\"promiseID\":13}"],"vsc":["vatstoreSet","idCounters","{\"exportID\":23,\"collectionID\":14,\"promiseID\":13}"],"time":1740527497.796941,"monotime":64.621084505}
{"type":"syscall-result","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":21,"replay":false,"ksr":["ok",null],"vsr":["ok",null],"time":1740527497.796998,"monotime":64.621141273}
{"type":"syscall","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":22,"replay":false,"ksc":["vatstoreSet","v12","vc.12.|schemata","{\"body\":\"#{\\\"keyShape\\\":{\\\"#tag\\\":\\\"match:scalar\\\",\\\"payload\\\":\\\"#undefined\\\"},\\\"label\\\":\\\"activeZCFSeats\\\"}\",\"slots\":[]}"],"vsc":["vatstoreSet","vc.12.|schemata","{\"body\":\"#{\\\"keyShape\\\":{\\\"#tag\\\":\\\"match:scalar\\\",\\\"payload\\\":\\\"#undefined\\\"},\\\"label\\\":\\\"activeZCFSeats\\\"}\",\"slots\":[]}"],"time":1740527497.797194,"monotime":64.621337016}
{"type":"syscall-result","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":22,"replay":false,"ksr":["ok",null],"vsr":["ok",null],"time":1740527497.797243,"monotime":64.62138665}
{"type":"syscall","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":23,"replay":false,"ksc":["vatstoreSet","v12","vc.13.|schemata","{\"body\":\"#{\\\"keyShape\\\":{\\\"#tag\\\":\\\"match:scalar\\\",\\\"payload\\\":\\\"#undefined\\\"},\\\"label\\\":\\\"zcfSeatToSeatHandle\\\"}\",\"slots\":[]}"],"vsc":["vatstoreSet","vc.13.|schemata","{\"body\":\"#{\\\"keyShape\\\":{\\\"#tag\\\":\\\"match:scalar\\\",\\\"payload\\\":\\\"#undefined\\\"},\\\"label\\\":\\\"zcfSeatToSeatHandle\\\"}\",\"slots\":[]}"],"time":1740527497.797417,"monotime":64.621560356}
{"type":"syscall-result","crankNum":407,"vatID":"v12","deliveryNum":13,"syscallNum":23,"replay":false,"ksr":["ok",null],"vsr":["ok",null],"time":1740527497.797454,"monotime":64.621596915}
{"type":"deliver-result","crankNum":407,"vatID":"v12","deliveryNum":13,"replay":false,"dr":["ok",null,{"meterType":"xs-meter-34","currentHeapCount":392885,"compute":143185,"allocate":46268416,"timestamps":[1740527497.788211,1740527497.788585,1740527497.789082,1740527497.789307,1740527497.789815,1740527497.790001,1740527497.790369,1740527497.790651,1740527497.790798,1740527497.7912,1740527497.791383,1740527497.791439,1740527497.791629,1740527497.791687,1740527497.791946,1740527497.791994,1740527497.7922,1740527497.79224,1740527497.792447,1740527497.792476,1740527497.792649,1740527497.792704,1740527497.792904,1740527497.793317,1740527497.793441,1740527497.793488,1740527497.793621,1740527497.793654,1740527497.793815,1740527497.793846,1740527497.794009,1740527497.794043,1740527497.794202,1740527497.794229,1740527497.794441,1740527497.794554,1740527497.794991,1740527497.795274,1740527497.795742,1740527497.795778,1740527497.796088,1740527497.796216,1740527497.79675,1740527497.796844,1740527497.797045,1740527497.79708,1740527497.797288,1740527497.797317,1740527497.797483,1740527497.797602]}],"time":1740527497.797708,"monotime":64.621851681}
{"type":"terminate","vatID":"v12","shouldReject":false,"info":{"body":"#\"payment retrieved\"","slots":[]},"time":1740527497.798019,"monotime":64.622162789}
{"type":"console","source":"kernel","level":"log","args":["kernel terminating vat v12 (failure=false)"],"time":1740527497.798286,"monotime":64.622429777}
{"type":"crank-finish","crankNum":407,"crankhash":"086f84650ada36d845ee2079ea0dfe6e6e08d6a587f6be46868e75eee0690a6b","activityhash":"ede8ca0d16de82e6eb8748dcfe475d71e908f334bdc7d564867f77384c6d0798","time":1740527497.80501,"monotime":64.629155006}
```

</details> 

### Upgrade Considerations
This fixes a late-discovered flaw in #10925 and would ideally be cherry-picked into agoric-upgrade-19, but if it doesn't make it in then we'll just be missing the kernel console `level` data until agoric-upgrade-20 (which is still acceptable).

Release verification should look for slog entries like the above (type "console", source "kernel", level "log"/"warn"/"error"/etc.).
  • Loading branch information
mergify[bot] authored and mujahidkay committed Mar 3, 2025
2 parents 0e5095e + 77b835b commit 745733a
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 128 deletions.
2 changes: 1 addition & 1 deletion packages/SwingSet/src/controller/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export async function makeSwingsetController(
const sloggingKernelConsole = makeLimitedConsole(level => {
return (...args) => {
kernelConsole[level](...args);
writeSlogObject({ type: 'console', source: 'kernel', args });
writeSlogObject({ type: 'console', source: 'kernel', level, args });
};
});
writeSlogObject({ type: 'import-kernel-start' });
Expand Down
132 changes: 51 additions & 81 deletions packages/SwingSet/src/kernel/slogger.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { q } from '@endo/errors';
import { objectMap } from '@agoric/internal';
import { makeLimitedConsole } from '@agoric/internal/src/ses-utils.js';

/** @import {Callable} from '@agoric/internal'; */
/** @import {LimitedConsole} from '@agoric/internal/src/js-utils.js'; */

const IDLE = 'idle';
Expand All @@ -13,69 +15,62 @@ const noopFinisher = harden(() => {});
/** @typedef {Partial<Record<Exclude<keyof KernelSlog, 'write'>, (methodName: string, args: unknown[], finisher: AnyFinisher) => unknown>>} SlogWrappers */

/**
* Support composition of asynchronous callbacks that are invoked at the start
* Support asynchronous slog callbacks that are invoked at the start
* of an operation and return either a non-function result or a "finisher"
* function to be invoked upon operation completion.
* This maker accepts a collection of wrapper functions that receive the same
* arguments as the method they wrap, along with the result of that method
* (e.g., its finisher), and are expected to return a finisher of their own that
* will invoke that wrapped finisher.
*
* @param {SlogWrappers} wrappers
* @template {Record<string, Callable>} Methods
* @param {SlogWrappers} slogCallbacks
* @param {string} unusedMsgPrefix prefix for warn-level logging about unused callbacks
* @param {Methods} methods to wrap
* @returns {Methods}
*/
function makeFinishersKit(wrappers) {
const unused = new Set(Object.keys(wrappers));
return harden({
/**
* Robustly wrap a method if a wrapper is defined.
*
* @template {(...args: unknown[]) => (Finisher | unknown)} F
* @template {AnyFinisher} [Finisher=AnyFinisher]
* @param {string} method name
* @param {F} impl the original implementation
* @returns {F} the wrapped method
*/
wrap(method, impl) {
unused.delete(method);
const wrapper = wrappers[method];
function addSlogCallbacks(slogCallbacks, unusedMsgPrefix, methods) {
const unused = new Set(Object.keys(slogCallbacks));
const wrappedMethods = /** @type {Methods} */ (
objectMap(methods, (impl, methodKey) => {
const methodName = /** @type {keyof typeof slogCallbacks} */ (methodKey);
unused.delete(methodName);
const wrapper = slogCallbacks[methodName];

// If there is no registered wrapper, return the implementation directly.
if (!wrapper) return impl;

const wrapped = (...args) => {
const maybeFinisher = /** @type {Finisher} */ (impl(...args));
const maybeFinisher = /** @type {AnyFinisher} */ (impl(...args));
try {
// Allow the callback to observe the call synchronously, and replace
// the implementation's finisher function, but not to throw an exception.
const wrapperFinisher = wrapper(method, args, maybeFinisher);
const wrapperFinisher = wrapper(methodName, args, maybeFinisher);
if (typeof maybeFinisher !== 'function') return wrapperFinisher;

// We wrap the finisher in the callback's return value.
return (...finishArgs) => {
try {
return /** @type {Finisher} */ (wrapperFinisher)(...finishArgs);
return /** @type {AnyFinisher} */ (wrapperFinisher)(
...finishArgs,
);
} catch (e) {
console.error(`${method} wrapper finisher failed:`, e);
console.error(`${methodName} wrapper finisher failed:`, e);
return maybeFinisher(...finishArgs);
}
};
} catch (e) {
console.error(`${method} wrapper failed:`, e);
console.error(`${methodName} wrapper failed:`, e);
return maybeFinisher;
}
};
return /** @type {F} */ (wrapped);
},
/**
* Declare that all wrapping is done.
*
* @param {string} msg message to display if there are unused wrappers
*/
done(msg = 'Unused wrappers') {
if (!unused.size) return;
console.warn(msg, ...[...unused.keys()].sort().map(q));
},
});
return /** @type {typeof impl} */ (/** @type {unknown} */ (wrapped));
})
);
if (unused.size) {
console.warn(unusedMsgPrefix, ...[...unused.keys()].sort().map(q));
}
return wrappedMethods;
}

export const badConsole = makeLimitedConsole(level => () => {
Expand All @@ -89,22 +84,19 @@ export const noopConsole = makeLimitedConsole(_level => () => {});
* @returns {KernelSlog}
*/
export function makeDummySlogger(slogCallbacks, dummyConsole = badConsole) {
const { wrap, done } = makeFinishersKit(slogCallbacks);
const dummySlogger = harden({
provideVatSlogger: wrap('provideVatSlogger', () => {
return harden({ vatSlog: { delivery: () => noopFinisher } });
}),
vatConsole: wrap('vatConsole', () => dummyConsole),
startup: wrap('startup', () => noopFinisher),
replayVatTranscript: wrap('replayVatTranscript', () => noopFinisher),
delivery: wrap('delivery', () => noopFinisher),
syscall: wrap('syscall', () => noopFinisher),
changeCList: wrap('changeCList', () => noopFinisher),
terminateVat: wrap('terminateVat', () => noopFinisher),
write: noopFinisher,
const unusedWrapperPrefix =
'Unused methods in makeDummySlogger slogCallbacks';
const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, {
provideVatSlogger: () =>
harden({ vatSlog: { delivery: () => noopFinisher } }),
vatConsole: () => dummyConsole,
startup: () => noopFinisher,
delivery: () => noopFinisher,
syscall: () => noopFinisher,
changeCList: () => noopFinisher,
terminateVat: () => noopFinisher,
});
done('Unused makeDummySlogger slogCallbacks method names');
return dummySlogger;
return harden({ ...wrappedMethods, write: noopFinisher });
}

/**
Expand Down Expand Up @@ -250,43 +242,21 @@ export function makeSlogger(slogCallbacks, writeObj) {
return { vatSlog, starting: true };
}

function replayVatTranscript(vatID) {
safeWrite({ type: 'replay-transcript-start', vatID });
function finish() {
safeWrite({ type: 'replay-transcript-finish', vatID });
}
return harden(finish);
}

// function annotateVat(vatID, data) {
// safeWrite({ type: 'annotate-vat', vatID, data });
// }

const { wrap, done } = makeFinishersKit(slogCallbacks);
const slogger = harden({
provideVatSlogger: wrap('provideVatSlogger', provideVatSlogger),
vatConsole: wrap('vatConsole', (vatID, ...args) =>
const unusedWrapperPrefix = 'Unused methods in makeSlogger slogCallbacks';
const wrappedMethods = addSlogCallbacks(slogCallbacks, unusedWrapperPrefix, {
provideVatSlogger,
vatConsole: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.vatConsole(...args),
),
startup: wrap('startup', (vatID, ...args) =>
startup: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.startup(...args),
),
// TODO: Remove this seemingly dead code.
replayVatTranscript,
delivery: wrap('delivery', (vatID, ...args) =>
delivery: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.delivery(...args),
),
syscall: wrap('syscall', (vatID, ...args) =>
syscall: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.syscall(...args),
),
changeCList: wrap('changeCList', (vatID, ...args) =>
changeCList: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.changeCList(...args),
),
terminateVat: wrap('terminateVat', (vatID, ...args) =>
terminateVat: (vatID, ...args) =>
provideVatSlogger(vatID).vatSlog.terminateVat(...args),
),
write: safeWrite,
});
done('Unused makeSlogger slogCallbacks method names');
return slogger;
return harden({ ...wrappedMethods, write: safeWrite });
}
10 changes: 8 additions & 2 deletions packages/cosmic-swingset/src/chain-main.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,8 @@ export const makeLaunchChain = (
serviceName: TELEMETRY_SERVICE_NAME,
});

const slogSender = await (testingOverrides.slogSender ||
const providedSlogSender = await testingOverrides.slogSender;
const slogSender = await (providedSlogSender ||
makeSlogSender({
stateDir: stateDBDir,
env,
Expand Down Expand Up @@ -552,7 +553,12 @@ export const makeLaunchChain = (
swingsetConfig,
});
savedChainSends = s.savedChainSends;
return s;
const shutdown = async () => {
await s.shutdown?.();
if (providedSlogSender) return;
await slogSender?.shutdown?.();
};
return { ...s, shutdown };
};

return launchChain;
Expand Down
3 changes: 2 additions & 1 deletion packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,8 @@ export async function launch({
}

async function shutdown() {
return controller.shutdown();
await controller.shutdown();
await afterCommitWorkDone;
}

function writeSlogObject(obj) {
Expand Down
47 changes: 23 additions & 24 deletions packages/internal/src/node/fs-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createWriteStream } from 'node:fs';
import process from 'node:process';
import { open } from 'node:fs/promises';
import process from 'node:process';
import { promisify } from 'node:util';

/**
* @param {import('fs').ReadStream
Expand Down Expand Up @@ -40,23 +40,29 @@ export const fsStreamReady = stream =>
stream.on('error', onError);
});

const noPath = /** @type {import('fs').PathLike} */ (
/** @type {unknown} */ (undefined)
);

/** @typedef {NonNullable<Awaited<ReturnType<typeof makeFsStreamWriter>>>} FsStreamWriter */
/** @param {string | undefined | null} filePath */
export const makeFsStreamWriter = async filePath => {
if (!filePath) {
return undefined;
}

const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined);

const stream = handle
? createWriteStream(noPath, { fd: handle.fd })
: process.stdout;
const useStdout = filePath === '-';
const { handle, stream } = await (async () => {
if (useStdout) {
return { handle: undefined, stream: process.stdout };
}
const fh = await open(filePath, 'a');
return { handle: fh, stream: fh.createWriteStream({ flush: true }) };
})();
await fsStreamReady(stream);
const writeAsync = promisify(stream.write.bind(stream));
const closeAsync =
useStdout || !(/** @type {any} */ (stream).close)
? undefined
: promisify(
/** @type {import('fs').WriteStream} */ (stream).close.bind(stream),
);

let flushed = Promise.resolve();
let closed = false;
Expand All @@ -77,20 +83,14 @@ export const makeFsStreamWriter = async filePath => {
};

const write = async data => {
/** @type {Promise<void>} */
const written = closed
? Promise.reject(Error('Stream closed'))
: new Promise((resolve, reject) => {
stream.write(data, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
: writeAsync(data);
updateFlushed(written);
return written;
const waitForDrain = await written;
if (waitForDrain) {
await new Promise(resolve => stream.once('drain', resolve));
}
};

const flush = async () => {
Expand All @@ -107,8 +107,7 @@ export const makeFsStreamWriter = async filePath => {
// TODO: Consider creating a single Error here to use a write rejection
closed = true;
await flush();
// @ts-expect-error calling a possibly missing method
stream.close?.();
await closeAsync?.();
};

stream.on('error', err => updateFlushed(Promise.reject(err)));
Expand Down
19 changes: 13 additions & 6 deletions packages/telemetry/src/flight-recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
* @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord
* @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise<void>} writeRecord
*/
function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) {
function makeCircBufMethods(arenaSize, header, readRecord, writeRecord) {
const readCircBuf = (outbuf, offset = 0) => {
offset + outbuf.byteLength <= arenaSize ||
Fail`Reading past end of circular buffer`;
Expand Down Expand Up @@ -269,14 +269,17 @@ export const makeSimpleCircularBuffer = async ({
await file.write(headerBuffer, undefined, undefined, 0);
};

return finishCircularBuffer(arenaSize, header, readRecord, writeRecord);
return {
fileHandle: file,
...makeCircBufMethods(arenaSize, header, readRecord, writeRecord),
};
};

/**
*
* @param {Pick<EReturn<typeof makeSimpleCircularBuffer>, 'writeCircBuf'>} circBuf
* @param {Pick<CircularBuffer, 'fileHandle' | 'writeCircBuf'>} circBuf
*/
export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
export const makeSlogSenderFromBuffer = ({ fileHandle, writeCircBuf }) => {
/** @type {Promise<void>} */
let toWrite = Promise.resolve();
const writeJSON = (obj, serialized = serializeSlogObj(obj)) => {
Expand All @@ -289,6 +292,10 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
forceFlush: async () => {
await toWrite;
},
shutdown: async () => {
await toWrite;
await fileHandle.close();
},
usesJsonObject: true,
});
};
Expand All @@ -299,6 +306,6 @@ export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
* @type {import('./index.js').MakeSlogSender}
*/
export const makeSlogSender = async opts => {
const { writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ writeCircBuf });
const { fileHandle, writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ fileHandle, writeCircBuf });
};
7 changes: 5 additions & 2 deletions packages/telemetry/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export const tryFlushSlogSender = async (
slogSender,
{ env = {}, log } = {},
) => {
await Promise.resolve(slogSender?.forceFlush?.()).catch(err => {
await null;
try {
await slogSender?.forceFlush?.();
} catch (err) {
log?.('Failed to flush slog sender', err);
if (err.errors) {
for (const error of err.errors) {
Expand All @@ -44,7 +47,7 @@ export const tryFlushSlogSender = async (
if (env.SLOGSENDER_FAIL_ON_ERROR) {
throw err;
}
});
}
};

export const getResourceAttributes = ({
Expand Down
Loading

0 comments on commit 745733a

Please sign in to comment.