Skip to content

Commit

Permalink
[Fleet] Improve reading package archive memory usage (#208869)
Browse files Browse the repository at this point in the history
## Summary

Related to #208210 

As we know the package size from the content-length header we can
improve how read the archive stream to a buffer.

## Benchmark 

<img width="710" alt="Screenshot 2025-01-29 at 9 23 59 PM"
src="https://github.com/user-attachments/assets/79dc1f20-938b-402e-a823-1ab26a07b78e"
/>
  • Loading branch information
nchaulet authored Jan 30, 2025
1 parent bf0b26d commit 0a0a4d8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import type { ArchiveIterator } from '../../../../common/types';

import { airGappedUtils } from '../airgapped';

import { fetchUrl, getResponse, getResponseStream } from './requests';
import { fetchUrl, getResponse, getResponseStreamWithSize } from './requests';
import { getRegistryUrl } from './registry_url';

export const splitPkgKey = split;
Expand Down Expand Up @@ -495,7 +495,9 @@ export async function fetchArchiveBuffer({
const registryUrl = getRegistryUrl();
const archiveUrl = `${registryUrl}${archivePath}`;
try {
const archiveBuffer = await getResponseStream(archiveUrl).then(streamToBuffer);
const archiveBuffer = await getResponseStreamWithSize(archiveUrl).then(({ stream, size }) =>
streamToBuffer(stream, size)
);
if (!archiveBuffer) {
logger.warn(`Archive Buffer not found`);
throw new ArchiveNotFoundError('Archive Buffer not found');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ export async function getResponseStream(
throw new RegistryResponseError('isAirGapped config enabled, registry not reacheable');
}

export async function getResponseStreamWithSize(
url: string,
retries?: number
): Promise<{ stream: NodeJS.ReadableStream; size?: number }> {
const res = await getResponse(url, retries);
if (res) {
const contentLengthHeader = res.headers.get('Content-Length');
const contentLength = contentLengthHeader ? parseInt(contentLengthHeader, 10) : undefined;

return {
stream: res.body,
size: contentLength && !isNaN(contentLength) ? contentLength : undefined,
};
}
throw new RegistryResponseError('isAirGapped config enabled, registry not reacheable');
}

export async function fetchUrl(url: string, retries?: number): Promise<string> {
const logger = appContextService.getLogger();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export function bufferToStream(buffer: Buffer): PassThrough {

export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise<string> {
if (stream instanceof Buffer) return Promise.resolve(stream.toString());

return new Promise((resolve, reject) => {
const body: string[] = [];
stream.on('data', (chunk: string) => body.push(chunk));
Expand All @@ -23,7 +24,22 @@ export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise<
});
}

export function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
export function streamToBuffer(stream: NodeJS.ReadableStream, size?: number): Promise<Buffer> {
if (size) {
return new Promise((resolve, reject) => {
const data = Buffer.alloc(size);
let pos = 0;

stream.on('data', (chunk: Buffer) => {
pos += chunk.copy(data, pos);
});
stream.on('end', () => {
resolve(data);
});
stream.on('error', reject);
});
}

return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
Expand Down

0 comments on commit 0a0a4d8

Please sign in to comment.