Skip to content

Commit

Permalink
[8.x] [Streams 🌊] Update simulator to assert fields & integration…
Browse files Browse the repository at this point in the history
… testing (elastic#206950) (elastic#207345)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Streams 🌊] Update simulator to assert fields & integration
testing (elastic#206950)](elastic#206950)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Marco Antonio
Ghiani","email":"marcoantonio.ghiani01@gmail.com"},"sourceCommit":{"committedDate":"2025-01-21T12:50:07Z","message":"[Streams
🌊] Update simulator to assert fields & integration testing
(elastic#206950)\n\n## 📓 Summary\r\n\r\nCloses
https://github.com/elastic/streams-program/issues/68\r\n\r\nThis work
updates the way a simulation for processing is performed,\r\nworking
against the `_ingest/_simulate` API.\r\nThis gives less specific
feedback on the simulation failure (which\r\nprocessor failed), but
allows for a much more realistic simulation\r\nagainst the index
configuration.\r\n\r\nThis work also adds integration testing for this
API.\r\n\r\n## 📔 Reviewer notes\r\n\r\nThe API is poorly typed due to
missing typing in the elasticsearch-js\r\nlibrary. elastic#204175 updates the
library with those typings, as soon as it's\r\nmerged I'll update the
API.\r\n\r\n## 🎥
Recordings\r\n\r\n\r\nhttps://github.com/user-attachments/assets/36ce0d3c-b7de-44d2-bdc2-84ff67fb4b25","sha":"39bf5e646fcaf31702dfe9fb17942d5aaea528ab","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","v9.0.0","backport:prev-minor","Feature:Streams"],"title":"[Streams
🌊] Update simulator to assert fields & integration
testing","number":206950,"url":"https://github.com/elastic/kibana/pull/206950","mergeCommit":{"message":"[Streams
🌊] Update simulator to assert fields & integration testing
(elastic#206950)\n\n## 📓 Summary\r\n\r\nCloses
https://github.com/elastic/streams-program/issues/68\r\n\r\nThis work
updates the way a simulation for processing is performed,\r\nworking
against the `_ingest/_simulate` API.\r\nThis gives less specific
feedback on the simulation failure (which\r\nprocessor failed), but
allows for a much more realistic simulation\r\nagainst the index
configuration.\r\n\r\nThis work also adds integration testing for this
API.\r\n\r\n## 📔 Reviewer notes\r\n\r\nThe API is poorly typed due to
missing typing in the elasticsearch-js\r\nlibrary. elastic#204175 updates the
library with those typings, as soon as it's\r\nmerged I'll update the
API.\r\n\r\n## 🎥
Recordings\r\n\r\n\r\nhttps://github.com/user-attachments/assets/36ce0d3c-b7de-44d2-bdc2-84ff67fb4b25","sha":"39bf5e646fcaf31702dfe9fb17942d5aaea528ab"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/206950","number":206950,"mergeCommit":{"message":"[Streams
🌊] Update simulator to assert fields & integration testing
(elastic#206950)\n\n## 📓 Summary\r\n\r\nCloses
https://github.com/elastic/streams-program/issues/68\r\n\r\nThis work
updates the way a simulation for processing is performed,\r\nworking
against the `_ingest/_simulate` API.\r\nThis gives less specific
feedback on the simulation failure (which\r\nprocessor failed), but
allows for a much more realistic simulation\r\nagainst the index
configuration.\r\n\r\nThis work also adds integration testing for this
API.\r\n\r\n## 📔 Reviewer notes\r\n\r\nThe API is poorly typed due to
missing typing in the elasticsearch-js\r\nlibrary. elastic#204175 updates the
library with those typings, as soon as it's\r\nmerged I'll update the
API.\r\n\r\n## 🎥
Recordings\r\n\r\n\r\nhttps://github.com/user-attachments/assets/36ce0d3c-b7de-44d2-bdc2-84ff67fb4b25","sha":"39bf5e646fcaf31702dfe9fb17942d5aaea528ab"}}]}]
BACKPORT-->

Co-authored-by: Marco Antonio Ghiani <marcoantonio.ghiani01@gmail.com>
  • Loading branch information
kibanamachine and tonyghiani authored Jan 21, 2025
1 parent 9573c53 commit 662c19b
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/**
 * Error raised when a simulation reports that the detected field types
 * cannot be applied to the sample documents.
 *
 * The `name` is set explicitly so the error can be identified by name as
 * well as by `instanceof`.
 */
export class DetectedMappingFailure extends Error {
  constructor(message: string) {
    super(message);

    this.name = 'DetectedMappingFailure';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@
* 2.0.
*/

/* eslint-disable @typescript-eslint/naming-convention */

import { z } from '@kbn/zod';
import { notFound, internal, badRequest } from '@hapi/boom';
import { FieldDefinitionConfig, processingDefinitionSchema } from '@kbn/streams-schema';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import {
IngestSimulateResponse,
IngestSimulateSimulateDocumentResult,
} from '@elastic/elasticsearch/lib/api/types';
FieldDefinitionConfig,
fieldDefinitionConfigSchema,
processingDefinitionSchema,
} from '@kbn/streams-schema';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import { isEmpty } from 'lodash';
import { IScopedClusterClient } from '@kbn/core/server';
import { DetectedMappingFailure } from '../../../lib/streams/errors/detected_mapping_failure';
import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor';
import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkAccess } from '../../../lib/streams/stream_crud';

// Path parameters: the id of the stream the simulation targets.
const simulatePathSchema = z.object({ id: z.string() });

// Request body: the processing steps to simulate, the sample documents to run them against and,
// optionally, the detected fields (a field definition config plus the field `name`).
const simulateBodySchema = z.object({
  processing: z.array(processingDefinitionSchema),
  documents: z.array(z.record(z.unknown())),
  detected_fields: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })).optional(),
});

