From d954d5ddae0b06f4f445666a53f1dd9aaa7bfd5f Mon Sep 17 00:00:00 2001 From: Marco Antonio Ghiani Date: Tue, 4 Mar 2025 07:43:30 +0100 Subject: [PATCH] =?UTF-8?q?[Streams=20=F0=9F=8C=8A]=20Enrichment=20-=20Fix?= =?UTF-8?q?=20broken=20results=20due=20to=20condition=20and=20add=20skippe?= =?UTF-8?q?d=20metric=20(#212757)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📓 Summary When the condition is not met, the processing simulation reports wrong metrics and fails on a unhandler error. This work fix the issue and also update the document simulation metrics, reporting how many documents are skipped by a processor during the simulation. A follow-up work will update the filters on the date to better reflect the available states of the documents (parsed, partially parsed, skipped, failed). Screenshot 2025-02-28 at 12 47 10 (cherry picked from commit 6e2a1033b8900529a2276f90a78b36a7ea145cb8) --- .../streams/processing/simulation_handler.ts | 76 +++++++++++++++---- .../processor_outcome_preview.tsx | 9 ++- .../processors/processor_metrics.tsx | 15 ++++ 3 files changed, 85 insertions(+), 15 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/processing/simulation_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/processing/simulation_handler.ts index 5dbb3bd34d67f..124c3cf52f096 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/processing/simulation_handler.ts @@ -62,7 +62,7 @@ export interface SimulationError { | 'non_additive_processor_failure'; } -export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'failed'; +export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'skipped' | 'failed'; export interface SimulationDocReport { detected_fields: Array<{ processor_id: string; name: string }>; @@ -75,6 +75,7 @@ export interface ProcessorMetrics { detected_fields: string[]; errors: SimulationError[]; failure_rate: number; + skipped_rate: number; success_rate: number; } @@ -113,7 +114,6 @@ export const simulateProcessing = async ({ /* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */ const simulationData = prepareSimulationData(params); const pipelineSimulationBody = preparePipelineSimulationBody(simulationData); - /** * 2. Run both pipeline and ingest simulations in parallel. * - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs. @@ -188,7 +188,16 @@ const prepareSimulationProcessors = ( } as ProcessorDefinition; }); - return formatToIngestProcessors(processors); + const dotExpanderProcessor: Pick = { + dot_expander: { + field: '*', + override: true, + }, + }; + + const formattedProcessors = formatToIngestProcessors(processors); + + return [dotExpanderProcessor, ...formattedProcessors]; }; const prepareSimulationData = (params: ProcessingSimulationParams) => { @@ -351,10 +360,18 @@ const computePipelineSimulationResult = ( const processorsMap = initProcessorMetricsMap(processing); const docReports = simulationResult.docs.map((docResult, id) => { - const { errors, status, value } = getLastDoc(docResult); + const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source); const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source); + docResult.processor_results.forEach((processor) => { + const procId = processor.tag; + + if (procId && isSkippedProcessor(processor)) { + processorsMap[procId].skipped_rate++; + } + }); + diff.detected_fields.forEach(({ processor_id, name }) => { processorsMap[processor_id].detected_fields.push(name); }); @@ -392,6 +409,7 @@ const initProcessorMetricsMap = ( detected_fields: [], errors: [], failure_rate: 0, + skipped_rate: 0, success_rate: 1, }, ]); @@ -408,7 +426,8 @@ const extractProcessorMetrics = ({ }) => { return mapValues(processorsMap, (metrics) => { const failureRate = metrics.failure_rate / sampleSize; - const successRate = 1 - failureRate; + const skippedRate = metrics.skipped_rate / sampleSize; + const successRate = 1 - skippedRate - failureRate; const detected_fields = uniq(metrics.detected_fields); const errors = uniqBy(metrics.errors, (error) => error.message); @@ -416,22 +435,38 @@ const extractProcessorMetrics = ({ detected_fields, errors, failure_rate: parseFloat(failureRate.toFixed(2)), + skipped_rate: parseFloat(skippedRate.toFixed(2)), success_rate: parseFloat(successRate.toFixed(2)), }; }); }; const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => { - if (doc.processor_results.every(isSuccessfulProcessor)) return 'parsed'; + // Remove the always present base processor for dot expander + const processorResults = doc.processor_results.slice(1); - if (doc.processor_results.some(isSuccessfulProcessor)) return 'partially_parsed'; + if (processorResults.every(isSkippedProcessor)) { + return 'skipped'; + } + + if (processorResults.every((proc) => isSuccessfulProcessor(proc) || isSkippedProcessor(proc))) { + return 'parsed'; + } + + if (processorResults.some(isSuccessfulProcessor)) { + return 'partially_parsed'; + } return 'failed'; }; -const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => { +const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: FlattenRecord) => { const status = getDocumentStatus(docResult); - const lastDocSource = docResult.processor_results.at(-1)?.doc?._source ?? {}; + const lastDocSource = + docResult.processor_results + .slice(1) // Remove the always present base processor for dot expander + .filter((proc) => !isSkippedProcessor(proc)) + .at(-1)?.doc?._source ?? sample; if (status === 'parsed') { return { @@ -440,7 +475,7 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => { status, }; } else { - const { _errors, ...value } = lastDocSource; + const { _errors = [], ...value } = lastDocSource; return { value: flattenObjectNestedLast(value), errors: _errors as SimulationError[], status }; } }; @@ -459,7 +494,7 @@ const computeSimulationDocDiff = ( const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor); const comparisonDocs = [ - { processor_id: 'sample', value: sample }, + { processor_id: 'base', value: docResult.processor_results[0]!.doc!._source }, ...successfulProcessors.map((proc) => ({ processor_id: proc.tag, value: omit(proc.doc._source, ['_errors']), @@ -495,7 +530,7 @@ const computeSimulationDocDiff = ( // We might have updated fields that are not present in the original document because are generated by the previous processors. // We exclude them from the list of fields that make the processor non-additive. - const originalUpdatedFields = updatedFields.filter((field) => field in sample); + const originalUpdatedFields = updatedFields.filter((field) => field in sample).sort(); if (!isEmpty(originalUpdatedFields)) { diffResult.errors.push({ processor_id: nextDoc.processor_id, @@ -514,7 +549,8 @@ const prepareSimulationResponse = async ( detectedFields: DetectedField[] ) => { const successRate = computeSuccessRate(docReports); - const failureRate = 1 - successRate; + const skippedRate = computeSkippedRate(docReports); + const failureRate = 1 - skippedRate - successRate; const isNotAdditiveSimulation = some(processorsMetrics, (metrics) => metrics.errors.some(isNonAdditiveSimulationError) ); @@ -524,6 +560,7 @@ const prepareSimulationResponse = async ( documents: docReports, processors_metrics: processorsMetrics, failure_rate: parseFloat(failureRate.toFixed(2)), + skipped_rate: parseFloat(skippedRate.toFixed(2)), success_rate: parseFloat(successRate.toFixed(2)), is_non_additive_simulation: isNotAdditiveSimulation, }; @@ -538,10 +575,12 @@ const prepareSimulationFailureResponse = (error: SimulationError) => { detected_fields: [], errors: [error], failure_rate: 1, + skipped_rate: 0, success_rate: 0, }, }, failure_rate: 1, + skipped_rate: 0, success_rate: 0, is_non_additive_simulation: isNonAdditiveSimulationError(error), }; @@ -597,6 +636,12 @@ const computeSuccessRate = (docs: SimulationDocReport[]) => { return successfulCount / docs.length; }; +const computeSkippedRate = (docs: SimulationDocReport[]) => { + const skippedCount = docs.reduce((rate, doc) => (rate += doc.status === 'skipped' ? 1 : 0), 0); + + return skippedCount / docs.length; +}; + const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => { return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }])); }; @@ -609,6 +654,11 @@ const isSuccessfulProcessor = ( ): processor is WithRequired => processor.status === 'success' && !!processor.tag; +const isSkippedProcessor = ( + processor: IngestPipelineSimulation + // @ts-expect-error Looks like the IngestPipelineSimulation.status is not typed correctly and misses the 'skipped' status +): processor is WithRequired => processor.status === 'skipped'; + // TODO: update type once Kibana updates to elasticsearch-js 8.17 const isMappingFailure = (entry: any) => entry.doc?.error?.type === 'document_parsing_exception'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index dab8028f1f999..f3c7b22c8ef3e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -67,6 +67,11 @@ export const ProcessorOutcomePreview = ({ } }, [columns, selectedDocsFilter]); + const simulationFailureRate = simulation + ? simulation?.failure_rate + simulation?.skipped_rate + : undefined; + const simulationSuccessRate = simulation?.success_rate; + return ( <> @@ -76,8 +81,8 @@ export const ProcessorOutcomePreview = ({ timeRange={timeRange} onTimeRangeChange={setTimeRange} onTimeRangeRefresh={onRefreshSamples} - simulationFailureRate={simulation?.failure_rate} - simulationSuccessRate={simulation?.success_rate} + simulationFailureRate={simulationFailureRate} + simulationSuccessRate={simulationSuccessRate} /> diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx index baead2578d4db..edc211ddec06e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx @@ -32,10 +32,12 @@ const formatter = new Intl.NumberFormat('en-US', { export const ProcessorMetricBadges = ({ detected_fields, failure_rate, + skipped_rate, success_rate, }: ProcessorMetricBadgesProps) => { const detectedFieldsCount = detected_fields.length; const failureRate = failure_rate > 0 ? formatter.format(failure_rate) : null; + const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null; const successRate = success_rate > 0 ? formatter.format(success_rate) : null; return ( @@ -53,6 +55,19 @@ export const ProcessorMetricBadges = ({ {failureRate} )} + {skippedRate && ( + + {skippedRate} + + )} {successRate && (