exp/api: Add experimental exp module; Add remote API with write client and handler. #1658

Open. Wants to merge 4 commits into base: main

Member
@bwplotka bwplotka commented Oct 21, 2024

This proposes a Remote Write API directly in the client_golang main module. No dependencies are added (some new small packages are added as internal vendored code).

Old attempt: #1656

Both the API (client) and the handler support 1.0 and 2.0 proto messages, but I intentionally only host the generated Go code for writev2.Request, as we plan to deprecate 1.0 and users should use writev2 now. Both the API and the handler will also work with protobuf message types produced by custom generators, e.g. gogoproto or anything else.

TODO:

  • Add examples, documentation
  • Open PR that uses it on Prometheus

Signed-off-by: bwplotka <bwplotka@gmail.com>
}

// DesymbolizeLabels decodes label references, with given symbols to labels.
func DesymbolizeLabels(labelRefs []uint32, symbols, buf []string) []string {

yeah I just ran into DesymbolizeLabels not being public in prometheus codebase here open-telemetry/opentelemetry-collector-contrib#35751 (comment)
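
For readers unfamiliar with symbolized labels, here is a minimal sketch of what such a helper might do. It assumes labelRefs holds name/value index pairs into the symbols table, as in Remote Write 2.0. The lowercase name desymbolizeLabels, the flat output layout, and the error return are illustrative assumptions, not the PR's implementation.

```go
package main

import "fmt"

// desymbolizeLabels is a hypothetical stand-in for DesymbolizeLabels. It
// assumes labelRefs holds name/value index pairs into symbols and returns the
// resolved strings as a flat [name, value, name, value, ...] slice, reusing
// buf's backing array when it has enough capacity.
func desymbolizeLabels(labelRefs []uint32, symbols, buf []string) ([]string, error) {
	if len(labelRefs)%2 != 0 {
		return nil, fmt.Errorf("odd number of label references: %d", len(labelRefs))
	}
	out := buf[:0]
	for _, ref := range labelRefs {
		if int(ref) >= len(symbols) {
			return nil, fmt.Errorf("label reference %d out of range (%d symbols)", ref, len(symbols))
		}
		out = append(out, symbols[ref])
	}
	return out, nil
}

func main() {
	symbols := []string{"", "__name__", "up", "job", "api"}
	labels, err := desymbolizeLabels([]uint32{1, 2, 3, 4}, symbols, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(labels) // [__name__ up job api]
}
```

Returning an error on a bad reference is one possible design; a real helper may prefer to validate the request once up front.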

Member Author

Do you use labels.Labels or some other struct (for labels in OTel)?


// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
func NewRemoteWriteHandler(logger *slog.Logger, store writeStorage) http.Handler {
Member Author

It's a bit inconsistent with NewRemoteAPI in terms of options; let's unify them one way or the other 🤔

// WriteProtoFullName represents the fully qualified name of the protobuf message
// to use in Remote write 1.0 and 2.0 protocols.
// See https://prometheus.io/docs/specs/remote_write_spec_2_0/#protocol.
type WriteProtoFullName protoreflect.FullName
Member Author

nit: Change this to string; it's a bit too hard to use.

const (
// WriteProtoFullNameV1 represents the `prometheus.WriteRequest` protobuf
// message introduced in the https://prometheus.io/docs/specs/remote_write_spec/.
// DEPRECATED: Use WriteProtoFullNameV2 instead.
Member Author

For receivers who want to support it for longer, deprecation might cause some friction, but maybe it's worth it? 🤔

@bwplotka
Member Author

Example use of handler in sink project: https://github.com/bwplotka/sink/blob/main/go/sink/main.go#L51

Member

@cstyan cstyan left a comment

Both API (client) and handler supports 1.0 and 2.0 Proto messages, but I intentionally only host writev2.Request generated Go code as we plan to deprecate 1.0 and users should use writev2 now. Both API and handler also will work against protobuf message types that use custom generators e.g. with gogoproto or anything else.

Given that the spec was only finalized recently and (afaik) no receivers like Mimir support v2 yet, we should include v1 request code imo.

}

func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WriteProtoFullName, payload []byte, attempt int) (WriteResponseStats, error) {
u := r.client.URL("api/v1/write", nil)
Member

this should be configurable imo, not all remote write receivers use the same endpoint
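
One way this could look is a functional option that overrides the default path. This is only a sketch: apiOpts, WithAPIPath, writeURL, and the example hosts are hypothetical names, not part of the PR.

```go
package main

import (
	"fmt"
	"net/url"
)

// apiOpts and WithAPIPath are hypothetical; they only illustrate the idea.
type apiOpts struct{ path string }

// APIOption mutates apiOpts.
type APIOption func(*apiOpts)

// WithAPIPath overrides the default write path.
func WithAPIPath(p string) APIOption {
	return func(o *apiOpts) { o.path = p }
}

// writeURL joins baseURL with the configured path, defaulting to api/v1/write.
func writeURL(baseURL string, opts ...APIOption) (string, error) {
	o := apiOpts{path: "api/v1/write"}
	for _, opt := range opts {
		opt(&o)
	}
	return url.JoinPath(baseURL, o.path)
}

func main() {
	u, _ := writeURL("http://localhost:9090")
	fmt.Println(u) // http://localhost:9090/api/v1/write

	u, _ = writeURL("http://receiver.example", WithAPIPath("custom/push"))
	fmt.Println(u) // http://receiver.example/custom/push
}
```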

Comment on lines 244 to 247
rs, err := parseWriteResponseStats(resp)
if err != nil {
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
}
Member

should we only try to get write response stats/log a warning if the request type was v2?

return &handler{logger: logger, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
Member

a comment with an example of what the string might look like would be useful for people who haven't read or fully understood the v2 spec
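
For illustration, a 2.0 sender would typically send Content-Type: application/x-protobuf;proto=io.prometheus.write.v2.Request, while a 1.x sender may send just application/x-protobuf. The sketch below parses both forms with the standard mime package. parseContentType is a hypothetical name and this is not the PR's parseProtoMsg; treating a missing proto parameter as 1.0 is an assumption based on the compatibility note in the spec.

```go
package main

import (
	"fmt"
	"mime"
)

const (
	protoV1 = "prometheus.WriteRequest"
	protoV2 = "io.prometheus.write.v2.Request"
)

// parseContentType extracts the proto message name from a Content-Type value.
// A missing proto parameter is treated as the 1.0 message (assumption).
func parseContentType(contentType string) (string, error) {
	mediaType, params, err := mime.ParseMediaType(contentType)
	if err != nil {
		return "", fmt.Errorf("invalid Content-Type %q: %w", contentType, err)
	}
	if mediaType != "application/x-protobuf" {
		return "", fmt.Errorf("unsupported media type %q", mediaType)
	}
	proto, ok := params["proto"]
	if !ok {
		return protoV1, nil
	}
	switch proto {
	case protoV1, protoV2:
		return proto, nil
	}
	return "", fmt.Errorf("unknown proto message %q", proto)
}

func main() {
	for _, ct := range []string{
		"application/x-protobuf",
		"application/x-protobuf;proto=io.prometheus.write.v2.Request",
		"text/plain",
	} {
		proto, err := parseContentType(ct)
		fmt.Printf("%q -> %q, err=%v\n", ct, proto, err)
	}
}
```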

Comment on lines 1 to 7
// Copyright (c) The EfficientGo Authors.
// Licensed under the Apache License 2.0.

// Initially copied from Cortex project.

// Package backoff implements backoff timers which increases wait time on every retry, incredibly useful
// in distributed system timeout functionalities.
Member

should we be pulling this via go.mod as opposed to having copies of the files? or is this the way we include dependencies for the client library?

Member

We try as hard as we can to avoid dependencies. I'm in favor of copying other libraries' code (respecting copyright) where possible.

}

// Read the request body.
body, err := io.ReadAll(r.Body)
Contributor

It might be out of scope for Prometheus but it would be nice to limit the request body size to protect the handler. https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L180

return
}

decompressed, err := snappy.Decode(nil, body)
Contributor

Similar here. It would be nice if we can check snappy decoded length and see if it exceeds the max size https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L247

return &handler{logger: logger, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
Contributor

Is this something we can expose?

Member

+1 to exposing this

h.logger.Error("Error decompressing remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Contributor

I wonder if we can expose a hook for the decoding and decompression logic so that downstream projects can reuse their existing code without much adjustment.

func decodeRequestBody(r *http.Request) (body []byte, err error)

The main logic that can benefit us is handling v1 and v2 proto and response headers. It would be nice to still reuse the other code.

Member

Yup some requestValidatorDecoder interface that can be overridden downstream would be cool to have.

}

req.Header.Add("Content-Encoding", string(compr))
req.Header.Set("Content-Type", contentTypeHeader(proto))
Member

Maybe there should be an option to specify custom headers here too?

Member Author

You can do that with tripperware though 🤔


err = fmt.Errorf("server returned HTTP status %s: %s", resp.Status, body)
if resp.StatusCode/100 == 5 ||
(r.opts.retryOnRateLimit && resp.StatusCode == http.StatusTooManyRequests) {
Member

Should retryOn429 be configurable here?

Member Author

sure!

h.logger.Error("Error decompressing remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Member

Maybe more of a future comment, but we can probably have some common/model codec here too, since we already have the types, like https://github.com/prometheus/prometheus/blob/main/prompb/io/prometheus/write/v2/codec.go, that would translate to common/model types. Would be good for downstream clients, that can choose to not import prometheus at all.

code = http.StatusInternalServerError
}
if code/100 == 5 { // 5xx
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())
Member

store.Store can ingest both I think

Suggested change
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())
h.logger.Error("Error while storing the remote write request", "err", storeErr.Error())

* Address remaining feedback

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Make Write message type more flexible

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

---------

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
* Move remote write API to client_golang/exp

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Don't use api.Client structs, add options for middleware

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Fix reqBuf usage

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Fix url path

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add separate mod file (and workspace file)

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Hook exp tests fmt; Test handler error case; Configure backoff

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

---------

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
@bwplotka bwplotka changed the title api: Add remote API with write client; add remote handler. exp/api: Add experimental exp module; Add remote API with write client and handler. Jan 31, 2025
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
//
// It is not safe to use the returned API from multiple goroutines, create a
// separate *API for each goroutine.
func NewAPI(client *http.Client, baseURL string, opts ...APIOption) (*API, error) {
Member Author

healthy default for client?

// will be used
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
// error out on unknown scheme.
func (r *API) Write(ctx context.Context, msg any) (_ WriteResponseStats, err error) {
Member Author

It might be more future proof to allow setting content type here (with our constant).

// error out on unknown scheme.
func (r *API) Write(ctx context.Context, msg any) (_ WriteResponseStats, err error) {
// Reset the buffer.
r.reqBuf = r.reqBuf[:0]
Member Author

This is fine for a single thread, but people might want to use the same HTTP client etc. for multiple concurrent streams, so we probably can't do this. I would remove it and add it back later as a variadic option for Write, which would not be a breaking change (we could also do it now; it's a common pattern, e.g. WithWriteBuffer).


// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
func NewRemoteWriteHandler(store writeStorage, opts ...HandlerOption) http.Handler {
Member Author

Suggested change
func NewRemoteWriteHandler(store writeStorage, opts ...HandlerOption) http.Handler {
func NewHandler(store writeStorage, opts ...HandlerOption) http.Handler {

// writeStorage represents the storage for RemoteWriteHandler.
// This interface is intentionally private due its experimental state.
type writeStorage interface {
Store(ctx context.Context, proto WriteProtoFullName, serializedRequest []byte) (_ WriteResponseStats, code int, _ error)
Member Author

Discussion: should it be any, with people providing a type to NewHandler? This feels tricky, as the user would need to give us a map of content type -> type and the exact method to use for parsing, which is kind of silly.

However, if we don't do this, should we just give http.Request?

Member Author

Let's focus on value we bring vs write your own ServeHTTP:

  • We write write response stats for you (limiting error surface) with correct success response code
  • We validate headers and respond accordingly
  • We decompress for you

After that we can't do much.

Disadvantages of http.Request:

  • The user doesn't know what we validated. We can document that we validated the proto full name, but should we pass it as a separate parameter, write it to both the parameter and the header, or only the header?
  • Should we remove the proto and compression (encoding) headers from the request? What if the user wants a metric that instruments which compression was used (then they could use middleware)?
  • We kind of have to put the decompressed content into the request's io.Reader to avoid duplicating data.
  • We have to give options to limit the known ctypes to X.
Suggested change
Store(ctx context.Context, proto WriteProtoFullName, serializedRequest []byte) (_ WriteResponseStats, code int, _ error)
// Store stores remote write metrics encoded in the given WriteFullName content type. Provided http.Request contains the encoded bytes in the req.Body with all the HTTP information, except "Content-Type" header which is provided in a separate, validated ctype. Other headers might be trimmed,
// depending on the configured middlewares e.g. a default SnappyMiddleware trims "Content-Encoding".
//
// Store blocks an HTTP request and waits for WriteResponseStats to provide to the client.
Store(ctx context.Context, ctype WriteFullName, req *http.Request) (_ WriteResponseStats, code int, _ error)

Questions:

  • How to attach extra response headers e.g. Retry-After.

Member Author

For the questions, we might want to create a more complex WriteResponse type that gives a way to return stats (ideally forcing the user to) and headers. However, should we then use an HTTP-style input param?

Member Author

Suggested change
Store(ctx context.Context, proto WriteProtoFullName, serializedRequest []byte) (_ WriteResponseStats, code int, _ error)
// Store stores remote write metrics encoded in the given WriteFullName content type. Provided http.Request contains the encoded bytes in the req.Body with all the HTTP information, except "Content-Type" header which is provided in a separate, validated ctype. Other headers might be trimmed,
// depending on the configured middlewares e.g. a default SnappyMiddleware trims "Content-Encoding".
//
// Store blocks an HTTP request and waits for WriteResponseStats to provide to the client.
Store(ctx context.Context, ctype WriteFullName, req *http.Request) (_ *WriteResponse, _ error)
type Stats struct {
	// Samples represents X-Prometheus-Remote-Write-Written-Samples
	Samples int
	// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
	Histograms int
	// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
	Exemplars int

	// Confirmed means we can trust those statistics from the point of view
	// of the PRW 2.0 spec. When parsed from headers, it means we got at least one
	// response header from the Receiver to confirm those numbers, meaning it must
	// be at least 2.0 Receiver. See ParseWriteResponseStats for details.
	confirmed bool
}

func StatsFromV2() Stats

func NewWriteResponse(s Stats) *WriteResponse {
    ....
}

func (r *WriteResponse) SetHeaders

func (r *WriteResponse) SetCode

@bwplotka bwplotka marked this pull request as ready for review February 3, 2025 10:50
}

var contentTypeHeaders = map[WriteProtoFullName]string{
WriteProtoFullNameV1: appProtoContentType, // Also application/x-protobuf;proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec.
Member Author

Let's make it WriteV1ContentType

Member Author

@bwplotka bwplotka left a comment

Thanks @saswatamcode for helping a lot. Let's get those last things to this branch + quick README and let's get it merged in the experimental module!

}

// Read the request body.
bodyBytes, err := io.ReadAll(r.Body)

I think there's an opportunity to use a sync.Pool here.

}

// Read the already decompressed body
body, err := io.ReadAll(r.Body)
@GiedriusS GiedriusS Feb 4, 2025

By using a bytes.Buffer instead of a bytes.Reader we could access the original bytes directly through Bytes() and avoid a copy/allocation here. 99% of users if not all will use the middleware that does the decompression 😄

7 participants