diff --git a/stackdriver/client.go b/stackdriver/client.go
index fec0aa08..9b9aea6b 100644
--- a/stackdriver/client.go
+++ b/stackdriver/client.go
@@ -20,7 +20,6 @@ import (
 	"net"
 	"net/url"
 	"strconv"
-	"sync"
 	"time"
 
 	"go.opencensus.io/plugin/ocgrpc"
@@ -178,73 +177,56 @@ func (c *Client) Store(req *monitoring.CreateTimeSeriesRequest) error {
 
 	service := monitoring.NewMetricServiceClient(conn)
 
-	errors := make(chan error, len(tss)/MaxTimeseriesesPerRequest+1)
-	var wg sync.WaitGroup
-	for i := 0; i < len(tss); i += MaxTimeseriesesPerRequest {
-		end := i + MaxTimeseriesesPerRequest
-		if end > len(tss) {
-			end = len(tss)
+	req_copy := &monitoring.CreateTimeSeriesRequest{
+		Name:       c.projectID,
+		TimeSeries: req.TimeSeries,
+	}
+	_, err = service.CreateTimeSeries(ctx, req_copy)
+	if err == nil {
+		// The response is empty if all points were successfully written.
+		stats.RecordWithTags(ctx,
+			[]tag.Mutator{tag.Upsert(StatusTag, "0")},
+			PointCount.M(int64(len(tss))))
+	} else {
+		level.Debug(c.logger).Log(
+			"msg", "Partial failure calling CreateTimeSeries",
+			"err", err)
+		status, ok := status.FromError(err)
+		if !ok {
+			level.Warn(c.logger).Log("msg", "Unexpected error message type from Monitoring API", "err", err)
+			return err
 		}
-		wg.Add(1)
-		go func(begin int, end int) {
-			defer wg.Done()
-			req_copy := &monitoring.CreateTimeSeriesRequest{
-				Name:       c.projectID,
-				TimeSeries: req.TimeSeries[begin:end],
-			}
-			_, err := service.CreateTimeSeries(ctx, req_copy)
-			if err == nil {
-				// The response is empty if all points were successfully written.
+		for _, details := range status.Details() {
+			if summary, ok := details.(*monitoring.CreateTimeSeriesSummary); ok {
+				level.Debug(c.logger).Log("summary", summary)
 				stats.RecordWithTags(ctx,
 					[]tag.Mutator{tag.Upsert(StatusTag, "0")},
-					PointCount.M(int64(end-begin)))
-			} else {
-				level.Debug(c.logger).Log(
-					"msg", "Partial failure calling CreateTimeSeries",
-					"err", err)
-				status, ok := status.FromError(err)
-				if !ok {
-					level.Warn(c.logger).Log("msg", "Unexpected error message type from Monitoring API", "err", err)
-					errors <- err
-					return
-				}
-				for _, details := range status.Details() {
-					if summary, ok := details.(*monitoring.CreateTimeSeriesSummary); ok {
-						level.Debug(c.logger).Log("summary", summary)
-						stats.RecordWithTags(ctx,
-							[]tag.Mutator{tag.Upsert(StatusTag, "0")},
-							PointCount.M(int64(summary.SuccessPointCount)))
-						for _, e := range summary.Errors {
-							stats.RecordWithTags(ctx,
-								[]tag.Mutator{tag.Upsert(StatusTag, fmt.Sprint(uint32(e.Status.Code)))},
-								PointCount.M(int64(e.PointCount)))
-						}
-					}
-				}
-				switch status.Code() {
-				// codes.DeadlineExceeded:
-				//   It is safe to retry
-				//   google.monitoring.v3.MetricService.CreateTimeSeries
-				//   requests with backoff because QueueManager
-				//   enforces in-order writes on a time series, which
-				//   is a requirement for Stackdriver monitoring.
-				//
-				// codes.Unavailable:
-				//   The condition is most likely transient. The request can
-				//   be retried with backoff.
-				case codes.DeadlineExceeded, codes.Unavailable:
-					errors <- recoverableError{err}
-				default:
-					errors <- err
+					PointCount.M(int64(summary.SuccessPointCount)))
+				for _, e := range summary.Errors {
+					stats.RecordWithTags(ctx,
+						[]tag.Mutator{tag.Upsert(StatusTag, fmt.Sprint(uint32(e.Status.Code)))},
+						PointCount.M(int64(e.PointCount)))
 				}
 			}
-		}(i, end)
-	}
-	wg.Wait()
-	close(errors)
-	if err, ok := <-errors; ok {
-		return err
+		}
+		switch status.Code() {
+		// codes.DeadlineExceeded:
+		//   It is safe to retry
+		//   google.monitoring.v3.MetricService.CreateTimeSeries
+		//   requests with backoff because QueueManager
+		//   enforces in-order writes on a time series, which
+		//   is a requirement for Stackdriver monitoring.
+		//
+		// codes.Unavailable:
+		//   The condition is most likely transient. The request can
+		//   be retried with backoff.
+		case codes.DeadlineExceeded, codes.Unavailable:
+			return recoverableError{err}
+		default:
+			return err
+		}
 	}
+	return nil
 }
diff --git a/stackdriver/queue_manager.go b/stackdriver/queue_manager.go
index 95b8d363..33e0bebe 100644
--- a/stackdriver/queue_manager.go
+++ b/stackdriver/queue_manager.go
@@ -509,7 +509,13 @@ func (s *shardCollection) runShard(i int) {
 
 func (s *shardCollection) sendSamples(client StorageClient, samples []*monitoring_pb.TimeSeries) {
 	begin := time.Now()
-	s.sendSamplesWithBackoff(client, samples)
+	for i := 0; i < len(samples); i += MaxTimeseriesesPerRequest {
+		end := i + MaxTimeseriesesPerRequest
+		if end > len(samples) {
+			end = len(samples)
+		}
+		s.sendSamplesWithBackoff(client, samples[i:end])
+	}
 
 	// These counters are used to calculate the dynamic sharding, and as such
 	// should be maintained irrespective of success or failure.
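Reviewer note on the client.go hunk: Store now issues a single CreateTimeSeries call and classifies the resulting gRPC status code, returning recoverableError for codes that are safe to retry with backoff. Below is a minimal standalone sketch of that classification, assuming the real google.golang.org/grpc/status and codes packages; classify is a hypothetical helper name introduced here for illustration, while recoverableError mirrors the wrapper type the diff returns.

// Sketch only, not part of the change: a standalone illustration of the
// retry classification client.go's Store now performs inline. classify is
// a hypothetical helper; recoverableError mirrors the wrapper in the diff.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// recoverableError marks an error the caller may retry with backoff.
type recoverableError struct{ error }

// classify wraps transient gRPC failures in recoverableError and passes
// every other error through unchanged.
func classify(err error) error {
	st, ok := status.FromError(err)
	if !ok {
		// Not a gRPC status; treat it as permanent, as the diff does.
		return err
	}
	switch st.Code() {
	case codes.DeadlineExceeded, codes.Unavailable:
		// Transient conditions: safe to retry with backoff, because the
		// queue manager issues writes to a given time series in order.
		return recoverableError{err}
	default:
		return err
	}
}

func main() {
	err := classify(status.Error(codes.Unavailable, "backend overloaded"))
	if _, retryable := err.(recoverableError); retryable {
		fmt.Println("retry with backoff:", err)
	}
}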
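Reviewer note on the queue_manager.go hunk: the chunking that client.go used to do with a goroutine fan-out and WaitGroup now happens here, sequentially. Below is the windowing pattern in isolation, as a sketch on a plain int slice; sendInBatches and the batch size of 200 are illustrative stand-ins, while the real code slices by the package constant MaxTimeseriesesPerRequest and calls sendSamplesWithBackoff.

// Sketch only: the fixed-size windowing that sendSamples now applies,
// demonstrated on a plain slice with illustrative names and sizes.
package main

import "fmt"

func sendInBatches(samples []int, maxPerRequest int, send func([]int)) {
	for i := 0; i < len(samples); i += maxPerRequest {
		end := i + maxPerRequest
		if end > len(samples) {
			// The final window may be shorter than maxPerRequest.
			end = len(samples)
		}
		send(samples[i:end])
	}
}

func main() {
	samples := make([]int, 450)
	sendInBatches(samples, 200, func(batch []int) {
		fmt.Println("batch size:", len(batch)) // prints 200, 200, 50
	})
}

Because the windows are sent in order from a single goroutine, writes to a given time series stay ordered, which the retry comment in client.go identifies as a Stackdriver requirement.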