/**
 * Validation schema for the params of `POST /api/streams/{id}/processing/_simulate`.
 */
const paramsSchema = z.object({
  path: simulatePathSchema,
  body: simulateBodySchema,
});

type ProcessingSimulateParams = z.infer<typeof paramsSchema>;

export const simulateProcessorRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/processing/_simulate',
options: {
Expand All @@ -33,82 +48,153 @@ export const simulateProcessorRoute = createServerRoute({
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
processing: z.array(processingDefinitionSchema),
documents: z.array(z.record(z.unknown())),
}),
}),
handler: async ({ params, request, response, getScopedClients }) => {
params: paramsSchema,
handler: async ({ params, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });

const hasAccess = await checkAccess({ id: params.path.id, scopedClusterClient });
if (!hasAccess) {
const { read } = await checkAccess({ id: params.path.id, scopedClusterClient });
if (!read) {
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}
// Normalize processing definition to pipeline processors
const processors = formatToIngestProcessors(params.body.processing);
// Convert input documents to ingest simulation format
const docs = params.body.documents.map((doc) => ({ _source: doc }));

let simulationResult: IngestSimulateResponse;
try {
simulationResult = await scopedClusterClient.asCurrentUser.ingest.simulate({
verbose: true,
pipeline: { processors },
docs,
});
} catch (error) {
throw new SimulationFailed(error);
}

const simulationDiffs = computeSimulationDiffs(simulationResult, docs);
const simulationBody = prepareSimulationBody(params);

const updatedFields = computeUpdatedFields(simulationDiffs);
if (!isEmpty(updatedFields)) {
throw new NonAdditiveProcessor(
`The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
);
}
const simulationResult = await executeSimulation(scopedClusterClient, simulationBody);

const documents = computeSimulationDocuments(simulationResult, docs);
const detectedFields = computeDetectedFields(simulationDiffs);
const successRate = computeSuccessRate(simulationResult);
const failureRate = 1 - successRate;
const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs);

return {
documents,
success_rate: parseFloat(successRate.toFixed(2)),
failure_rate: parseFloat(failureRate.toFixed(2)),
detected_fields: detectedFields,
};
assertSimulationResult(simulationResult, simulationDiffs);

return prepareSimulationResponse(
simulationResult,
simulationBody.docs,
simulationDiffs,
params.body.detected_fields
);
} catch (error) {
if (error instanceof DefinitionNotFound) {
throw notFound(error);
}

if (error instanceof SimulationFailed || error instanceof NonAdditiveProcessor) {
if (
error instanceof SimulationFailed ||
error instanceof NonAdditiveProcessor ||
error instanceof DetectedMappingFailure
) {
throw badRequest(error);
}

throw internal(error);
}
},
});

const computeSimulationDiffs = (
simulation: IngestSimulateResponse,
/**
 * Builds the request body sent to `_ingest/_simulate` from the validated route params.
 *
 * - Every sample document becomes a simulation doc whose `_index` is the stream id and whose
 *   `_id` is the document's position in the request (as a string).
 * - The pipeline named `<stream id>@stream.processing` is substituted with the processors
 *   derived from the processing definitions under test.
 * - When `detected_fields` is provided, the component template named `<stream id>@stream.layer`
 *   is substituted with mappings built from those fields.
 */
const prepareSimulationBody = (params: ProcessingSimulateParams) => {
  const streamId = params.path.id;
  const { processing, documents, detected_fields: detectedFields } = params.body;

  const processors = formatToIngestProcessors(processing);

  const simulationBody: any = {
    docs: documents.map((source, position) => ({
      _index: streamId,
      _id: position.toString(),
      _source: source,
    })),
    pipeline_substitutions: {
      [`${streamId}@stream.processing`]: { processors },
    },
  };

  // Without detected fields there is no mapping to substitute.
  if (!detectedFields) {
    return simulationBody;
  }

  simulationBody.component_template_substitutions = {
    [`${streamId}@stream.layer`]: {
      template: {
        mappings: {
          properties: computeMappingProperties(detectedFields),
        },
      },
    },
  };

  return simulationBody;
};

// TODO: update type once Kibana updates to elasticsearch-js 8.17
/**
 * Sends the prepared simulation body to `POST _ingest/_simulate` on behalf of the current user.
 *
 * Any error thrown while issuing the request is wrapped in a `SimulationFailed` error.
 */
const executeSimulation = async (
  scopedClusterClient: IScopedClusterClient,
  simulationBody: ReturnType<typeof prepareSimulationBody>
): Promise<any> => {
  const simulateRequest = {
    method: 'POST',
    path: `_ingest/_simulate`,
    body: simulationBody,
  };

  try {
    // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17
    const simulationResult = await scopedClusterClient.asCurrentUser.transport.request(
      simulateRequest
    );
    return simulationResult;
  } catch (error) {
    throw new SimulationFailed(error);
  }
};

/**
 * Validates the outcome of a simulation and throws when it must be rejected.
 *
 * @throws DetectedMappingFailure when any simulated entry is a mapping failure; the message
 *   carries the `reason` reported for the first such entry.
 * @throws NonAdditiveProcessor when the diffs show fields reported as updated, listing them.
 */
const assertSimulationResult = (
  simulationResult: Awaited<ReturnType<typeof executeSimulation>>,
  simulationDiffs: ReturnType<typeof prepareSimulationDiffs>
) => {
  // The mapping check runs first: only the first failing entry is reported.
  const mappingFailure = simulationResult.docs.find(isMappingFailure);
  if (mappingFailure) {
    const { reason } = mappingFailure.doc.error;
    throw new DetectedMappingFailure(
      `The detected field types might not be compatible with these documents. ${reason}`
    );
  }

  // Processors are only allowed to add fields; any updated field rejects the simulation.
  const updatedFields = computeUpdatedFields(simulationDiffs);
  if (isEmpty(updatedFields)) {
    return;
  }

  throw new NonAdditiveProcessor(
    `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
  );
};

/**
 * Assembles the payload returned by the simulate route.
 *
 * @param simulationResult raw response of the simulation request
 * @param docs the simulation docs that were sent, used alongside the result to build `documents`
 * @param simulationDiffs diffs between the samples and the simulated documents
 * @param detectedFields fields supplied in the request; they are turned into a name -> type
 *   lookup used when computing the returned `detected_fields`
 * @returns the simulated documents, the success and failure rates rounded to two decimals, and
 *   the detected fields
 */
const prepareSimulationResponse = (
  simulationResult: any,
  docs: Array<{ _source: Record<string, unknown> }>,
  simulationDiffs: ReturnType<typeof prepareSimulationDiffs>,
  detectedFields?: ProcessingSimulateParams['body']['detected_fields']
) => {
  const roundToTwoDecimals = (rate: number) => parseFloat(rate.toFixed(2));

  const knownFieldTypes = computeMappingProperties(detectedFields ?? []);
  const documents = computeSimulationDocuments(simulationResult, docs);
  const detectedFieldsResult = computeDetectedFields(simulationDiffs, knownFieldTypes);
  const successRate = computeSuccessRate(simulationResult);

  return {
    documents,
    success_rate: roundToTwoDecimals(successRate),
    // The failure rate is the complement of the success rate, rounded independently.
    failure_rate: roundToTwoDecimals(1 - successRate),
    detected_fields: detectedFieldsResult,
  };
};

// TODO: update type once Kibana updates to elasticsearch-js 8.17
const prepareSimulationDiffs = (
simulation: any,
sampleDocs: Array<{ _source: Record<string, unknown> }>
) => {
// Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval
const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]]));
const samplesToSimulationMap = new Map<any, { _source: Record<string, unknown> }>(
simulation.docs.map((entry: any, id: number) => [entry.doc, sampleDocs[id]])
);

