Skip to content

Commit aa6fd40

Browse files
authored
ehancement(datadog_metrics sink, datadog_logs sink): improve retry behavior code quality (#19450)
* fix(datadog_metrics sink, datadog_logs sink): improve retry behavior * revert behavioral changes * OPW-245 * fmt
1 parent 81d22b3 commit aa6fd40

File tree

2 files changed

+37
-39
lines changed

2 files changed

+37
-39
lines changed

src/sinks/datadog/metrics/service.rs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22
use std::task::{Context, Poll};
33

4-
use bytes::{Buf, Bytes};
4+
use bytes::Bytes;
55
use futures::future::BoxFuture;
66
use http::{
77
header::{HeaderValue, CONTENT_ENCODING, CONTENT_TYPE},
@@ -15,9 +15,9 @@ use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, Reques
1515
use vector_lib::stream::DriverResponse;
1616

1717
use crate::{
18-
http::{BuildRequestSnafu, CallRequestSnafu, HttpClient},
18+
http::{BuildRequestSnafu, HttpClient},
1919
sinks::datadog::DatadogApiError,
20-
sinks::util::retries::{RetryAction, RetryLogic},
20+
sinks::util::retries::RetryLogic,
2121
};
2222

2323
/// Retry logic specific to the Datadog metrics endpoints.
@@ -31,23 +31,6 @@ impl RetryLogic for DatadogMetricsRetryLogic {
3131
fn is_retriable_error(&self, error: &Self::Error) -> bool {
3232
error.is_retriable()
3333
}
34-
35-
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
36-
let status = response.status_code;
37-
38-
match status {
39-
StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
40-
StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
41-
StatusCode::NOT_IMPLEMENTED => {
42-
RetryAction::DontRetry("endpoint not implemented".into())
43-
}
44-
_ if status.is_server_error() => RetryAction::Retry(
45-
format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
46-
),
47-
_ if status.is_success() => RetryAction::Successful,
48-
_ => RetryAction::DontRetry(format!("response status: {}", status).into()),
49-
}
50-
}
5134
}
5235

5336
/// Generalized request for sending metrics to the Datadog metrics endpoints.
@@ -121,7 +104,6 @@ impl MetaDescriptive for DatadogMetricsRequest {
121104
#[derive(Debug)]
122105
pub struct DatadogMetricsResponse {
123106
status_code: StatusCode,
124-
body: Bytes,
125107
request_metadata: RequestMetadata,
126108
}
127109

@@ -189,17 +171,10 @@ impl Service<DatadogMetricsRequest> for DatadogMetricsService {
189171
.map_err(|error| DatadogApiError::HttpError { error })?;
190172

191173
let result = client.send(request).await;
192-
let (parts, body) = DatadogApiError::from_result(result)?.into_parts();
193-
194-
let mut body = hyper::body::aggregate(body)
195-
.await
196-
.context(CallRequestSnafu)
197-
.map_err(|error| DatadogApiError::HttpError { error })?;
198-
let body = body.copy_to_bytes(body.remaining());
174+
let result = DatadogApiError::from_result(result)?;
199175

200176
Ok(DatadogMetricsResponse {
201-
status_code: parts.status,
202-
body,
177+
status_code: result.status(),
203178
request_metadata,
204179
})
205180
})

src/sinks/datadog/mod.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,24 @@ fn get_api_validate_endpoint(endpoint: Option<&String>, site: &str) -> crate::Re
176176

177177
#[derive(Debug, Snafu)]
178178
pub enum DatadogApiError {
179-
#[snafu(display("Server responded with an error."))]
180-
ServerError,
181179
#[snafu(display("Failed to make HTTP(S) request: {}", error))]
182180
HttpError { error: HttpError },
183-
#[snafu(display("Client sent a payload that is too large."))]
184-
PayloadTooLarge,
185181
#[snafu(display("Client request was not valid for unknown reasons."))]
186182
BadRequest,
183+
#[snafu(display("Client request was unauthorized."))]
184+
Unauthorized,
187185
#[snafu(display("Client request was forbidden."))]
188186
Forbidden,
187+
#[snafu(display("Client request timed out."))]
188+
RequestTimeout,
189+
#[snafu(display("Client sent a payload that is too large."))]
190+
PayloadTooLarge,
191+
#[snafu(display("Client sent too many requests (rate limiting)."))]
192+
TooManyRequests,
193+
#[snafu(display("Client request was invalid."))]
194+
ClientError,
195+
#[snafu(display("Server responded with an error."))]
196+
ServerError,
189197
}
190198

191199
impl DatadogApiError {
@@ -204,14 +212,24 @@ impl DatadogApiError {
204212
// 202: Accepted (v2)
205213
// 400: Bad request (likely an issue in the payload
206214
// formatting)
215+
// 401: Unauthorized (likely a missing API Key))
207216
// 403: Permission issue (likely using an invalid API Key)
217+
// 408: Request Timeout, request should be retried after some
208218
// 413: Payload too large (batch is above 5MB uncompressed)
209-
// 5xx: Internal error, request should be retried after some
210-
// time
219+
// 429: Too Many Requests, request should be retried after some time
220+
// 500: Internal Server Error, the server encountered an unexpected condition
221+
// that prevented it from fulfilling the request, request should be
222+
// retried after some time
223+
// 503: Service Unavailable, the server is not ready to handle the request
224+
// probably because it is overloaded, request should be retried after some time
225+
s if s.is_success() => Ok(response),
211226
StatusCode::BAD_REQUEST => Err(DatadogApiError::BadRequest),
227+
StatusCode::UNAUTHORIZED => Err(DatadogApiError::Unauthorized),
212228
StatusCode::FORBIDDEN => Err(DatadogApiError::Forbidden),
213-
StatusCode::OK | StatusCode::ACCEPTED => Ok(response),
229+
StatusCode::REQUEST_TIMEOUT => Err(DatadogApiError::RequestTimeout),
214230
StatusCode::PAYLOAD_TOO_LARGE => Err(DatadogApiError::PayloadTooLarge),
231+
StatusCode::TOO_MANY_REQUESTS => Err(DatadogApiError::TooManyRequests),
232+
s if s.is_client_error() => Err(DatadogApiError::ClientError),
215233
_ => Err(DatadogApiError::ServerError),
216234
}
217235
}
@@ -222,14 +240,19 @@ impl DatadogApiError {
222240
pub const fn is_retriable(&self) -> bool {
223241
match self {
224242
// This retry logic will be expanded further, but specifically retrying unauthorized
225-
// requests and lower level HttpErrorsfor now.
243+
// requests and lower level HttpErrors for now.
226244
// I verified using `curl` that `403` is the respose code for this.
227245
//
228246
// https://github.com/vectordotdev/vector/issues/10870
229247
// https://github.com/vectordotdev/vector/issues/12220
230248
DatadogApiError::HttpError { error } => error.is_retriable(),
231249
DatadogApiError::BadRequest | DatadogApiError::PayloadTooLarge => false,
232-
DatadogApiError::ServerError | DatadogApiError::Forbidden => true,
250+
DatadogApiError::ServerError
251+
| DatadogApiError::ClientError
252+
| DatadogApiError::Unauthorized
253+
| DatadogApiError::Forbidden
254+
| DatadogApiError::RequestTimeout
255+
| DatadogApiError::TooManyRequests => true,
233256
}
234257
}
235258
}

0 commit comments

Comments
 (0)