Skip to content

Commit

Permalink
[Streams] Centralize error handling (elastic#207858)
Browse files Browse the repository at this point in the history
Centralizes the error handling by:
- creating a base StatusError exception that reports a status code
- having all known errors extend from StatusError and setting a default
status code for that error
- handling both ES response and status errors into a single place,
converting them into Boom errors

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
  • Loading branch information
3 people authored Jan 23, 2025
1 parent 28885a7 commit c33ecb9
Show file tree
Hide file tree
Showing 39 changed files with 557 additions and 746 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { ZodSchema, z } from '@kbn/zod';

export function createIsNarrowSchema<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
_base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
Expand All @@ -19,7 +19,7 @@ export function createIsNarrowSchema<TBaseSchema extends z.Schema, TNarrowSchema
}

export function createAsSchemaOrThrow<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
_base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import {
} from '@kbn/streams-schema';
import { cloneDeep, keyBy, omit, orderBy } from 'lodash';
import { AssetClient } from './assets/asset_client';
import { DefinitionNotFound, SecurityException } from './errors';
import { MalformedStreamId } from './errors/malformed_stream_id';
import {
syncUnwiredStreamDefinitionObjects,
syncWiredStreamDefinitionObjects,
Expand All @@ -50,6 +48,9 @@ import {
deleteStreamObjects,
deleteUnmanagedStreamObjects,
} from './stream_crud';
import { DefinitionNotFoundError } from './errors/definition_not_found_error';
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';

interface AcknowledgeResponse<TResult extends Result> {
acknowledged: true;
Expand All @@ -70,8 +71,8 @@ function isElasticsearch404(error: unknown): error is errors.ResponseError & { s
return isResponseError(error) && error.statusCode === 404;
}

function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFound {
return error instanceof DefinitionNotFound;
function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFoundError {
return error instanceof DefinitionNotFoundError;
}

export class StreamsClient {
Expand Down Expand Up @@ -362,7 +363,7 @@ export class StreamsClient {
parentDefinition &&
!isWiredStreamDefinition(parentDefinition)
) {
throw new MalformedStreamId('Cannot fork a stream that is not managed');
throw new MalformedStreamIdError('Cannot fork a stream that is not managed');
}

validateAncestorFields({
Expand All @@ -384,7 +385,7 @@ export class StreamsClient {
continue;
}
if (!isChildOf(definition.name, item.destination)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The ID (${item.destination}) from the child stream must start with the parent's id (${definition.name}), followed by a dot and a name`
);
}
Expand Down Expand Up @@ -435,12 +436,12 @@ export class StreamsClient {

// check whether root stream has a child of the given name already
if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The stream with ID (${name}) already exists as a child of the parent stream`
);
}
if (!isChildOf(parentDefinition.name, childDefinition.name)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The ID (${name}) from the new stream must start with the parent's id (${parentDefinition.name}), followed by a dot and a name`
);
}
Expand Down Expand Up @@ -483,7 +484,7 @@ export class StreamsClient {
checkAccess({ id: name, scopedClusterClient: this.dependencies.scopedClusterClient }).then(
(privileges) => {
if (!privileges.read) {
throw new DefinitionNotFound(`Stream definition for ${name} not found`);
throw new DefinitionNotFoundError(`Stream definition for ${name} not found`);
}
}
),
Expand All @@ -500,7 +501,7 @@ export class StreamsClient {
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
throw new DefinitionNotFound(`Cannot find stream ${name}`);
throw new DefinitionNotFoundError(`Cannot find stream ${name}`);
}
throw error;
});
Expand Down Expand Up @@ -711,7 +712,7 @@ export class StreamsClient {
]);

if (!access.write) {
throw new SecurityException(`Cannot delete stream, insufficient privileges`);
throw new SecurityError(`Cannot delete stream, insufficient privileges`);
}

if (!definition) {
Expand All @@ -720,7 +721,7 @@ export class StreamsClient {

const parentId = getParentId(name);
if (isWiredStreamDefinition(definition) && !parentId) {
throw new MalformedStreamId('Cannot delete root stream');
throw new MalformedStreamIdError('Cannot delete root stream');
}

await this.deleteStreamFromDefinition(definition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class ComponentTemplateNotFoundError extends StatusError {
constructor(message: string) {
super(message, 404);
this.name = 'ComponentTemplateNotFoundError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DefinitionIdInvalidError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'DefinitionIdInvalidError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DefinitionNotFoundError extends StatusError {
constructor(message: string) {
super(message, 404);
this.name = 'DefinitionNotFoundError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DetectedMappingFailureError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'DetectedMappingFailureError';
}
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DetectedMappingFailure extends Error {
import { StatusError } from './status_error';

export class MalformedChildrenError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DetectedMappingFailure';
super(message, 400);
this.name = 'MalformedChildrenError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DefinitionIdInvalid extends Error {
import { StatusError } from './status_error';

export class MalformedFieldsError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DefinitionIdInvalid';
super(message, 400);
this.name = 'MalformedFieldsError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DefinitionNotFound extends Error {
import { StatusError } from './status_error';

export class MalformedStreamError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DefinitionNotFound';
super(message, 400);
this.name = 'MalformedStreamError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class MalformedStreamIdError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'MalformedStreamIdError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class NonAdditiveProcessorError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'NonAdditiveProcessorError';
}
}
Loading

0 comments on commit c33ecb9

Please sign in to comment.