const diffs = simulation.docs.filter(isSuccessfulDocument).map((doc) => {
const sample = samplesToSimulationMap.get(doc);
const diffs = simulation.docs.filter(isSuccessfulDocument).map((entry: any) => {
const sample = samplesToSimulationMap.get(entry.doc);
if (sample) {
return calculateObjectDiff(sample._source, doc.processor_results.at(-1)?.doc?._source);
return calculateObjectDiff(sample._source, entry.doc._source);
}

return calculateObjectDiff({});
Expand All @@ -117,25 +203,27 @@ const computeSimulationDiffs = (
return diffs;
};

const computeUpdatedFields = (simulationDiff: ReturnType<typeof computeSimulationDiffs>) => {
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeUpdatedFields = (simulationDiff: ReturnType<typeof prepareSimulationDiffs>) => {
const diffs = simulationDiff
.map((simulatedDoc) => flattenObject(simulatedDoc.updated))
.map((simulatedDoc: any) => flattenObject(simulatedDoc.updated))
.flatMap(Object.keys);

const uniqueFields = [...new Set(diffs)];

return uniqueFields;
};

// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeSimulationDocuments = (
simulation: IngestSimulateResponse,
simulation: any,
sampleDocs: Array<{ _source: Record<string, unknown> }>
) => {
return simulation.docs.map((doc, id) => {
): Array<{ isMatch: boolean; value: Record<string, unknown> }> => {
return simulation.docs.map((entry: any, id: number) => {
// If every processor was successful, return and flatten the simulation doc from the last processor
if (isSuccessfulDocument(doc)) {
if (isSuccessfulDocument(entry)) {
return {
value: flattenObject(doc.processor_results.at(-1)?.doc?._source ?? sampleDocs[id]._source),
value: flattenObject(entry.doc._source ?? sampleDocs[id]._source),
isMatch: true,
};
}
Expand All @@ -148,32 +236,44 @@ const computeSimulationDocuments = (
};

const computeDetectedFields = (
simulationDiff: ReturnType<typeof computeSimulationDiffs>
simulationDiff: ReturnType<typeof prepareSimulationDiffs>,
confirmedValidDetectedFields: Record<string, { type: FieldDefinitionConfig['type'] | 'unmapped' }>
): Array<{
name: string;
type: FieldDefinitionConfig['type'] | 'unmapped';
}> => {
const diffs = simulationDiff
.map((simulatedDoc) => flattenObject(simulatedDoc.added))
const diffs: string[] = simulationDiff
.map((simulatedDoc: any) => flattenObject(simulatedDoc.added))
.flatMap(Object.keys);

const uniqueFields = [...new Set(diffs)];

return uniqueFields.map((name) => ({ name, type: 'unmapped' }));
return uniqueFields.map((name: string) => ({
name,
type: confirmedValidDetectedFields[name]?.type || 'unmapped',
}));
};

const computeSuccessRate = (simulation: IngestSimulateResponse) => {
const successfulCount = simulation.docs.reduce((rate, doc) => {
return (rate += isSuccessfulDocument(doc) ? 1 : 0);
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeSuccessRate = (simulation: any) => {
const successfulCount = simulation.docs.reduce((rate: number, entry: any) => {
return (rate += isSuccessfulDocument(entry) ? 1 : 0);
}, 0);

return successfulCount / simulation.docs.length;
};

const isSuccessfulDocument = (
doc: IngestSimulateSimulateDocumentResult
): doc is Required<IngestSimulateSimulateDocumentResult> =>
doc.processor_results?.every((processorSimulation) => processorSimulation.status === 'success') ||
false;
/**
 * Converts a list of detected fields into an object keyed by field name whose values hold only
 * the field `type`. Any other property of a detected field is dropped; when a name occurs more
 * than once, the last occurrence provides the type.
 */
const computeMappingProperties = (
  detectedFields: NonNullable<ProcessingSimulateParams['body']['detected_fields']>
) => {
  const propertyEntries: Array<[string, { type: (typeof detectedFields)[number]['type'] }]> = [];

  for (const field of detectedFields) {
    propertyEntries.push([field.name, { type: field.type }]);
  }

  return Object.fromEntries(propertyEntries);
};

// TODO: update type once Kibana updates to elasticsearch-js 8.17
/** A simulated entry counts as successful when its `doc` carries no `error` property. */
const isSuccessfulDocument = (entry: any) => typeof entry.doc.error === 'undefined';

// TODO: update type once Kibana updates to elasticsearch-js 8.17
/** A mapping failure is a failed entry whose error type is `document_parsing_exception`. */
const isMappingFailure = (entry: any) => {
  if (isSuccessfulDocument(entry)) {
    return false;
  }

  return entry.doc.error.type === 'document_parsing_exception';
};

export const processingRoutes = {
...simulateProcessorRoute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function AddProcessorFlyout({
const handleSubmit: SubmitHandler<ProcessorFormState> = async (data) => {
const processingDefinition = convertFormStateToProcessing(data);

simulate(processingDefinition).then((responseBody) => {
simulate(processingDefinition, data.detected_fields).then((responseBody) => {
if (responseBody instanceof Error) return;

onAddProcessor(processingDefinition, data.detected_fields);
Expand Down
Loading

0 comments on commit 662c19b

Please sign in to comment.