Skip to content

Commit

Permalink
[EDR Workflows] Workflow Insights - Aggregate file events by path (el…
Browse files Browse the repository at this point in the history
…astic#207079)

This PR updates the method for fetching file events used as the
foundation for creating insights. Previously, we retrieved the last 200
events from the past 24 hours. With these changes, we now rely on
aggregations: all file events from the past 24 hours are aggregated by
file path, and for each path, only the latest event associated with it
is selected. The limit of 200 paths remains unchanged.
  • Loading branch information
szwarckonrad authored Jan 20, 2025
1 parent 1dc2aca commit 1d13e42
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const SIZE = 200;
export function getFileEventsQuery({ endpointIds }: { endpointIds: string[] }): SearchRequest {
return {
allow_no_indices: true,
fields: ['_id', 'agent.id', 'process.executable'],
query: {
bool: {
must: [
Expand All @@ -34,15 +33,31 @@ export function getFileEventsQuery({ endpointIds }: { endpointIds: string[] }):
],
},
},
size: SIZE,
sort: [
{
'@timestamp': {
order: 'desc',
size: 0, // Aggregations only
aggs: {
unique_process_executable: {
terms: {
field: 'process.executable',
size: SIZE,
},
aggs: {
// Get the latest event for each process.executable
latest_event: {
top_hits: {
size: 1,
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
_source: ['_id', 'agent.id', 'process.executable'], // Include only necessary fields
},
},
},
},
],
_source: false,
},
ignore_unavailable: true,
index: [FILE_EVENTS_INDEX_PATTERN],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { DefendInsightType, transformRawData } from '@kbn/elastic-assistant-comm
import { InvalidDefendInsightTypeError } from '../errors';
import { getFileEventsQuery } from './get_file_events_query';
import { getAnonymizedEvents } from '.';
import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types';

jest.mock('@kbn/elastic-assistant-common', () => {
const originalModule = jest.requireActual('@kbn/elastic-assistant-common');
Expand All @@ -28,10 +29,46 @@ jest.mock('./get_file_events_query', () => ({
describe('getAnonymizedEvents', () => {
let mockEsClient: jest.Mocked<ElasticsearchClient>;

const mockHits = [
{ _index: 'test-index', fields: { field1: ['value1'] } },
{ _index: 'test-index', fields: { field2: ['value2'] } },
];
const mockAggregations = {
unique_process_executable: {
buckets: [
{
key: 'process1',
doc_count: 10,
latest_event: {
hits: {
hits: [
{
_id: 'event1',
_source: {
agent: { id: 'agent1' },
process: { executable: 'process1' },
},
},
],
},
},
},
{
key: 'process2',
doc_count: 5,
latest_event: {
hits: {
hits: [
{
_id: 'event2',
_source: {
agent: { id: 'agent2' },
process: { executable: 'process2' },
},
},
],
},
},
},
],
},
};

beforeEach(() => {
(getFileEventsQuery as jest.Mock).mockReturnValue({ index: 'test-index', body: {} });
Expand All @@ -48,9 +85,7 @@ describe('getAnonymizedEvents', () => {
skipped: 0,
failed: 0,
},
hits: {
hits: mockHits,
},
aggregations: mockAggregations,
}),
} as unknown as jest.Mocked<ElasticsearchClient>;
});
Expand All @@ -59,17 +94,54 @@ describe('getAnonymizedEvents', () => {
jest.clearAllMocks();
});

it('should return anonymized events successfully', async () => {
it('should return anonymized events successfully from aggregations', async () => {
const result = await getAnonymizedEvents({
endpointIds: ['endpoint1'],
type: DefendInsightType.Enum.incompatible_antivirus,
esClient: mockEsClient,
});

expect(result).toEqual(['anonymized_value1', 'anonymized_value2']);
expect(result).toEqual(['anonymized_event1', 'anonymized_event2']);
expect(getFileEventsQuery).toHaveBeenCalledWith({ endpointIds: ['endpoint1'] });
expect(mockEsClient.search).toHaveBeenCalledWith({ index: 'test-index', body: {} });
expect(transformRawData).toHaveBeenCalledTimes(2);
expect(transformRawData).toHaveBeenCalledWith(
expect.objectContaining({
rawData: expect.objectContaining({
_id: ['event1'],
}),
})
);
});

it('should map aggregation response correctly into fileEvents structure', async () => {
await getAnonymizedEvents({
endpointIds: ['endpoint1'],
type: DefendInsightType.Enum.incompatible_antivirus,
esClient: mockEsClient,
});

expect(mockEsClient.search).toHaveBeenCalledWith({ index: 'test-index', body: {} });

expect(transformRawData).toHaveBeenCalledWith(
expect.objectContaining({
rawData: {
_id: ['event1'],
'agent.id': ['agent1'],
'process.executable': ['process1'],
},
})
);

expect(transformRawData).toHaveBeenCalledWith(
expect.objectContaining({
rawData: {
_id: ['event2'],
'agent.id': ['agent2'],
'process.executable': ['process2'],
},
})
);
});

it('should throw InvalidDefendInsightTypeError for invalid type', async () => {
Expand All @@ -81,4 +153,31 @@ describe('getAnonymizedEvents', () => {
})
).rejects.toThrow(InvalidDefendInsightTypeError);
});

it('should handle empty aggregation response gracefully', async () => {
mockEsClient.search.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0,
},
aggregations: {
unique_process_executable: {
buckets: [],
},
},
} as unknown as SearchResponse);

const result = await getAnonymizedEvents({
endpointIds: ['endpoint1'],
type: DefendInsightType.Enum.incompatible_antivirus,
esClient: mockEsClient,
});

expect(result).toEqual([]);
expect(transformRawData).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import type { SearchRequest, SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import type { SearchRequest } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { Replacements } from '@kbn/elastic-assistant-common';
import type { AnonymizationFieldResponse } from '@kbn/elastic-assistant-common/impl/schemas/anonymization_fields/bulk_crud_anonymization_fields_route.gen';
Expand All @@ -20,6 +20,26 @@ import {
import { getFileEventsQuery } from './get_file_events_query';
import { InvalidDefendInsightTypeError } from '../errors';

interface AggregationResponse {
unique_process_executable: {
buckets: Array<{
key: string;
doc_count: number;
latest_event: {
hits: {
hits: Array<{
_id: string;
_source: {
agent: { id: string };
process: { executable: string };
};
}>;
};
};
}>;
};
}

export async function getAnonymizedEvents({
endpointIds,
type,
Expand Down Expand Up @@ -70,7 +90,17 @@ const getAnonymized = async ({
onNewReplacements?: (replacements: Replacements) => void;
replacements?: Replacements;
}): Promise<string[]> => {
const result = await esClient.search<SearchResponse>(query);
const result = await esClient.search<{}, AggregationResponse>(query);
const fileEvents = (result.aggregations?.unique_process_executable.buckets ?? []).map(
(bucket) => {
const latestEvent = bucket.latest_event.hits.hits[0];
return {
_id: [latestEvent._id],
'agent.id': [latestEvent._source.agent.id],
'process.executable': [latestEvent._source.process.executable],
};
}
);

// Accumulate replacements locally so we can, for example use the same
// replacement for a hostname when we see it in multiple alerts:
Expand All @@ -81,13 +111,13 @@ const getAnonymized = async ({
onNewReplacements?.(localReplacements); // invoke the callback with the latest replacements
};

return result.hits?.hits?.map((hit) =>
return fileEvents.map((fileEvent) =>
transformRawData({
anonymizationFields,
currentReplacements: localReplacements, // <-- the latest local replacements
getAnonymizedValue,
onNewReplacements: localOnNewReplacements, // <-- the local callback
rawData: getRawDataOrDefault(hit.fields),
rawData: getRawDataOrDefault(fileEvent),
})
);
};

0 comments on commit 1d13e42

Please sign in to comment.