Skip to content

Commit

Permalink
[Fleet] Use streaming for package install instead of an assetsMap wit…
Browse files Browse the repository at this point in the history
…h everything loaded in memory (#211961)
  • Loading branch information
nchaulet authored Mar 4, 2025
1 parent 4447a70 commit d3d44de
Show file tree
Hide file tree
Showing 32 changed files with 694 additions and 541 deletions.
10 changes: 4 additions & 6 deletions x-pack/platform/plugins/shared/fleet/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,15 @@ export interface ArchiveEntry {
}

export interface ArchiveIterator {
traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise<void>) => Promise<void>;
traverseEntries: (
onEntry: (entry: ArchiveEntry) => Promise<void>,
readBuffer?: (path: string) => boolean
) => Promise<void>;
getPaths: () => Promise<string[]>;
}

export interface PackageInstallContext {
packageInfo: InstallablePackage;
/**
* @deprecated Use `archiveIterator` to access the package archive entries
* without loading them all into memory at once.
*/
assetsMap: AssetsMap;
paths: string[];
archiveIterator: ArchiveIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ export const installPackageKibanaAssetsHandler: FleetRequestHandler<
packageInstallContext: {
packageInfo,
paths: installedPkgWithAssets.paths,
assetsMap: installedPkgWithAssets.assetsMap,
archiveIterator: createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap),
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,15 @@ export const createArchiveIterator = (
*/
export const createArchiveIteratorFromMap = (assetsMap: AssetsMap): ArchiveIterator => {
const traverseEntries = async (
onEntry: (entry: ArchiveEntry) => Promise<void>
onEntry: (entry: ArchiveEntry) => Promise<void>,
readBuffer?: (path: string) => boolean
): Promise<void> => {
for (const [path, buffer] of assetsMap) {
await onEntry({ path, buffer });
if (readBuffer && !readBuffer(path)) {
await onEntry({ path });
} else {
await onEntry({ path, buffer });
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import {
ElasticsearchAssetType,
type PackageInstallContext,
} from '../../../../../common/types/models';
import type { EsAssetReference, RegistryDataStream } from '../../../../../common/types/models';
import type {
AssetsMap,
EsAssetReference,
RegistryDataStream,
} from '../../../../../common/types/models';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { getAssetFromAssetsMap } from '../../archive';

Expand Down Expand Up @@ -40,7 +44,7 @@ export const installIlmForDataStream = async (
logger: Logger,
esReferences: EsAssetReference[]
) => {
const { packageInfo: registryPackage, paths, assetsMap } = packageInstallContext;
const { packageInfo: registryPackage, paths } = packageInstallContext;
const previousInstalledIlmEsAssets = esReferences.filter(
({ type }) => type === ElasticsearchAssetType.dataStreamIlmPolicy
);
Expand Down Expand Up @@ -72,6 +76,19 @@ export const installIlmForDataStream = async (
};

const dataStreamIlmPaths = paths.filter((path) => isDataStreamIlm(path));

const dataStreamIlmAssetsMap: AssetsMap = new Map();
await packageInstallContext.archiveIterator.traverseEntries(
async (entry) => {
if (!entry.buffer) {
return;
}

dataStreamIlmAssetsMap.set(entry.path, entry.buffer);
},
(path) => dataStreamIlmPaths.includes(path)
);

let installedIlms: EsAssetReference[] = [];
if (dataStreamIlmPaths.length > 0) {
const ilmPathDatasets = dataStreams.reduce<IlmPathDataset[]>((acc, dataStream) => {
Expand Down Expand Up @@ -103,7 +120,7 @@ export const installIlmForDataStream = async (
const ilmInstallations: IlmInstallation[] = ilmPathDatasets.map(
(ilmPathDataset: IlmPathDataset) => {
const content = JSON.parse(
getAssetFromAssetsMap(assetsMap, ilmPathDataset.path).toString('utf-8')
getAssetFromAssetsMap(dataStreamIlmAssetsMap, ilmPathDataset.path).toString('utf-8')
);
content.policy._meta = getESAssetMetadata({ packageName: registryPackage.name });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { PackageInvalidArchiveError } from '../../../../errors';
import type { PackageInstallContext } from '../../../../../common/types';
import type { AssetsMap, PackageInstallContext } from '../../../../../common/types';
import { MAX_CONCURRENT_ILM_POLICIES_OPERATIONS } from '../../../../constants';

export async function installILMPolicy(
Expand All @@ -30,10 +30,20 @@ export async function installILMPolicy(
const ilmPaths = packageInstallContext.paths.filter((path) => isILMPolicy(path));
if (!ilmPaths.length) return esReferences;

const ilmAssetsMap: AssetsMap = new Map();
await packageInstallContext.archiveIterator.traverseEntries(
async (entry) => {
if (!entry.buffer) {
return;
}

ilmAssetsMap.set(entry.path, entry.buffer);
},
(path) => ilmPaths.includes(path)
);

const ilmPolicies = ilmPaths.map((path) => {
const body = JSON.parse(
getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf-8')
);
const body = JSON.parse(getAssetFromAssetsMap(ilmAssetsMap, path).toString('utf-8'));

body.policy._meta = getESAssetMetadata({ packageName: packageInfo.name });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { loggerMock } from '@kbn/logging-mocks';

import { createArchiveIteratorFromMap } from '../../archive/archive_iterator';

import { prepareToInstallPipelines } from './install';

jest.mock('../../archive/cache');
Expand All @@ -25,10 +27,10 @@ describe('Install pipeline tests', () => {
path: '/datasettest',
},
],
},
} as any,
paths: [],
assetsMap: new Map(),
} as any);
archiveIterator: createArchiveIteratorFromMap(new Map()),
});

expect(res.assetsToAdd).toEqual([{ id: 'logs-datasettest-1.0.0', type: 'ingest_pipeline' }]);
const esClient = elasticsearchClientMock.createInternalClient();
Expand Down Expand Up @@ -64,16 +66,18 @@ describe('Install pipeline tests', () => {
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/default.yml',
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/standard.yml',
],
assetsMap: new Map([
[
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/default.yml',
Buffer.from('description: test'),
],
[
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/standard.yml',
Buffer.from('description: test'),
],
]),
archiveIterator: createArchiveIteratorFromMap(
new Map([
[
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/default.yml',
Buffer.from('description: test'),
],
[
'packagetest-1.0.0/data_stream/datasettest/elasticsearch/ingest_pipeline/standard.yml',
Buffer.from('description: test'),
],
])
),
} as any);
expect(res.assetsToAdd).toEqual([
{ id: 'logs-datasettest-1.0.0', type: 'ingest_pipeline' },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
FLEET_EVENT_INGESTED_PIPELINE_CONTENT,
} from '../../../../constants';
import { getPipelineNameForDatastream } from '../../../../../common/services';
import type { ArchiveEntry, PackageInstallContext } from '../../../../../common/types';
import type { ArchiveEntry, AssetsMap, PackageInstallContext } from '../../../../../common/types';

import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
Expand Down Expand Up @@ -157,6 +157,19 @@ export async function installAllPipelines({
> = [];
const substitutions: RewriteSubstitution[] = [];

const pipelineAssetsMap: AssetsMap = new Map();

await packageInstallContext.archiveIterator.traverseEntries(
async (entry) => {
if (!entry.buffer) {
return;
}

pipelineAssetsMap.set(entry.path, entry.buffer);
},
(path) => pipelinePaths.includes(path)
);

let datastreamPipelineCreated = false;
pipelinePaths.forEach((path) => {
const { name, extension } = getNameAndExtension(path);
Expand All @@ -169,7 +182,7 @@ export async function installAllPipelines({
dataStream,
packageVersion: packageInstallContext.packageInfo.version,
});
const content = getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf-8');
const content = getAssetFromAssetsMap(pipelineAssetsMap, path).toString('utf-8');
pipelinesInfos.push({
nameForInstallation,
shouldInstallCustomPipelines: dataStream && isMainPipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ElasticsearchAssetType,
type PackageInstallContext,
} from '../../../../../common/types/models';
import type { EsAssetReference } from '../../../../../common/types/models';
import type { AssetsMap, EsAssetReference } from '../../../../../common/types/models';

import { retryTransientEsErrors } from '../retry';

Expand All @@ -34,9 +34,19 @@ export const installMlModel = async (
const mlModelPath = packageInstallContext.paths.find((path) => isMlModel(path));

if (mlModelPath !== undefined) {
const content = getAssetFromAssetsMap(packageInstallContext.assetsMap, mlModelPath).toString(
'utf-8'
const mlModelAssetsMap: AssetsMap = new Map();
await packageInstallContext.archiveIterator.traverseEntries(
async (entry) => {
if (!entry.buffer) {
return;
}

mlModelAssetsMap.set(entry.path, entry.buffer);
},
(path) => path === mlModelPath
);

const content = getAssetFromAssetsMap(mlModelAssetsMap, mlModelPath).toString('utf-8');
const pathParts = mlModelPath.split('/');
const modelId = pathParts[pathParts.length - 1].replace('.json', '');

Expand Down
Loading

0 comments on commit d3d44de

Please sign in to comment.