From 662c19b91c0e846b5df23744172ebaf505a4f8fa Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Wed, 22 Jan 2025 01:22:25 +1100 Subject: [PATCH] =?UTF-8?q?[8.x]=20[Streams=20=F0=9F=8C=8A]=20Update=20sim?= =?UTF-8?q?ulator=20to=20assert=20fields=20&=20integration=20testing?= =?UTF-8?q?=20(#206950)=20(#207345)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Backport This will backport the following commits from `main` to `8.x`: - [[Streams 🌊] Update simulator to assert fields & integration testing (#206950)](https://github.com/elastic/kibana/pull/206950) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Marco Antonio Ghiani --- .../errors/detected_mapping_failure.ts | 13 + .../server/routes/streams/processing/route.ts | 250 ++++++++++++------ .../stream_detail_enrichment/flyout/index.tsx | 2 +- .../flyout/processor_outcome_preview.tsx | 13 +- .../hooks/use_processing_simulator.ts | 21 +- .../apis/observability/streams/index.ts | 1 + .../streams/processing_simulate.ts | 239 +++++++++++++++++ 7 files changed, 455 insertions(+), 84 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts new file mode 100644 index 0000000000000..b026b150b64e8 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class DetectedMappingFailure extends Error { + constructor(message: string) { + super(message); + this.name = 'DetectedMappingFailure'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts index 897c6f02ec804..af65843ff4aac 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts @@ -5,15 +5,19 @@ * 2.0. */ +/* eslint-disable @typescript-eslint/naming-convention */ + import { z } from '@kbn/zod'; import { notFound, internal, badRequest } from '@hapi/boom'; -import { FieldDefinitionConfig, processingDefinitionSchema } from '@kbn/streams-schema'; -import { calculateObjectDiff, flattenObject } from '@kbn/object-utils'; import { - IngestSimulateResponse, - IngestSimulateSimulateDocumentResult, -} from '@elastic/elasticsearch/lib/api/types'; + FieldDefinitionConfig, + fieldDefinitionConfigSchema, + processingDefinitionSchema, +} from '@kbn/streams-schema'; +import { calculateObjectDiff, flattenObject } from '@kbn/object-utils'; import { isEmpty } from 'lodash'; +import { IScopedClusterClient } from '@kbn/core/server'; +import { DetectedMappingFailure } from '../../../lib/streams/errors/detected_mapping_failure'; import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor'; import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed'; import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing'; @@ -21,6 +25,17 @@ import { createServerRoute } from '../../create_server_route'; import { DefinitionNotFound } from '../../../lib/streams/errors'; import { checkAccess } from '../../../lib/streams/stream_crud'; +const paramsSchema = z.object({ + path: z.object({ id: z.string() }), + body: z.object({ + processing: z.array(processingDefinitionSchema), + documents: z.array(z.record(z.unknown())), + detected_fields: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })).optional(), + }), +}); + +type ProcessingSimulateParams = z.infer; + export const simulateProcessorRoute = createServerRoute({ endpoint: 'POST /api/streams/{id}/processing/_simulate', options: { @@ -33,82 +48,153 @@ export const simulateProcessorRoute = createServerRoute({ 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', }, }, - params: z.object({ - path: z.object({ id: z.string() }), - body: z.object({ - processing: z.array(processingDefinitionSchema), - documents: z.array(z.record(z.unknown())), - }), - }), - handler: async ({ params, request, response, getScopedClients }) => { + params: paramsSchema, + handler: async ({ params, request, getScopedClients }) => { try { const { scopedClusterClient } = await getScopedClients({ request }); - const hasAccess = await checkAccess({ id: params.path.id, scopedClusterClient }); - if (!hasAccess) { + const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); + if (!read) { throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`); } - // Normalize processing definition to pipeline processors - const processors = formatToIngestProcessors(params.body.processing); - // Convert input documents to ingest simulation format - const docs = params.body.documents.map((doc) => ({ _source: doc })); - - let simulationResult: IngestSimulateResponse; - try { - simulationResult = await scopedClusterClient.asCurrentUser.ingest.simulate({ - verbose: true, - pipeline: { processors }, - docs, - }); - } catch (error) { - throw new SimulationFailed(error); - } - const simulationDiffs = computeSimulationDiffs(simulationResult, docs); + const simulationBody = prepareSimulationBody(params); - const updatedFields = computeUpdatedFields(simulationDiffs); - if (!isEmpty(updatedFields)) { - throw new NonAdditiveProcessor( - `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]` - ); - } + const simulationResult = await executeSimulation(scopedClusterClient, simulationBody); - const documents = computeSimulationDocuments(simulationResult, docs); - const detectedFields = computeDetectedFields(simulationDiffs); - const successRate = computeSuccessRate(simulationResult); - const failureRate = 1 - successRate; + const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs); - return { - documents, - success_rate: parseFloat(successRate.toFixed(2)), - failure_rate: parseFloat(failureRate.toFixed(2)), - detected_fields: detectedFields, - }; + assertSimulationResult(simulationResult, simulationDiffs); + + return prepareSimulationResponse( + simulationResult, + simulationBody.docs, + simulationDiffs, + params.body.detected_fields + ); } catch (error) { if (error instanceof DefinitionNotFound) { throw notFound(error); } - - if (error instanceof SimulationFailed || error instanceof NonAdditiveProcessor) { + if ( + error instanceof SimulationFailed || + error instanceof NonAdditiveProcessor || + error instanceof DetectedMappingFailure + ) { throw badRequest(error); } - throw internal(error); } }, }); -const computeSimulationDiffs = ( - simulation: IngestSimulateResponse, +const prepareSimulationBody = (params: ProcessingSimulateParams) => { + const { path, body } = params; + const { processing, documents, detected_fields } = body; + + const processors = formatToIngestProcessors(processing); + const docs = documents.map((doc, id) => ({ + _index: path.id, + _id: id.toString(), + _source: doc, + })); + + const simulationBody: any = { + docs, + pipeline_substitutions: { + [`${path.id}@stream.processing`]: { + processors, + }, + }, + }; + + if (detected_fields) { + const properties = computeMappingProperties(detected_fields); + simulationBody.component_template_substitutions = { + [`${path.id}@stream.layer`]: { + template: { + mappings: { + properties, + }, + }, + }, + }; + } + + return simulationBody; +}; + +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const executeSimulation = async ( + scopedClusterClient: IScopedClusterClient, + simulationBody: ReturnType +): Promise => { + try { + // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17 + return await scopedClusterClient.asCurrentUser.transport.request({ + method: 'POST', + path: `_ingest/_simulate`, + body: simulationBody, + }); + } catch (error) { + throw new SimulationFailed(error); + } +}; + +const assertSimulationResult = ( + simulationResult: Awaited>, + simulationDiffs: ReturnType +) => { + // Assert mappings are compatible with the documents + const entryWithError = simulationResult.docs.find(isMappingFailure); + if (entryWithError) { + throw new DetectedMappingFailure( + `The detected field types might not be compatible with these documents. ${entryWithError.doc.error.reason}` + ); + } + // Assert that the processors are purely additive to the documents + const updatedFields = computeUpdatedFields(simulationDiffs); + if (!isEmpty(updatedFields)) { + throw new NonAdditiveProcessor( + `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]` + ); + } +}; + +const prepareSimulationResponse = ( + simulationResult: any, + docs: Array<{ _source: Record }>, + simulationDiffs: ReturnType, + detectedFields?: ProcessingSimulateParams['body']['detected_fields'] +) => { + const confirmedValidDetectedFields = computeMappingProperties(detectedFields ?? []); + const documents = computeSimulationDocuments(simulationResult, docs); + const detectedFieldsResult = computeDetectedFields(simulationDiffs, confirmedValidDetectedFields); + const successRate = computeSuccessRate(simulationResult); + const failureRate = 1 - successRate; + + return { + documents, + success_rate: parseFloat(successRate.toFixed(2)), + failure_rate: parseFloat(failureRate.toFixed(2)), + detected_fields: detectedFieldsResult, + }; +}; + +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const prepareSimulationDiffs = ( + simulation: any, sampleDocs: Array<{ _source: Record }> ) => { // Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval - const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]])); + const samplesToSimulationMap = new Map }>( + simulation.docs.map((entry: any, id: number) => [entry.doc, sampleDocs[id]]) + ); - const diffs = simulation.docs.filter(isSuccessfulDocument).map((doc) => { - const sample = samplesToSimulationMap.get(doc); + const diffs = simulation.docs.filter(isSuccessfulDocument).map((entry: any) => { + const sample = samplesToSimulationMap.get(entry.doc); if (sample) { - return calculateObjectDiff(sample._source, doc.processor_results.at(-1)?.doc?._source); + return calculateObjectDiff(sample._source, entry.doc._source); } return calculateObjectDiff({}); @@ -117,9 +203,10 @@ const computeSimulationDiffs = ( return diffs; }; -const computeUpdatedFields = (simulationDiff: ReturnType) => { +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const computeUpdatedFields = (simulationDiff: ReturnType) => { const diffs = simulationDiff - .map((simulatedDoc) => flattenObject(simulatedDoc.updated)) + .map((simulatedDoc: any) => flattenObject(simulatedDoc.updated)) .flatMap(Object.keys); const uniqueFields = [...new Set(diffs)]; @@ -127,15 +214,16 @@ const computeUpdatedFields = (simulationDiff: ReturnType }> -) => { - return simulation.docs.map((doc, id) => { +): Array<{ isMatch: boolean; value: Record }> => { + return simulation.docs.map((entry: any, id: number) => { // If every processor was successful, return and flatten the simulation doc from the last processor - if (isSuccessfulDocument(doc)) { + if (isSuccessfulDocument(entry)) { return { - value: flattenObject(doc.processor_results.at(-1)?.doc?._source ?? sampleDocs[id]._source), + value: flattenObject(entry.doc._source ?? sampleDocs[id]._source), isMatch: true, }; } @@ -148,32 +236,44 @@ const computeSimulationDocuments = ( }; const computeDetectedFields = ( - simulationDiff: ReturnType + simulationDiff: ReturnType, + confirmedValidDetectedFields: Record ): Array<{ name: string; type: FieldDefinitionConfig['type'] | 'unmapped'; }> => { - const diffs = simulationDiff - .map((simulatedDoc) => flattenObject(simulatedDoc.added)) + const diffs: string[] = simulationDiff + .map((simulatedDoc: any) => flattenObject(simulatedDoc.added)) .flatMap(Object.keys); const uniqueFields = [...new Set(diffs)]; - return uniqueFields.map((name) => ({ name, type: 'unmapped' })); + return uniqueFields.map((name: string) => ({ + name, + type: confirmedValidDetectedFields[name]?.type || 'unmapped', + })); }; -const computeSuccessRate = (simulation: IngestSimulateResponse) => { - const successfulCount = simulation.docs.reduce((rate, doc) => { - return (rate += isSuccessfulDocument(doc) ? 1 : 0); +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const computeSuccessRate = (simulation: any) => { + const successfulCount = simulation.docs.reduce((rate: number, entry: any) => { + return (rate += isSuccessfulDocument(entry) ? 1 : 0); }, 0); + return successfulCount / simulation.docs.length; }; -const isSuccessfulDocument = ( - doc: IngestSimulateSimulateDocumentResult -): doc is Required => - doc.processor_results?.every((processorSimulation) => processorSimulation.status === 'success') || - false; +const computeMappingProperties = ( + detectedFields: NonNullable +) => { + return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }])); +}; + +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const isSuccessfulDocument = (entry: any) => entry.doc.error === undefined; +// TODO: update type once Kibana updates to elasticsearch-js 8.17 +const isMappingFailure = (entry: any) => + !isSuccessfulDocument(entry) && entry.doc.error.type === 'document_parsing_exception'; export const processingRoutes = { ...simulateProcessorRoute, diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx index bd04e2cbf959e..27624933559b2 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx @@ -68,7 +68,7 @@ export function AddProcessorFlyout({ const handleSubmit: SubmitHandler = async (data) => { const processingDefinition = convertFormStateToProcessing(data); - simulate(processingDefinition).then((responseBody) => { + simulate(processingDefinition, data.detected_fields).then((responseBody) => { if (responseBody instanceof Error) return; onAddProcessor(processingDefinition, data.detected_fields); diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx index 47a08264b70ab..499802117bc80 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx @@ -109,7 +109,8 @@ export const ProcessorOutcomePreview = ({ }, [formFields.field, detectedFieldsColumns, selectedDocsFilter]); const detectedFieldsEnabled = - isWiredReadStream(definition) && simulation && !isEmpty(simulation.detected_fields); + isWiredReadStream(definition) && + ((simulation && !isEmpty(simulation.detected_fields)) || !isEmpty(formFields.detected_fields)); return ( @@ -126,7 +127,9 @@ export const ProcessorOutcomePreview = ({ iconType="play" color="accentSecondary" size="s" - onClick={() => onSimulate(convertFormStateToProcessing(formFields))} + onClick={() => { + onSimulate(convertFormStateToProcessing(formFields), formFields.detected_fields); + }} isLoading={isLoading} > {i18n.translate( @@ -136,7 +139,7 @@ export const ProcessorOutcomePreview = ({ - {detectedFieldsEnabled && } + {detectedFieldsEnabled && } { +const DetectedFields = ({ detectedFields }: { detectedFields?: DetectedField[] }) => { const { euiTheme } = useEuiTheme(); const { fields, replace } = useFieldArray<{ detected_fields: DetectedField[] }>({ name: 'detected_fields', }); useEffect(() => { - replace(detectedFields); + if (detectedFields) replace(detectedFields); }, [detectedFields, replace]); return ( diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts index 1ff63fbc484e6..bf0ce34c1ac0f 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts @@ -5,23 +5,31 @@ * 2.0. */ +/* eslint-disable @typescript-eslint/naming-convention */ + import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller'; import { ReadStreamDefinition, ProcessingDefinition, Condition } from '@kbn/streams-schema'; import useAsyncFn from 'react-use/lib/useAsyncFn'; import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public'; import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range'; -import { APIReturnType } from '@kbn/streams-plugin/public/api'; +import { APIReturnType, StreamsAPIClientRequestParamsOf } from '@kbn/streams-plugin/public/api'; import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; import { useKibana } from '../../../hooks/use_kibana'; +import { DetectedField } from '../types'; type Simulation = APIReturnType<'POST /api/streams/{id}/processing/_simulate'>; +type SimulationRequestBody = + StreamsAPIClientRequestParamsOf<'POST /api/streams/{id}/processing/_simulate'>['params']['body']; export interface UseProcessingSimulatorReturnType { error?: IHttpFetchError; isLoading: boolean; refreshSamples: () => void; samples: Array>; - simulate: (processing: ProcessingDefinition) => Promise; + simulate: ( + processing: ProcessingDefinition, + detectedFields?: DetectedField[] + ) => Promise; simulation?: Simulation | null; } @@ -76,11 +84,17 @@ export const useProcessingSimulator = ({ const sampleDocs = (samples?.documents ?? []) as Array>; const [{ loading: isLoadingSimulation, error, value }, simulate] = useAsyncFn( - (processingDefinition: ProcessingDefinition) => { + (processingDefinition: ProcessingDefinition, detectedFields?: DetectedField[]) => { if (!definition) { return Promise.resolve(null); } + const detected_fields = detectedFields + ? (detectedFields.filter( + (field) => field.type !== 'unmapped' + ) as SimulationRequestBody['detected_fields']) + : undefined; + return streamsRepositoryClient.fetch('POST /api/streams/{id}/processing/_simulate', { signal: abortController.signal, params: { @@ -88,6 +102,7 @@ export const useProcessingSimulator = ({ body: { documents: sampleDocs, processing: [processingDefinition], + detected_fields, }, }, }); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts index 4f7f2295d1584..5f685bad1684b 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts @@ -15,6 +15,7 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) loadTestFile(require.resolve('./flush_config')); loadTestFile(require.resolve('./assets/dashboard')); loadTestFile(require.resolve('./schema')); + loadTestFile(require.resolve('./processing_simulate')); loadTestFile(require.resolve('./root_stream')); }); } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts new file mode 100644 index 0000000000000..f2afc4f9b9bb4 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils'; +import { StreamsRouteRepository } from '@kbn/streams-plugin/server'; +import { errors } from '@elastic/elasticsearch'; +import { disableStreams, enableStreams, forkStream, indexDocument } from './helpers/requests'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; + +async function simulateProcessingForStream( + client: StreamsSupertestRepositoryClient, + id: string, + body: ClientRequestParamsOf< + StreamsRouteRepository, + 'POST /api/streams/{id}/processing/_simulate' + >['params']['body'], + statusCode = 200 +) { + return client + .fetch('POST /api/streams/{id}/processing/_simulate', { + params: { + path: { id }, + body, + }, + }) + .expect(statusCode); +} + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + const esClient = getService('es'); + + let apiClient: StreamsSupertestRepositoryClient; + + describe('Processing Simulation', () => { + const TEST_TIMESTAMP = '2025-01-01T00:00:10.000Z'; + const TEST_MESSAGE = `${TEST_TIMESTAMP} error test`; + const TEST_HOST = 'test-host'; + + const testDoc = { + '@timestamp': TEST_TIMESTAMP, + message: TEST_MESSAGE, + 'host.name': TEST_HOST, + 'log.level': 'error', + }; + + const basicGrokProcessor = { + config: { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}', + ], + }, + }, + }; + + const createTestDocument = (message = TEST_MESSAGE) => ({ + '@timestamp': TEST_TIMESTAMP, + message, + }); + + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + + await enableStreams(apiClient); + + // Create a test document + await indexDocument(esClient, 'logs', testDoc); + + // Create a forked stream for testing + await forkStream(apiClient, 'logs', { + stream: { + name: 'logs.test', + }, + condition: { + field: 'host.name', + operator: 'eq' as const, + value: TEST_HOST, + }, + }); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + describe('Successful simulations', () => { + describe('with valid documents', () => { + it('should simulate additive processing', async () => { + const response = await simulateProcessingForStream(apiClient, 'logs.test', { + processing: [basicGrokProcessor], + documents: [createTestDocument()], + }); + + expect(response.body.success_rate).to.be(1); + expect(response.body.failure_rate).to.be(0); + + const { isMatch, value } = response.body.documents[0]; + expect(isMatch).to.be(true); + expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP); + expect(value).to.have.property('parsed_level', 'error'); + expect(value).to.have.property('parsed_message', 'test'); + }); + + it('should simulate with detected fields', async () => { + const response = await simulateProcessingForStream(apiClient, 'logs.test', { + processing: [basicGrokProcessor], + documents: [createTestDocument()], + detected_fields: [ + { name: 'parsed_timestamp', type: 'date' }, + { name: 'parsed_level', type: 'keyword' }, + ], + }); + + const findField = (name: string) => + response.body.detected_fields.find((f: { name: string }) => f.name === name); + + expect(response.body.detected_fields).to.have.length(3); // Including parsed_message + expect(findField('parsed_timestamp')).to.have.property('type', 'date'); + expect(findField('parsed_level')).to.have.property('type', 'keyword'); + }); + }); + + describe('with mixed success/failure documents', () => { + it('should provide accurate success/failure rates', async () => { + const response = await simulateProcessingForStream(apiClient, 'logs.test', { + processing: [basicGrokProcessor], + documents: [ + createTestDocument(), + createTestDocument('invalid format'), + createTestDocument(`${TEST_TIMESTAMP} info test`), + ], + }); + + expect(response.body.success_rate).to.be(0.67); + expect(response.body.failure_rate).to.be(0.33); + expect(response.body.documents).to.have.length(3); + expect(response.body.documents[0].isMatch).to.be(true); + expect(response.body.documents[1].isMatch).to.be(false); + expect(response.body.documents[2].isMatch).to.be(true); + }); + }); + }); + + describe('Failed simulations', () => { + it('should fail with invalid processor configurations', async () => { + await simulateProcessingForStream( + apiClient, + 'logs.test', + { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: ['%{INVALID_PATTERN:field}'], + }, + }, + }, + ], + documents: [createTestDocument('test message')], + }, + 400 + ); + }); + + it('should fail when attempting to update existing fields', async () => { + const response = await simulateProcessingForStream( + apiClient, + 'logs.test', + { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: ['%{TIMESTAMP_ISO8601:parsed_timestamp} %{GREEDYDATA:message}'], // Overwrites existing message field + }, + }, + }, + ], + documents: [createTestDocument(`${TEST_TIMESTAMP} original message`)], + }, + 400 + ); + + expect((response.body as errors.ResponseError['body']).message).to.contain( + 'The processor is not additive to the documents. It might update fields [message]' + ); + }); + + it('should fail with incompatible detected field mappings', async () => { + const response = await simulateProcessingForStream( + apiClient, + 'logs.test', + { + processing: [basicGrokProcessor], + documents: [createTestDocument()], + detected_fields: [ + { name: 'parsed_timestamp', type: 'boolean' }, // Incompatible type + ], + }, + 400 + ); + + expect((response.body as errors.ResponseError['body']).message).to.contain( + 'The detected field types might not be compatible with these documents.' + ); + }); + }); + + describe('Partial success simulations', () => { + it('should handle mixed success/failure documents', async () => { + const response = await simulateProcessingForStream(apiClient, 'logs.test', { + processing: [basicGrokProcessor], + documents: [ + createTestDocument(), // Will succeed + createTestDocument('invalid format'), // Will fail + ], + }); + + expect(response.body.success_rate).to.be(0.5); + expect(response.body.failure_rate).to.be(0.5); + expect(response.body.documents[0].isMatch).to.be(true); + expect(response.body.documents[1].isMatch).to.be(false); + }); + }); + }); +}