From 580bffbe0bbb03efe5d7c0de2857a072ec89da9c Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Fri, 21 Feb 2025 15:29:13 +0100
Subject: [PATCH 1/6] Use the correct header values when dropping header row
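
The drop processor that removes the header row was built from the sanitized
column names that columnsFromHeader() returns, not from the raw values of the
header row itself. Whenever sanitization changes a header cell (spaces become
underscores, a leading digit gets a "Column" prefix), the generated drop
condition can never match the actual header row, and the header leaks into
the parsed documents. Build the drop values from the raw header row instead.

A sketch of the difference, with a made-up header cell (illustrative only,
not taken from this change):

    // The header cell reads "User Name"; toSafeColumnName() turns it into
    // "User_Name". The drop condition must compare the ingested field with
    // the raw cell text, or the header row is never dropped.
    const headerResults = { column1: 'User Name' };
    const columnName = toSafeColumnName(headerResults.column1); // 'User_Name'
    const dropValue = headerResults.column1; // 'User Name' <- what we drop on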
---
 .../server/graphs/csv/columns.ts              | 20 +++++++++-
 .../automatic_import/server/graphs/csv/csv.ts | 39 ++++++++++++-------
 2 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
index 108c4cec75bf6..83002b4029b92 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
@@ -30,8 +30,15 @@ export function toSafeColumnName(columnName: unknown): string | undefined {
   const safeName = columnName.replace(/[^a-zA-Z0-9_]/g, '_');
   return /^[0-9]/.test(safeName) ? `Column${safeName}` : safeName;
 }
-// Returns the column list from a header row. We skip values that are not strings.
+/**
+ * Extracts column names from the provided header doc by truncating unnecessary columns
+ * and converting each name into a normalized format.
+ *
+ * @param tempColumnNames - The list of temporary column names (integer-based).
+ * @param headerObject - The processed first document (corresponding to the header row).
+ * @returns A filtered array of valid column names in a safe format, with undefined where the value was neither a string nor a number.
+ */
 export function columnsFromHeader(
   tempColumnNames: string[],
   headerObject: { [key: string]: unknown }
 ): Array<string | undefined> {
@@ -44,8 +51,17 @@ export function columnsFromHeader(
     .slice(0, maxIndex + 1)
     .map((columnName) => headerObject[columnName])
     .map(toSafeColumnName);
 }
-// Count the number of columns actually present in the rows.
+/**
+ * Calculates the total number of columns in a CSV by going through the processed
+ * documents to find the last defined value across all rows.
+ *
+ * @param tempColumnNames - An array of column names used to reference CSV row properties.
+ * @param csvRows - An array of row objects representing CSV data, where each key
+ *                  corresponds to a column name from `tempColumnNames`.
+ * @returns The total number of columns, determined by the position of the last
+ *          defined value across all rows.
+ */
 export function totalColumnCount(
   tempColumnNames: string[],
   csvRows: Array<{ [key: string]: unknown }>
diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
index d753fd7995688..743aa2389497f 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
@@ -7,6 +7,7 @@
 import type { LogFormatDetectionState } from '../../types';
 import type { LogDetectionNodeParams } from '../log_type_detection/types';
 import { createJSONInput } from '../../util';
+import type { ESProcessorItem } from '../../../common';
 import { createCSVProcessor, createDropProcessor } from '../../util/processors';
 import { CSVParseError, UnparseableCSVFormatError } from '../../lib/errors/unparseable_csv_error';
 import {
@@ -46,36 +47,44 @@ export async function handleCSV({
     throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]);
   }
 
-  const headerColumns = state.samplesFormat.header
-    ? columnsFromHeader(temporaryColumns, tempResults[0])
-    : [];
+  const prefix = [packageName, dataStreamName];
+
+  // What columns does the LLM suggest?
   const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName);
+
   const needColumns = totalColumnCount(temporaryColumns, tempResults);
-  const columns: string[] = Array.from(
-    yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns)
-  );
-  const prefix = [packageName, dataStreamName];
-  const prefixedColumns = prefixColumns(columns, prefix);
-  const csvProcessor = createCSVProcessor('message', prefixedColumns);
-  const csvHandlingProcessors = [csvProcessor];
+  // What columns do we get by parsing the header row, if any?
+  const headerColumns: Array<string | undefined> = [];
+  const dropProcessors: ESProcessorItem[] = [];
+  if (state.samplesFormat.header) {
+    const headerResults = tempResults[0];
+    headerColumns.push(...columnsFromHeader(temporaryColumns, headerResults));
 
-  if (headerColumns.length > 0) {
-    const dropValues = columns.reduce((acc, column, index) => {
-      if (headerColumns[index] !== undefined) {
-        acc[column] = String(headerColumns[index]);
+    const dropValues = temporaryColumns.reduce((acc, column, index) => {
+      const headerValue = headerResults[temporaryColumns[index]];
+      if (typeof headerValue === 'string') {
+        acc[column] = headerValue;
       }
       return acc;
     }, {} as Record<string, string>);
+
     const dropProcessor = createDropProcessor(
       dropValues,
       prefix,
       'remove_csv_header',
       'Remove the CSV header line by comparing the values'
     );
-    csvHandlingProcessors.push(dropProcessor);
+    dropProcessors.push(dropProcessor);
   }
 
+  // Combine all that information into a single list of columns
+  const columns: string[] = Array.from(
+    yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns)
+  );
+  const prefixedColumns = prefixColumns(columns, prefix);
+  const csvProcessor = createCSVProcessor('message', prefixedColumns);
+  const csvHandlingProcessors = [csvProcessor];
+
   const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput(
     csvHandlingProcessors,
     samples,

From 80824e1ee52ceeb61e89b99e8bdc8777f872a775 Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Fri, 21 Feb 2025 15:35:32 +0100
Subject: [PATCH 2/6] Fix the CSV bug

The drop processors created in the previous commit were never added to the
pipeline; append them after the CSV processor so the header row is actually
dropped.
---
 .../plugins/shared/automatic_import/server/graphs/csv/csv.ts  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
index 743aa2389497f..6cf78d2357927 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
@@ -82,8 +82,10 @@ export async function handleCSV({
     yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns)
   );
   const prefixedColumns = prefixColumns(columns, prefix);
+
+  // Instantiate the processors to handle the CSV format
   const csvProcessor = createCSVProcessor('message', prefixedColumns);
-  const csvHandlingProcessors = [csvProcessor];
+  const csvHandlingProcessors = [csvProcessor, ...dropProcessors];
 
   const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput(
     csvHandlingProcessors,

From abf925b45a9285370bec2c9110c42311b47d92f3 Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Fri, 21 Feb 2025 15:48:14 +0100
Subject: [PATCH 3/6] Now the fix works
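
The previous commits still keyed the drop values by the temporary column
names (column1, column2, ...), while the CSV processor in the final pipeline
writes into the merged column names, so the drop condition could reference
fields that do not exist. Compute the merged column list first and key the
drop values by it, while still reading each header cell under the temporary
name it was parsed into.

Roughly, with hypothetical values for illustration:

    // temporaryColumns = ['column1', 'column2']         (parse-time keys)
    // headerResults    = { column1: 'ts', column2: 'msg' }
    // columns          = ['timestamp', 'message']        (merged names)
    // dropValues       = { timestamp: 'ts', message: 'msg' }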
---
 .../automatic_import/server/graphs/csv/csv.ts | 32 +++++++++++--------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
index 6cf78d2357927..0984f9853490a 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
@@ -47,20 +47,33 @@ export async function handleCSV({
     throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]);
   }
 
+  // Some basic information we'll need later
   const prefix = [packageName, dataStreamName];
 
   // What columns does the LLM suggest?
   const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName);
 
-  const needColumns = totalColumnCount(temporaryColumns, tempResults);
-  // What columns do we get by parsing the header row, if any?
+  // What columns do we get by parsing the header row, if any exists?
   const headerColumns: Array<string | undefined> = [];
-  const dropProcessors: ESProcessorItem[] = [];
   if (state.samplesFormat.header) {
     const headerResults = tempResults[0];
     headerColumns.push(...columnsFromHeader(temporaryColumns, headerResults));
+  }
+
+  // Combine all that information into a single list of columns
+  const columns: string[] = Array.from(
+    yieldUniqueColumnNames(
+      totalColumnCount(temporaryColumns, tempResults),
+      [llmProvidedColumns, headerColumns],
+      temporaryColumns
+    )
+  );
 
-    const dropValues = temporaryColumns.reduce((acc, column, index) => {
+  // Instantiate the processors to handle the CSV format
+  const dropProcessors: ESProcessorItem[] = [];
+  if (state.samplesFormat.header) {
+    const headerResults = tempResults[0];
+    const dropValues = columns.reduce((acc, column, index) => {
       const headerValue = headerResults[temporaryColumns[index]];
       if (typeof headerValue === 'string') {
         acc[column] = headerValue;
@@ -76,17 +89,10 @@ export async function handleCSV({
     );
     dropProcessors.push(dropProcessor);
   }
-
-  // Combine all that information into a single list of columns
-  const columns: string[] = Array.from(
-    yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns)
-  );
   const prefixedColumns = prefixColumns(columns, prefix);
+  const csvHandlingProcessors = [createCSVProcessor('message', prefixedColumns), ...dropProcessors];
 
-  // Instantiate the processors to handle the CSV format
-  const csvProcessor = createCSVProcessor('message', prefixedColumns);
-  const csvHandlingProcessors = [csvProcessor, ...dropProcessors];
-
+  // Test the processors on the samples provided
   const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput(
     csvHandlingProcessors,
     samples,

From 8cffe1bfc9fc7019724f1cf80aa3bbb84a423a28 Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Wed, 26 Feb 2025 14:19:15 +0100
Subject: [PATCH 4/6] Rewrite with tests
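
Split header handling into two helpers: valuesFromHeader() preserves the raw
header cells (needed for the drop condition), while columnsFromHeader() keeps
sanitizing them into safe column names. A new createCSVPipeline() builds the
CSV and drop processors for an arbitrary field prefix, so the same code path
serves both the package pipeline and a temporary "json" key used to produce
the JSON samples. The unit tests stub ingest.simulate with a small in-memory
simulation of the csv/drop/remove processors.

The intended split between the two helpers, on a made-up header row:

    const header = { column1: 'event time', column2: 'status' };
    valuesFromHeader(['column1', 'column2'], header);  // ['event time', 'status']
    columnsFromHeader(['column1', 'column2'], header); // ['event_time', 'status']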
---
 .../server/graphs/csv/columns.ts              |  22 ++-
 .../server/graphs/csv/csv.test.ts             | 155 ++++++++++++++++++
 .../automatic_import/server/graphs/csv/csv.ts | 125 +++++++++-----
 .../automatic_import/server/util/fields.ts    |  10 ++
 .../automatic_import/server/util/pipeline.ts  |   3 +-
 5 files changed, 275 insertions(+), 40 deletions(-)
 create mode 100644 x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.test.ts

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
index 83002b4029b92..3c7c1ed10af42 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/columns.ts
@@ -43,13 +43,33 @@ export function columnsFromHeader(
   tempColumnNames: string[],
   headerObject: { [key: string]: unknown }
 ): Array<string | undefined> {
+  return valuesFromHeader(tempColumnNames, headerObject).map(toSafeColumnName);
+}
+
+/**
+ * Extracts values from a header object based on column names, converting non-string/numeric values to undefined.
+ * The function processes the array up to the last non-undefined value in the header object.
+ *
+ * @param tempColumnNames - Array of column names to look up in the header object
+ * @param headerObject - Object containing header values indexed by column names
+ * @returns Array of string/number values or undefined for non-string/number values, truncated at the last non-undefined entry
+ *
+ * @example
+ * const columns = ['col1', 'col2', 'col3', 'col4'];
+ * const header = { col1: 'value1', col2: 123, col3: 'value3', 'col4': null };
+ * valuesFromHeader(columns, header); // ['value1', 123, 'value3', undefined]
+ */
+export function valuesFromHeader(
+  tempColumnNames: string[],
+  headerObject: { [key: string]: unknown }
+): Array<string | number | undefined> {
   const maxIndex = tempColumnNames.findLastIndex(
     (columnName) => headerObject[columnName] !== undefined
   );
   return tempColumnNames
     .slice(0, maxIndex + 1)
     .map((columnName) => headerObject[columnName])
-    .map(toSafeColumnName);
+    .map((value) => (typeof value === 'string' || typeof value === 'number' ? value : undefined));
 }
 
 /**
diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.test.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.test.ts
new file mode 100644
index 0000000000000..cf2b9d16f8e13
--- /dev/null
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.test.ts
@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { IScopedClusterClient } from '@kbn/core/server';
+import { handleCSV } from './csv';
+import { ESProcessorItem } from '../../../common';
+import { DocTemplate } from '../../util/pipeline';
+
+interface SimpleCSVPipelineSimulationParams {
+  pipeline: { processors: ESProcessorItem[] };
+  docs: DocTemplate[];
+}
+
+/**
+ * Simulates processing a list of documents with a defined pipeline of processors,
+ * specifically handling 'csv' and 'drop' processors in the way they are used in our CSV processing.
+ *
+ * @param params - An object containing the pipeline of processors and the documents to be transformed.
+ * @returns An object containing the processed list of documents after all processors in the pipeline have been applied.
+ */
+export const simpleCSVPipelineSimulation = (
+  params: SimpleCSVPipelineSimulationParams
+): { docs: Array<{ doc: DocTemplate }> } => {
+  const { pipeline, docs } = params;
+  for (const processor of pipeline.processors) {
+    if ('remove' in processor) {
+      // do nothing
+    } else if ('csv' in processor) {
+      // Not a real CSV parser, of course. It only handles the "json.*" field names.
+      const fields = processor.csv.target_fields as string[];
+      for (const doc of docs) {
+        const message = doc._source.message;
+        const values = message.split(',');
+        const unpacked: Record<string, string> = {};
+        for (let i = 0; i < fields.length; i++) {
+          const field = fields[i].startsWith('json.') ? fields[i].slice(5) : fields[i];
+          // The only error it handles is: CSV value starts with " and does not end with ".
+          if (values[i].startsWith('"') && !values[i].endsWith('"')) {
+            throw new Error('Mismatched quote');
+          }
+          unpacked[field] = values[i].startsWith('"') ? values[i].slice(1, -1) : values[i];
+        }
+        // eslint-disable-next-line dot-notation
+        doc._source['json'] = unpacked;
+      }
+    } else if ('drop' in processor) {
+      docs.shift();
+    } else {
+      throw new Error('Unknown processor');
+    }
+  }
+  return { docs: docs.map((doc) => ({ doc })) };
+};
+
+describe('handleCSV', () => {
+  const mockClient = {
+    asCurrentUser: {
+      ingest: {
+        simulate: simpleCSVPipelineSimulation,
+      },
+    },
+  } as unknown as IScopedClusterClient;
+
+  it('should successfully parse valid CSV logs without header', async () => {
+    const mockParams = {
+      state: {
+        packageName: 'testPackage',
+        dataStreamName: 'testDataStream',
+        logSamples: ['123,"string",456', '"123",Some Value,"456"'],
+        samplesFormat: {
+          columns: [],
+          header: false,
+        },
+        additionalProcessors: [],
+      },
+      client: mockClient,
+    };
+
+    const result = await handleCSV(mockParams);
+    expect(result.jsonSamples).toBeDefined();
+    expect(result.additionalProcessors).toHaveLength(1); // Only the CSV processor; without a header there is no drop processor
+    if (!result.additionalProcessors) {
+      fail('additionalProcessors is undefined, logic error after expectation');
+    }
+
+    const csvProcessor = result.additionalProcessors[0].csv;
+    expect(csvProcessor).toBeDefined();
+    expect(csvProcessor.target_fields).toEqual([
+      'testPackage.testDataStream.column1',
+      'testPackage.testDataStream.column2',
+      'testPackage.testDataStream.column3',
+    ]);
+    expect(result.jsonSamples).toEqual([
+      '{"column1":"123","column2":"string","column3":"456"}',
+      '{"column1":"123","column2":"Some Value","column3":"456"}',
+    ]);
+    expect(result.lastExecutedChain).toBe('handleCSV');
+  });
+
+  it('should successfully parse valid CSV logs with header', async () => {
+    const mockParams = {
+      state: {
+        packageName: 'testPackage',
+        dataStreamName: 'testDataStream',
+        logSamples: ['header1,header2,header3', 'value1,value2,value3'],
+        samplesFormat: {
+          columns: ['first column', 'second column'],
+          header: true,
+        },
+        additionalProcessors: [],
+      },
+      client: mockClient,
+    };
+
+    const result = await handleCSV(mockParams);
+    expect(result.jsonSamples).toBeDefined();
+    expect(result.additionalProcessors).toHaveLength(2); // Must be CSV and drop processor
+    if (!result.additionalProcessors) {
+      fail('additionalProcessors is undefined, logic error after expectation');
+    }
+    const csvProcessor = result.additionalProcessors[0].csv;
+    expect(csvProcessor).toBeDefined();
+    expect(csvProcessor.target_fields).toEqual([
+      'testPackage.testDataStream.first_column',
+      'testPackage.testDataStream.second_column',
+      'testPackage.testDataStream.header3',
+    ]);
+    const dropProcessor = result.additionalProcessors[1].drop;
+    expect(dropProcessor).toBeDefined();
+    expect(dropProcessor.if).toContain('header1'); // column value, not column name!
+    expect(result.lastExecutedChain).toBe('handleCSV');
+  });
+
+  it('should throw UnparseableCSVFormatError when CSV parsing fails', async () => {
+    const mockParams = {
+      state: {
+        packageName: 'testPackage',
+        dataStreamName: 'testDataStream',
+        // Intentionally malformed according to our simple CSV parser
+        logSamples: ['header1,header2', '"values...'],
+        samplesFormat: {
+          columns: ['col1', 'col2'],
+          header: true,
+        },
+        additionalProcessors: [],
+      },
+      client: mockClient,
+    };
+    await expect(handleCSV(mockParams)).rejects.toThrow('unparseable-csv-data');
+  });
+});
diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
index 0984f9853490a..67d36c4030ff1 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
@@ -4,8 +4,8 @@
  * 2.0; you may not use this file except in compliance with the Elastic License
  * 2.0.
  */
+import { IScopedClusterClient } from '@kbn/core/server';
 import type { LogFormatDetectionState } from '../../types';
-import type { LogDetectionNodeParams } from '../log_type_detection/types';
 import { createJSONInput } from '../../util';
 import type { ESProcessorItem } from '../../../common';
 import { createCSVProcessor, createDropProcessor } from '../../util/processors';
@@ -14,6 +14,7 @@ import {
   generateColumnNames,
   upperBoundForColumnCount,
   columnsFromHeader,
+  valuesFromHeader,
   toSafeColumnName,
   totalColumnCount,
   yieldUniqueColumnNames,
@@ -23,11 +24,70 @@ import {
 // We will only create the processor for the first MAX_CSV_COLUMNS columns.
 const MAX_CSV_COLUMNS = 100;
 
-// Converts CSV samples into JSON samples.
+interface HandleCSVState {
+  packageName: string;
+  dataStreamName: string;
+  logSamples: string[];
+  samplesFormat: {
+    columns?: string[];
+    header?: boolean;
+  };
+  additionalProcessors: ESProcessorItem[];
+}
+
+interface HandleCSVParams {
+  state: HandleCSVState;
+  client: IScopedClusterClient;
+}
+
+function createCSVPipeline(
+  prefix: string[],
+  columns: string[],
+  headerValues: Array<string | number | undefined>
+): ESProcessorItem[] {
+  const prefixedColumns = prefixColumns(columns, prefix);
+  const dropProcessors: ESProcessorItem[] = [];
+
+  if (headerValues.length !== 0) {
+    const dropValues = columns.reduce((acc, column, index) => {
+      const headerValue = headerValues[index];
+      if (headerValue !== undefined) {
+        acc[column] = headerValue;
+      }
+      return acc;
+    }, {} as Record<string, string | number>);
+
+    const dropProcessor = createDropProcessor(
+      dropValues,
+      prefix,
+      'remove_csv_header',
+      'Remove the CSV header by comparing row values to the header row.'
+    );
+    dropProcessors.push(dropProcessor);
+  }
+
+  return [createCSVProcessor('message', prefixedColumns), ...dropProcessors];
+}
+
+/**
+ * Processes CSV log data and converts it to JSON for further pipeline execution.
+ *
+ * This function attempts to parse CSV-formatted samples with temporary column names first.
+ * If that is successful, the final column names are determined by combining the columns suggested by the LLM,
+ * the columns parsed from the header row, and the temporary columns as the last resort.
+ *
+ * We generate necessary processors to handle the CSV format, including a processor to drop the header row if it exists.
+ * The samples are then processed with these processors, converted to JSON, and stored in the state.
+ *
+ * @param param0 - An object containing the state, which holds log samples and format info, and the Elasticsearch client.
+ * @returns A promise resolving to a partial state containing JSON samples, additional processors, and the last executed chain label.
+ * @throws UnparseableCSVFormatError if CSV parsing fails for any log samples.
+ */
 export async function handleCSV({
   state,
   client,
-}: LogDetectionNodeParams): Promise<Partial<LogFormatDetectionState>> {
+}: HandleCSVParams): Promise<Partial<LogFormatDetectionState>> {
+  const jsonKey = 'json';
   const packageName = state.packageName;
   const dataStreamName = state.dataStreamName;
@@ -35,10 +95,10 @@ export async function handleCSV({
   const temporaryColumns = generateColumnNames(
     Math.min(upperBoundForColumnCount(samples), MAX_CSV_COLUMNS)
   );
-  const temporaryProcessor = createCSVProcessor('message', temporaryColumns);
+  const temporaryPipeline = createCSVPipeline([jsonKey], temporaryColumns, []);
 
   const { pipelineResults: tempResults, errors: tempErrors } = await createJSONInput(
-    [temporaryProcessor],
+    temporaryPipeline,
     samples,
     client
   );
@@ -55,59 +115,48 @@ export async function handleCSV({
     throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]);
   }
 
   // Some basic information we'll need later
   const prefix = [packageName, dataStreamName];
 
   // What columns does the LLM suggest?
   const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName);
 
   // What columns do we get by parsing the header row, if any exists?
   const headerColumns: Array<string | undefined> = [];
+  const headerValues: Array<string | number | undefined> = [];
+  const csvRows = tempResults.map((result) => result[jsonKey] as { [key: string]: unknown });
+
   if (state.samplesFormat.header) {
-    const headerResults = tempResults[0];
-    headerColumns.push(...columnsFromHeader(temporaryColumns, headerResults));
+    const headerRow = csvRows[0];
+    headerValues.push(...valuesFromHeader(temporaryColumns, headerRow));
+    headerColumns.push(...columnsFromHeader(temporaryColumns, headerRow));
   }
 
   // Combine all that information into a single list of columns
   const columns: string[] = Array.from(
     yieldUniqueColumnNames(
-      totalColumnCount(temporaryColumns, tempResults),
+      totalColumnCount(temporaryColumns, csvRows),
       [llmProvidedColumns, headerColumns],
       temporaryColumns
     )
   );
 
-  // Instantiate the processors to handle the CSV format
-  const dropProcessors: ESProcessorItem[] = [];
-  if (state.samplesFormat.header) {
-    const headerResults = tempResults[0];
-    const dropValues = columns.reduce((acc, column, index) => {
-      const headerValue = headerResults[temporaryColumns[index]];
-      if (typeof headerValue === 'string') {
-        acc[column] = headerValue;
-      }
-      return acc;
-    }, {} as Record<string, string>);
-
-    const dropProcessor = createDropProcessor(
-      dropValues,
-      prefix,
-      'remove_csv_header',
-      'Remove the CSV header line by comparing the values'
-    );
-    dropProcessors.push(dropProcessor);
-  }
-  const prefixedColumns = prefixColumns(columns, prefix);
-  const csvHandlingProcessors = [createCSVProcessor('message', prefixedColumns), ...dropProcessors];
+  // These processors extract CSV fields into the prefixed destination fields.
+  const csvHandlingProcessors = createCSVPipeline(prefix, columns, headerValues);
 
   // Test the processors on the samples provided
-  const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput(
-    csvHandlingProcessors,
-    samples,
-    client
-  );
+  const { errors } = await createJSONInput(csvHandlingProcessors, samples, client);
 
-  if (finalErrors.length > 0) {
-    throw new UnparseableCSVFormatError(finalErrors as CSVParseError[]);
+  if (errors.length > 0) {
+    throw new UnparseableCSVFormatError(errors as CSVParseError[]);
   }
 
-  // Converts JSON Object into a string and parses it as a array of JSON strings
-  const jsonSamples = finalResults
-    .map((log) => log[packageName])
-    .map((log) => (log as Record<string, unknown>)[dataStreamName])
-    .map((log) => JSON.stringify(log));
+  // These processors extract CSV fields into the temporary `json` key.
+  const csvToJSONProcessors = createCSVPipeline([jsonKey], columns, headerValues);
+
+  const { pipelineResults: jsonResults, errors: jsonErrors } = await createJSONInput(
+    csvToJSONProcessors,
+    samples,
+    client
+  );
+
+  if (jsonErrors.length > 0) {
+    throw new UnparseableCSVFormatError(jsonErrors as CSVParseError[]);
+  }
+
+  const jsonSamples = jsonResults.map((log) => log[jsonKey]).map((log) => JSON.stringify(log));
 
   return {
     jsonSamples,
diff --git a/x-pack/platform/plugins/shared/automatic_import/server/util/fields.ts b/x-pack/platform/plugins/shared/automatic_import/server/util/fields.ts
index 810754b23e150..f43ca8166e254 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/util/fields.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/util/fields.ts
@@ -23,3 +23,13 @@ export type FieldPath = string[];
 export function fieldPathToProcessorString(fieldPath: FieldPath): string {
   return fieldPath.join('.');
 }
+
+/**
+ * Converts a string representing a processor's path into a field path array.
+ *
+ * @param processorString - The dotted string to convert
+ * @returns An array of path segments representing the field path
+ */
+export function processorStringToFieldPath(processorString: string): FieldPath {
+  return processorString.split('.');
+}
diff --git a/x-pack/platform/plugins/shared/automatic_import/server/util/pipeline.ts b/x-pack/platform/plugins/shared/automatic_import/server/util/pipeline.ts
index 6eacb8b19b468..843832046d3e6 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/util/pipeline.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/util/pipeline.ts
@@ -8,11 +8,12 @@ import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
 import { ESProcessorItem } from '../../common';
 import { createPassthroughFailureProcessor, createRemoveProcessor } from './processors';
 
-interface DocTemplate {
+export interface DocTemplate {
   _index: string;
   _id: string;
   _source: {
     message: string;
+    [key: string]: unknown;
   };
 }

From 3a8c209760c34d85f00dae36b0c08a3dfbf37661 Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Wed, 26 Feb 2025 14:31:22 +0100
Subject: [PATCH 5/6] Update the function doc

---
 .../automatic_import/server/graphs/csv/csv.ts | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
index 67d36c4030ff1..6481816227016 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/csv/csv.ts
@@ -70,18 +70,19 @@ function createCSVPipeline(
 }
 
 /**
- * Processes CSV log data and converts it to JSON for further pipeline execution.
+ * Processes CSV log data by parsing, testing, and converting to JSON format.
  *
- * This function attempts to parse CSV-formatted samples with temporary column names first.
- * If that is successful, the final column names are determined by combining the columns suggested by the LLM,
- * the columns parsed from the header row, and the temporary columns as the last resort.
+ * The process follows three stages:
+ * 1. Initial parsing with temporary column names (column1, column2, etc.)
+ * 2. Testing with actual pipeline using package.dataStream.columnName format
+ * 3. Converting to JSON format for further processing
  *
- * We generate necessary processors to handle the CSV format, including a processor to drop the header row if it exists.
- * The samples are then processed with these processors, converted to JSON, and stored in the state.
+ * Final column names are determined by combining LLM suggestions, header row parsing,
+ * and temporary columns as fallback. Includes header row handling and CSV-to-JSON conversion.
  *
- * @param param0 - An object containing the state, which holds log samples and format info, and the Elasticsearch client.
- * @returns A promise resolving to a partial state containing JSON samples, additional processors, and the last executed chain label.
- * @throws UnparseableCSVFormatError if CSV parsing fails for any log samples.
+ * @param param0 - Object containing state (log samples, format info) and Elasticsearch client
+ * @returns Promise with JSON samples, processors, and chain label
+ * @throws UnparseableCSVFormatError if CSV parsing fails
  */
 export async function handleCSV({
   state,

From 4ff69622a7cbea4c1494eb048a9e7c4228e96cf7 Mon Sep 17 00:00:00 2001
From: Ilya Nikokoshev
Date: Fri, 28 Feb 2025 13:30:40 +0100
Subject: [PATCH 6/6] Fix type complaint

---
 .../automatic_import/server/graphs/log_type_detection/graph.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/platform/plugins/shared/automatic_import/server/graphs/log_type_detection/graph.ts b/x-pack/platform/plugins/shared/automatic_import/server/graphs/log_type_detection/graph.ts
index ae4c607ab3f68..7a56fb57f83f4 100644
--- a/x-pack/platform/plugins/shared/automatic_import/server/graphs/log_type_detection/graph.ts
+++ b/x-pack/platform/plugins/shared/automatic_import/server/graphs/log_type_detection/graph.ts
@@ -126,7 +126,7 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
       'handleUnstructuredGraph',
       (await getUnstructuredGraph({ model, client })).withConfig({ runName: 'Unstructured' })
     )
-    .addNode('handleCSV', (state: LogFormatDetectionState) => handleCSV({ state, model, client }))
+    .addNode('handleCSV', (state: LogFormatDetectionState) => handleCSV({ state, client }))
     .addEdge(START, 'modelInput')
     .addEdge('modelInput', 'handleLogFormatDetection')
    .addEdge('handleKVGraph', 'modelOutput')