Skip to content

Commit

Permalink
[Streams 🌊] Enrichment - Fix broken results due to condition and add …
Browse files Browse the repository at this point in the history
…skipped metric (#212757)

## 📓 Summary

When the condition is not met, the processing simulation reports wrong
metrics and fails on an unhandled error.

This work fixes the issue and also updates the document simulation
metrics, reporting how many documents are skipped by a processor during
the simulation.

A follow-up work will update the filters on the data to better reflect
the available states of the documents (parsed, partially parsed,
skipped, failed).

<img width="701" alt="Screenshot 2025-02-28 at 12 47 10"
src="https://github.com/user-attachments/assets/1b6979e4-78a1-4db3-af72-faaf06c0e249"
/>

(cherry picked from commit 6e2a103)
  • Loading branch information
tonyghiani committed Mar 4, 2025
1 parent 207f721 commit d954d5d
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export interface SimulationError {
| 'non_additive_processor_failure';
}

export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'failed';
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'skipped' | 'failed';

export interface SimulationDocReport {
detected_fields: Array<{ processor_id: string; name: string }>;
Expand All @@ -75,6 +75,7 @@ export interface ProcessorMetrics {
detected_fields: string[];
errors: SimulationError[];
failure_rate: number;
skipped_rate: number;
success_rate: number;
}

Expand Down Expand Up @@ -113,7 +114,6 @@ export const simulateProcessing = async ({
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
const simulationData = prepareSimulationData(params);
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);

/**
* 2. Run both pipeline and ingest simulations in parallel.
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
Expand Down Expand Up @@ -188,7 +188,16 @@ const prepareSimulationProcessors = (
} as ProcessorDefinition;
});

return formatToIngestProcessors(processors);
const dotExpanderProcessor: Pick<IngestProcessorContainer, 'dot_expander'> = {
dot_expander: {
field: '*',
override: true,
},
};

const formattedProcessors = formatToIngestProcessors(processors);

return [dotExpanderProcessor, ...formattedProcessors];
};

const prepareSimulationData = (params: ProcessingSimulationParams) => {
Expand Down Expand Up @@ -351,10 +360,18 @@ const computePipelineSimulationResult = (
const processorsMap = initProcessorMetricsMap(processing);

const docReports = simulationResult.docs.map((docResult, id) => {
const { errors, status, value } = getLastDoc(docResult);
const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source);

const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source);

docResult.processor_results.forEach((processor) => {
const procId = processor.tag;

if (procId && isSkippedProcessor(processor)) {
processorsMap[procId].skipped_rate++;
}
});

diff.detected_fields.forEach(({ processor_id, name }) => {
processorsMap[processor_id].detected_fields.push(name);
});
Expand Down Expand Up @@ -392,6 +409,7 @@ const initProcessorMetricsMap = (
detected_fields: [],
errors: [],
failure_rate: 0,
skipped_rate: 0,
success_rate: 1,
},
]);
Expand All @@ -408,30 +426,47 @@ const extractProcessorMetrics = ({
}) => {
return mapValues(processorsMap, (metrics) => {
const failureRate = metrics.failure_rate / sampleSize;
const successRate = 1 - failureRate;
const skippedRate = metrics.skipped_rate / sampleSize;
const successRate = 1 - skippedRate - failureRate;
const detected_fields = uniq(metrics.detected_fields);
const errors = uniqBy(metrics.errors, (error) => error.message);

return {
detected_fields,
errors,
failure_rate: parseFloat(failureRate.toFixed(2)),
skipped_rate: parseFloat(skippedRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
};
});
};

/**
 * Classifies a simulated document based on the per-processor results.
 *
 * The first processor result is always the base `dot_expander` processor
 * injected by the simulation setup, so it is excluded from the status
 * computation.
 *
 * - 'skipped': every user processor was skipped (condition not met).
 * - 'parsed': every user processor either succeeded or was skipped.
 * - 'partially_parsed': at least one user processor succeeded, others failed.
 * - 'failed': no user processor succeeded.
 *
 * NOTE(review): the span contained stale removed-diff lines that returned
 * 'parsed'/'partially_parsed' on the unsliced results before the skipped
 * checks could run; they are removed here so the new logic is reachable.
 */
const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => {
  // Remove the always present base processor for dot expander
  const processorResults = doc.processor_results.slice(1);

  if (processorResults.every(isSkippedProcessor)) {
    return 'skipped';
  }

  if (processorResults.every((proc) => isSuccessfulProcessor(proc) || isSkippedProcessor(proc))) {
    return 'parsed';
  }

  if (processorResults.some(isSuccessfulProcessor)) {
    return 'partially_parsed';
  }

  return 'failed';
};

const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: FlattenRecord) => {
const status = getDocumentStatus(docResult);
const lastDocSource = docResult.processor_results.at(-1)?.doc?._source ?? {};
const lastDocSource =
docResult.processor_results
.slice(1) // Remove the always present base processor for dot expander
.filter((proc) => !isSkippedProcessor(proc))
.at(-1)?.doc?._source ?? sample;

if (status === 'parsed') {
return {
Expand All @@ -440,7 +475,7 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
status,
};
} else {
const { _errors, ...value } = lastDocSource;
const { _errors = [], ...value } = lastDocSource;
return { value: flattenObjectNestedLast(value), errors: _errors as SimulationError[], status };
}
};
Expand All @@ -459,7 +494,7 @@ const computeSimulationDocDiff = (
const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor);

const comparisonDocs = [
{ processor_id: 'sample', value: sample },
{ processor_id: 'base', value: docResult.processor_results[0]!.doc!._source },
...successfulProcessors.map((proc) => ({
processor_id: proc.tag,
value: omit(proc.doc._source, ['_errors']),
Expand Down Expand Up @@ -495,7 +530,7 @@ const computeSimulationDocDiff = (

// We might have updated fields that are not present in the original document because are generated by the previous processors.
// We exclude them from the list of fields that make the processor non-additive.
const originalUpdatedFields = updatedFields.filter((field) => field in sample);
const originalUpdatedFields = updatedFields.filter((field) => field in sample).sort();
if (!isEmpty(originalUpdatedFields)) {
diffResult.errors.push({
processor_id: nextDoc.processor_id,
Expand All @@ -514,7 +549,8 @@ const prepareSimulationResponse = async (
detectedFields: DetectedField[]
) => {
const successRate = computeSuccessRate(docReports);
const failureRate = 1 - successRate;
const skippedRate = computeSkippedRate(docReports);
const failureRate = 1 - skippedRate - successRate;
const isNotAdditiveSimulation = some(processorsMetrics, (metrics) =>
metrics.errors.some(isNonAdditiveSimulationError)
);
Expand All @@ -524,6 +560,7 @@ const prepareSimulationResponse = async (
documents: docReports,
processors_metrics: processorsMetrics,
failure_rate: parseFloat(failureRate.toFixed(2)),
skipped_rate: parseFloat(skippedRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
is_non_additive_simulation: isNotAdditiveSimulation,
};
Expand All @@ -538,10 +575,12 @@ const prepareSimulationFailureResponse = (error: SimulationError) => {
detected_fields: [],
errors: [error],
failure_rate: 1,
skipped_rate: 0,
success_rate: 0,
},
},
failure_rate: 1,
skipped_rate: 0,
success_rate: 0,
is_non_additive_simulation: isNonAdditiveSimulationError(error),
};
Expand Down Expand Up @@ -597,6 +636,12 @@ const computeSuccessRate = (docs: SimulationDocReport[]) => {
return successfulCount / docs.length;
};

/**
 * Fraction of sampled documents whose simulation status is 'skipped',
 * i.e. no processor ran on them because the set condition was not met.
 */
const computeSkippedRate = (docs: SimulationDocReport[]) => {
  const skippedCount = docs.filter((doc) => doc.status === 'skipped').length;

  return skippedCount / docs.length;
};

/**
 * Builds a mapping `properties` object from the detected fields,
 * keyed by field name and retaining only each field's `type`.
 */
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
  const entries = detectedFields.map((field) => [field.name, { type: field.type }]);

  return Object.fromEntries(entries);
};
Expand All @@ -609,6 +654,11 @@ const isSuccessfulProcessor = (
): processor is WithRequired<IngestSimulatePipelineSimulation, 'doc' | 'tag'> =>
processor.status === 'success' && !!processor.tag;

const isSkippedProcessor = (
processor: IngestPipelineSimulation
// @ts-expect-error Looks like the IngestPipelineSimulation.status is not typed correctly and misses the 'skipped' status
): processor is WithRequired<IngestPipelineSimulation, 'tag'> => processor.status === 'skipped';

// TODO: update type once Kibana updates to elasticsearch-js 8.17
// Detects entries rejected by the mapping stage of the ingest simulation.
const isMappingFailure = (entry: any): boolean => {
  const errorType = entry.doc?.error?.type;
  return errorType === 'document_parsing_exception';
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ export const ProcessorOutcomePreview = ({
}
}, [columns, selectedDocsFilter]);

const simulationFailureRate = simulation
? simulation?.failure_rate + simulation?.skipped_rate
: undefined;
const simulationSuccessRate = simulation?.success_rate;

return (
<>
<EuiFlexItem grow={false}>
Expand All @@ -76,8 +81,8 @@ export const ProcessorOutcomePreview = ({
timeRange={timeRange}
onTimeRangeChange={setTimeRange}
onTimeRangeRefresh={onRefreshSamples}
simulationFailureRate={simulation?.failure_rate}
simulationSuccessRate={simulation?.success_rate}
simulationFailureRate={simulationFailureRate}
simulationSuccessRate={simulationSuccessRate}
/>
</EuiFlexItem>
<EuiSpacer size="m" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ const formatter = new Intl.NumberFormat('en-US', {
export const ProcessorMetricBadges = ({
detected_fields,
failure_rate,
skipped_rate,
success_rate,
}: ProcessorMetricBadgesProps) => {
const detectedFieldsCount = detected_fields.length;
const failureRate = failure_rate > 0 ? formatter.format(failure_rate) : null;
const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null;
const successRate = success_rate > 0 ? formatter.format(success_rate) : null;

return (
Expand All @@ -53,6 +55,19 @@ export const ProcessorMetricBadges = ({
{failureRate}
</EuiBadge>
)}
{skippedRate && (
<EuiBadge
color="hollow"
iconType="minus"
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.skippedRate', {
defaultMessage:
'{skippedRate} of the sampled documents were skipped due to the set condition',
values: { skippedRate },
})}
>
{skippedRate}
</EuiBadge>
)}
{successRate && (
<EuiBadge
color="hollow"
Expand Down

0 comments on commit d954d5d

Please sign in to comment.