diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/ingest_stream_config.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/ingest_stream_config.ts index 925ac9310762f..a2ab87fa12508 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/ingest_stream_config.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/ingest_stream_config.ts @@ -22,15 +22,5 @@ export const ingestStreamConfig = { }, }, ], - routing: [ - { - name: 'logs.errors', - condition: { - field: 'log.level', - operator: 'eq', - value: 'error', - }, - }, - ], }, }; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/wired_stream_config.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/wired_stream_config.ts index a8fdf9c593bed..0ba7a0f0ba1c2 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/wired_stream_config.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/fixtures/wired_stream_config.ts @@ -23,22 +23,22 @@ export const wiredStreamConfig = { }, }, ], - routing: [ - { - name: 'logs.errors', - condition: { - field: 'log.level', - operator: 'eq', - value: 'error', - }, - }, - ], wired: { fields: { new_field: { type: 'long', }, }, + routing: [ + { + name: 'logs.errors', + condition: { + field: 'log.level', + operator: 'eq', + value: 'error', + }, + }, + ], }, }, }; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/base.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/base.ts index 8cd31d6f09117..171fe5f0aa1f3 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/base.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/base.ts @@ -16,12 +16,12 @@ import { IngestStreamLifecycle, ingestStreamLifecycleSchema } from './lifecycle' interface IngestBase { lifecycle: IngestStreamLifecycle; processing: ProcessorDefinition[]; - routing: RoutingDefinition[]; } interface WiredIngest extends IngestBase { wired: { fields: FieldDefinition; + routing: RoutingDefinition[]; }; } @@ -50,7 +50,6 @@ type IngestStreamDefinition = WiredStreamDefinition | UnwiredStreamDefinition; const ingestBaseSchema: z.Schema = z.object({ lifecycle: ingestStreamLifecycleSchema, processing: z.array(processorDefinitionSchema), - routing: z.array(routingDefinitionSchema), }); const unwiredIngestSchema: z.Schema = z.intersection( @@ -65,6 +64,7 @@ const wiredIngestSchema: z.Schema = z.intersection( z.object({ wired: z.object({ fields: fieldDefinitionSchema, + routing: z.array(routingDefinitionSchema), }), }) ); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts index 12cf56ca60e1a..149985f58079e 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts @@ -245,7 +245,7 @@ export class StreamsClient { }); if (parentDefinition) { - const isRoutingToChild = parentDefinition.ingest.routing.find( + const isRoutingToChild = parentDefinition.ingest.wired.routing.find( (item) => item.destination === name ); @@ -255,7 +255,7 @@ export class StreamsClient { // The user can set the condition later on the parent await this.updateStreamRouting({ definition: parentDefinition, - routing: parentDefinition.ingest.routing.concat({ + routing: parentDefinition.ingest.wired.routing.concat({ destination: name, if: { never: {} }, }), @@ -275,14 +275,14 @@ export class StreamsClient { ingest: { lifecycle: { inherit: {} }, processing: [], - routing: [ - { - destination: stream.name, - if: { never: {} }, - }, - ], wired: { fields: {}, + routing: [ + { + destination: stream.name, + if: { never: {} }, + }, + ], }, }, }, @@ -471,7 +471,7 @@ export class StreamsClient { validateStreamChildrenChanges(existingDefinition, definition); } - for (const item of definition.ingest.routing) { + for (const item of definition.ingest.wired.routing) { if (descendantsById[item.destination]) { continue; } @@ -486,9 +486,9 @@ export class StreamsClient { ingest: { lifecycle: { inherit: {} }, processing: [], - routing: [], wired: { fields: {}, + routing: [], }, }, }, @@ -543,14 +543,21 @@ export class StreamsClient { if: Condition; }): Promise { const parentDefinition = asIngestStreamDefinition(await this.getStream(parent)); + if (!isWiredStreamDefinition(parentDefinition)) { + throw new MalformedStreamIdError('Cannot fork a stream that is not managed'); + } const childDefinition: WiredStreamDefinition = { name, - ingest: { lifecycle: { inherit: {} }, processing: [], routing: [], wired: { fields: {} } }, + ingest: { lifecycle: { inherit: {} }, processing: [], wired: { fields: {}, routing: [] } }, }; // check whether root stream has a child of the given name already - if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) { + if ( + parentDefinition.ingest.wired.routing.some( + (item) => item.destination === childDefinition.name + ) + ) { throw new MalformedStreamIdError( `The stream with ID (${name}) already exists as a child of the parent stream` ); @@ -569,7 +576,7 @@ export class StreamsClient { await this.updateStreamRouting({ definition: updatedParentDefinition!, - routing: parentDefinition.ingest.routing.concat({ + routing: parentDefinition.ingest.wired.routing.concat({ destination: name, if: condition, }), @@ -696,7 +703,6 @@ export class StreamsClient { name: dataStream.name, ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [], unwired: {}, }, @@ -757,7 +763,6 @@ export class StreamsClient { ingest: { lifecycle: { inherit: {} }, processing: [], - routing: [], unwired: {}, }, })); @@ -820,7 +825,7 @@ export class StreamsClient { await this.updateStreamRouting({ definition: parentDefinition, - routing: parentDefinition.ingest.routing.filter( + routing: parentDefinition.ingest.wired.routing.filter( (item) => item.destination !== definition.name ), }); @@ -828,7 +833,7 @@ export class StreamsClient { // delete the children first, as this will update // the parent as well - for (const item of definition.ingest.routing) { + for (const item of definition.ingest.wired.routing) { await this.deleteStream(item.destination); } @@ -856,10 +861,10 @@ export class StreamsClient { routing, }: { definition: WiredStreamDefinition; - routing: WiredStreamDefinition['ingest']['routing']; + routing: WiredStreamDefinition['ingest']['wired']['routing']; }) { const update = cloneDeep(definition); - update.ingest.routing = routing; + update.ingest.wired.routing = routing; await this.updateStoredStream(update); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/sync.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/sync.ts index a0bb3ca92a967..7371c69affde7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/sync.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/sync.ts @@ -241,9 +241,6 @@ export async function syncUnwiredStreamDefinitionObjects({ dataStream: IndicesDataStream; definition: UnwiredStreamDefinition; }) { - if (definition.ingest.routing.length) { - throw new Error('Unmanaged streams cannot have managed children, coming soon'); - } const unmanagedAssets = await getUnmanagedElasticsearchAssets({ dataStream, scopedClusterClient, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_stream.ts index c5453160e227d..0b64bca9f3aaf 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_stream.ts @@ -91,11 +91,11 @@ export function validateStreamChildrenChanges( currentStreamDefinition: WiredStreamDefinition, nextStreamDefinition: WiredStreamDefinition ) { - const existingChildren = currentStreamDefinition.ingest.routing.map( + const existingChildren = currentStreamDefinition.ingest.wired.routing.map( (routingDefinition) => routingDefinition.destination ); - const nextChildren = nextStreamDefinition.ingest.routing.map( + const nextChildren = nextStreamDefinition.ingest.wired.routing.map( (routingDefinition) => routingDefinition.destination ); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts index bd349272c6379..b4d407f7973b8 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts @@ -5,19 +5,19 @@ * 2.0. */ -import { IngestStreamDefinition } from '@kbn/streams-schema'; +import { WiredStreamDefinition } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { conditionToPainless } from '../helpers/condition_to_painless'; import { getReroutePipelineName } from './name'; interface GenerateReroutePipelineParams { - definition: IngestStreamDefinition; + definition: WiredStreamDefinition; } export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) { return { id: getReroutePipelineName(definition.name), - processors: definition.ingest.routing.map((child) => { + processors: definition.ingest.wired.routing.map((child) => { return { reroute: { destination: child.destination, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts index 5701d0f4f8a5e..b29cdd0393359 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts @@ -14,8 +14,8 @@ export const rootStreamDefinition: WiredStreamDefinition = { ingest: { lifecycle: { dsl: {} }, processing: [], - routing: [], wired: { + routing: [], fields: { '@timestamp': { type: 'date', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/flyout/field_summary.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/flyout/field_summary.tsx index ce8a9dc4fe824..89df11f51ab8e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/flyout/field_summary.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/flyout/field_summary.tsx @@ -203,9 +203,9 @@ export const FieldSummary = (props: FieldSummaryProps) => { - {isEditing && stream.ingest.routing.length > 0 ? ( + {isEditing && stream.ingest.wired.routing.length > 0 ? ( - + ) : null} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts index 1cb843efdebee..579344e640225 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts @@ -120,6 +120,7 @@ export const useSchemaFields = ({ ingest: { ...definition.stream.ingest, wired: { + ...definition.stream.ingest.wired, fields: { ...definition.stream.ingest.wired.fields, [field.name]: nextFieldDefinitionConfig, @@ -171,6 +172,7 @@ export const useSchemaFields = ({ ingest: { ...definition.stream.ingest, wired: { + ...definition.stream.ingest.wired, fields: omit(definition.stream.ingest.wired.fields, fieldName), }, }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/control_bar.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/control_bar.tsx index 54670099ace5e..1e5d15fb54e99 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/control_bar.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/control_bar.tsx @@ -89,7 +89,10 @@ export function ControlBar({ const request = { ingest: { ...stream.ingest, - routing, + wired: { + ...stream.ingest.wired, + routing, + }, }, } as IngestUpsertRequest; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/hooks/routing_state.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/hooks/routing_state.ts index ec538f00f7c36..0a110a73eedf4 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/hooks/routing_state.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/hooks/routing_state.ts @@ -39,15 +39,15 @@ export function useRoutingState({ // Child streams: either represents the child streams as they are, or the new order from drag and drop. const [childStreams, setChildStreams] = React.useState< - WiredStreamGetResponse['stream']['ingest']['routing'] - >(definition?.stream.ingest.routing ?? []); + WiredStreamGetResponse['stream']['ingest']['wired']['routing'] + >(definition?.stream.ingest.wired.routing ?? []); useEffect(() => { - setChildStreams(definition?.stream.ingest.routing ?? []); + setChildStreams(definition?.stream.ingest.wired.routing ?? []); }, [definition]); // Note: just uses reference equality to check if the order has changed as onChildStreamReorder will create a new array. - const hasChildStreamsOrderChanged = childStreams !== definition?.stream.ingest.routing; + const hasChildStreamsOrderChanged = childStreams !== definition?.stream.ingest.wired.routing; // Child stream currently being dragged const [draggingChildStream, setDraggingChildStream] = React.useState(); @@ -73,8 +73,8 @@ export function useRoutingState({ const cancelChanges = useCallback(() => { setChildUnderEdit(undefined); - setChildStreams(definition?.stream.ingest.routing ?? []); - }, [definition?.stream.ingest.routing]); + setChildStreams(definition?.stream.ingest.wired.routing ?? []); + }, [definition?.stream.ingest.wired.routing]); const debouncedChildUnderEdit = useDebounced(childUnderEdit, 300); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts index 2a7733da077b4..0df895e88d359 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts @@ -50,7 +50,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ingest: { lifecycle: { inherit: {} }, processing: [], - routing: [], unwired: {}, }, }); @@ -67,7 +66,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [ { grok: { @@ -122,7 +120,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, ], - routing: [], unwired: {}, }, }); @@ -188,7 +185,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ingest: { lifecycle: { inherit: {} }, processing: [], - routing: [], unwired: {}, }, }, @@ -284,7 +280,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [ { grok: { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts index 7be4d4851510a..cf41cdac6e985 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts @@ -77,8 +77,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, ], - routing: [], wired: { + routing: [], fields: { '@timestamp': { type: 'date', diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts index bb89b1320102a..bf9614a4a2a77 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts @@ -8,6 +8,7 @@ import expect from '@kbn/expect'; import { isGroupStreamDefinitionBase, + isUnwiredStreamDefinition, StreamGetResponse, WiredStreamGetResponse, } from '@kbn/streams-schema'; @@ -40,8 +41,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('checks whether deeply nested stream is created correctly', async () => { function getChildNames(stream: StreamGetResponse['stream']): string[] { - if (isGroupStreamDefinitionBase(stream)) return []; - return stream.ingest.routing.map((r) => r.destination); + if (isGroupStreamDefinitionBase(stream) || isUnwiredStreamDefinition(stream)) return []; + return stream.ingest.wired.routing.map((r) => r.destination); } const logs = await apiClient.fetch('GET /api/streams/{name}', { params: { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts index c12e6930ce257..db13e24d4361c 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts @@ -36,33 +36,33 @@ const streams: StreamPutItem[] = [ type: 'keyword', }, }, - }, - routing: [ - { - destination: 'logs.test', - if: { - and: [ - { - field: 'numberfield', - operator: 'gt', - value: 15, - }, - ], + routing: [ + { + destination: 'logs.test', + if: { + and: [ + { + field: 'numberfield', + operator: 'gt', + value: 15, + }, + ], + }, }, - }, - { - destination: 'logs.test2', - if: { - and: [ - { - field: 'field2', - operator: 'eq', - value: 'abc', - }, - ], + { + destination: 'logs.test2', + if: { + and: [ + { + field: 'field2', + operator: 'eq', + value: 'abc', + }, + ], + }, }, - }, - ], + ], + }, }, }, }, @@ -71,9 +71,9 @@ const streams: StreamPutItem[] = [ stream: { ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [], wired: { + routing: [], fields: { numberfield: { type: 'long', @@ -103,8 +103,8 @@ const streams: StreamPutItem[] = [ type: 'keyword', }, }, + routing: [], }, - routing: [], }, }, }, @@ -120,8 +120,8 @@ const streams: StreamPutItem[] = [ type: 'keyword', }, }, + routing: [], }, - routing: [], }, }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts index 7d7f91e6ec003..75d129d5a75d5 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts @@ -86,9 +86,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [], - wired: { fields: {} }, + wired: { fields: {}, routing: [] }, }, }, dashboards: [], @@ -159,7 +158,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [{ destination: 'logs.overrides.lifecycle', if: { never: {} } }], + wired: { + fields: {}, + routing: [{ destination: 'logs.overrides.lifecycle', if: { never: {} } }], + }, lifecycle: { dsl: { data_retention: '1d' } }, }, }, @@ -203,7 +205,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [{ destination: 'logs.10d.20d.inherits', if: { never: {} } }], + wired: { + fields: {}, + routing: [{ destination: 'logs.10d.20d.inherits', if: { never: {} } }], + }, }, }, }); @@ -266,7 +271,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [{ destination: 'logs.ilm.stream', if: { never: {} } }], + wired: { + fields: {}, + routing: [{ destination: 'logs.ilm.stream', if: { never: {} } }], + }, lifecycle: { ilm: { policy: 'my-policy' } }, }, }, @@ -285,7 +293,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], lifecycle: { ilm: { policy: 'my-policy' } }, }, }, @@ -299,7 +306,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + fields: {}, + routing: [], + }, lifecycle: { dsl: { data_retention: '7d' } }, }, }, @@ -315,7 +325,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + fields: {}, + routing: [], + }, lifecycle: { dsl: { data_retention: '7d' } }, }, }, @@ -329,7 +342,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + routing: [], + fields: {}, + }, lifecycle: { ilm: { policy: 'my-policy' } }, }, }, @@ -345,7 +361,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { lifecycle: { inherit: {} }, - routing: [], processing: [], unwired: {}, }, @@ -447,7 +462,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + fields: {}, + routing: [], + }, lifecycle: { dsl: { data_retention: '1d' } }, }, }, @@ -463,7 +481,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + fields: {}, + routing: [], + }, lifecycle: { ilm: { policy: 'this-stream-policy-does-not-exist' } }, }, }, @@ -490,7 +511,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...wiredPutBody.stream.ingest, - routing: [], + wired: { + fields: {}, + routing: [], + }, lifecycle: { ilm: { policy: policyName } }, }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts index d664ee9a3375c..c7c6aa4136aab 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts @@ -19,8 +19,8 @@ const rootStreamDefinition: WiredStreamDefinition = { ingest: { lifecycle: { dsl: {} }, processing: [], - routing: [], wired: { + routing: [], fields: { '@timestamp': { type: 'date', @@ -91,6 +91,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ingest: { ...rootStreamDefinition.ingest, wired: { + ...rootStreamDefinition.ingest.wired, fields: { ...rootStreamDefinition.ingest.wired.fields, 'log.level': { @@ -111,16 +112,19 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { ingest: { ...rootStreamDefinition.ingest, - routing: [ - { - destination: 'logs.gcpcloud', - if: { - field: 'cloud.provider', - operator: 'eq', - value: 'gcp', + wired: { + ...rootStreamDefinition.ingest.wired, + routing: [ + { + destination: 'logs.gcpcloud', + if: { + field: 'cloud.provider', + operator: 'eq', + value: 'gcp', + }, }, - }, - ], + ], + }, }, }, };