Distributed error handling #40

Merged · 1 commit · Apr 4, 2025
distribution.go (71 changes: 51 additions & 20 deletions)
@@ -96,9 +96,10 @@ func distributedFetch[V, T any](c *Client[T], key string, fetchFn FetchFn[V]) Fe
 
     return func(ctx context.Context) (V, error) {
         stale, hasStale := *new(V), false
-        bytes, ok := c.distributedStorage.Get(ctx, key)
-        if ok {
-            c.reportDistributedCacheHit(true)
+        bytes, existsInDistributedStorage := c.distributedStorage.Get(ctx, key)
+        c.reportDistributedCacheHit(existsInDistributedStorage)
+
+        if existsInDistributedStorage {
             record, unmarshalErr := unmarshalRecord[V](bytes, key, c.log)
             if unmarshalErr != nil {
                 return record.Value, unmarshalErr
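
For orientation while reading the hunks below: the wrapper returns a FetchFn[V] closure, and the new error paths lean on a package-level sentinel error that gets joined with the real cause. A rough sketch of the declarations this diff assumes (the actual definitions live elsewhere in the package and may differ in detail):

// Sketch only: these mirror how the identifiers are used in the diff,
// not the package's actual source.
var errOnlyDistributedRecords = errors.New("only records from the distributed storage were retrieved")

// FetchFn matches the closure returned by distributedFetch.
type FetchFn[V any] func(ctx context.Context) (V, error)

// BatchFetchFn matches how fetchFn is called in distributedBatchFetch: it
// receives the IDs that still need refreshing and returns a map keyed by ID.
type BatchFetchFn[V any] func(ctx context.Context, ids []string) (map[string]V, error)
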
@@ -116,8 +117,16 @@ func distributedFetch[V, T any](c *Client[T], key string, fetchFn FetchFn[V]) Fe
             stale, hasStale = record.Value, true
         }
 
-        if !ok {
-            c.reportDistributedCacheHit(false)
+        // Before we call the fetchFn, we'll do an unblocking read to see if the
+        // context has been cancelled. If it has, we'll return a stale value if we
+        // have one available.
+        select {

Owner Author:
One could argue that it’s simpler to just perform an error check here, e.g. if ctx.Err() != nil, but I just find this approach more explicit. To me it's very clear that this code is trying to determine whether the work should proceed or not.

+        case <-ctx.Done():
+            if hasStale {
+                return stale, errors.Join(errOnlyDistributedRecords, ctx.Err())
+            }
+            return *(new(V)), ctx.Err()
+        default:
         }
 
         // If it's not fresh enough, we'll retrieve it from the source.
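
As the review comment above notes, the non-blocking select and a plain ctx.Err() check are interchangeable here; which one reads better is a matter of taste. A small standalone sketch (not part of the PR) showing that both report cancellation the same way:

package main

import (
    "context"
    "errors"
    "fmt"
)

// checkWithSelect mirrors the diff: the default case makes the read on
// ctx.Done() return immediately when the context is still live.
func checkWithSelect(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        return nil
    }
}

// checkWithErr is the alternative from the comment: ctx.Err() is non-nil
// exactly when the context has been cancelled or has timed out.
func checkWithErr(ctx context.Context) error {
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    fmt.Println(errors.Is(checkWithSelect(ctx), context.Canceled)) // true
    fmt.Println(errors.Is(checkWithErr(ctx), context.Canceled))    // true
}
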
@@ -146,7 +155,7 @@ func distributedFetch[V, T any](c *Client[T], key string, fetchFn FetchFn[V]) Fe
 
             if hasStale {
                 c.reportDistributedStaleFallback()
-                return stale, nil
+                return stale, errors.Join(errOnlyDistributedRecords, fetchErr)
             }
 
             return response, fetchErr
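
The point of returning the stale value with errors.Join(errOnlyDistributedRecords, fetchErr) instead of a nil error is that the caller can recognise the "stale but usable" case while errors.Is and errors.As still see the underlying cause. A self-contained sketch of that mechanic, using stand-in names rather than the library's own identifiers:

package main

import (
    "context"
    "errors"
    "fmt"
)

// errOnlyStaleData is a stand-in for a sentinel like errOnlyDistributedRecords.
var errOnlyStaleData = errors.New("only stale records were available")

// fetchWithFallback mimics the pattern in the diff: when the real fetch cannot
// run, return the stale value together with the sentinel joined to the cause.
func fetchWithFallback(ctx context.Context, stale string, hasStale bool) (string, error) {
    if err := ctx.Err(); err != nil {
        if hasStale {
            return stale, errors.Join(errOnlyStaleData, err)
        }
        return "", err
    }
    return "fresh value", nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    v, err := fetchWithFallback(ctx, "stale value", true)
    fmt.Println(v)                                // stale value
    fmt.Println(errors.Is(err, errOnlyStaleData)) // true: the caller may accept the stale value
    fmt.Println(errors.Is(err, context.Canceled)) // true: the original cause is preserved
}
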
@@ -177,14 +186,14 @@ func distributedBatchFetch[V, T any](c *Client[T], keyFn KeyFn, fetchFn BatchFet
         idsToRefresh := make([]string, 0, len(ids))
         for _, id := range ids {
             key := keyFn(id)
-            bytes, ok := distributedRecords[key]
-            if !ok {
-                c.reportDistributedCacheHit(false)
+            bytes, existsInDistributedStorage := distributedRecords[key]
+            c.reportDistributedCacheHit(existsInDistributedStorage)
+
+            if !existsInDistributedStorage {
                 idsToRefresh = append(idsToRefresh, id)
                 continue
             }
 
-            c.reportDistributedCacheHit(true)
             record, unmarshalErr := unmarshalRecord[V](bytes, key, c.log)
             if unmarshalErr != nil {
                 idsToRefresh = append(idsToRefresh, id)
@@ -194,29 +203,46 @@ func distributedBatchFetch[V, T any](c *Client[T], keyFn KeyFn, fetchFn BatchFet
             // If early refreshes isn't enabled it means all records are fresh, otherwise we'll check the CreatedAt time.
             if !c.distributedEarlyRefreshes || c.clock.Since(record.CreatedAt) < c.distributedRefreshAfterDuration {
                 // We never want to return missing records.
-                if !record.IsMissingRecord {
-                    fresh[id] = record.Value
-                } else {
+                if record.IsMissingRecord {
                     c.reportDistributedMissingRecord()
+                    continue
                 }
+
+                fresh[id] = record.Value
                 continue
             }
 
             idsToRefresh = append(idsToRefresh, id)
             c.reportDistributedRefresh()
 
             // We never want to return missing records.
-            if !record.IsMissingRecord {
-                stale[id] = record.Value
-            } else {
+            if record.IsMissingRecord {
                 c.reportDistributedMissingRecord()
+                continue
             }
+            stale[id] = record.Value
         }
 
         if len(idsToRefresh) == 0 {
             return fresh, nil
         }
 
+        // Before we call the fetchFn, we'll do an unblocking read to see if the
+        // context has been cancelled. If it has, we'll return any potential
+        // records we got from the distributed storage.
+        select {
+        case <-ctx.Done():
+            maps.Copy(stale, fresh)
+
+            // If we didn't get any records from the distributed storage,
+            // we'll return the error from the fetch function as-is.
+            if len(stale) < 1 {
+                return stale, ctx.Err()
+            }
+            return stale, errors.Join(errOnlyDistributedRecords, ctx.Err())
+        default:
+        }
+
         dataSourceResponses, err := fetchFn(ctx, idsToRefresh)
         // In case of an error, we'll proceed with the ones we got from the distributed storage.
         // NOTE: It's important that we return a specific error here, otherwise we'll potentially
@@ -227,17 +253,22 @@ func distributedBatchFetch[V, T any](c *Client[T], keyFn KeyFn, fetchFn BatchFet
                 c.reportDistributedStaleFallback()
             }
             maps.Copy(stale, fresh)
-            return stale, errOnlyDistributedRecords
+
+            // If we didn't get any records from the distributed storage,
+            // we'll return the error from the fetch function as-is.
+            if len(stale) < 1 {
+                return dataSourceResponses, err
+            }
+
+            return stale, errors.Join(errOnlyDistributedRecords, err)
         }
 
         // Next, we'll want to check if we should change any of the records to be missing or perform deletions.
         recordsToWrite := make(map[string][]byte, len(dataSourceResponses))
         keysToDelete := make([]string, 0, max(len(idsToRefresh)-len(dataSourceResponses), 0))
         for _, id := range idsToRefresh {
             key := keyFn(id)
-            response, ok := dataSourceResponses[id]
-
-            if ok {
+            if response, ok := dataSourceResponses[id]; ok {
                 if recordBytes, marshalErr := marshalRecord[V](response, c); marshalErr == nil {
                     recordsToWrite[key] = recordBytes
                 }
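
The batch path follows the same convention: the returned map may be partially populated from the distributed storage while the error carries both the sentinel and the original failure, so callers can decide whether a partial result is good enough. A sketch of how calling code might interpret that pair, again with stand-in names rather than the library's exported API:

package main

import (
    "context"
    "errors"
    "fmt"
)

// errOnlyStaleData stands in for the errOnlyDistributedRecords sentinel.
var errOnlyStaleData = errors.New("only stale records were available")

// batchFetch mimics the shape of the batch path in the diff: it may return a
// partial map of records together with a joined error.
func batchFetch(ctx context.Context, ids []string) (map[string]string, error) {
    stale := map[string]string{"1": "cached-1"} // pretend this came from the distributed storage
    if err := ctx.Err(); err != nil {
        if len(stale) < 1 {
            return stale, err
        }
        return stale, errors.Join(errOnlyStaleData, err)
    }
    // ...call the real data source for the remaining IDs here...
    return map[string]string{"1": "fresh-1", "2": "fresh-2"}, nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    records, err := batchFetch(ctx, []string{"1", "2"})
    switch {
    case err == nil:
        fmt.Println("all records are fresh:", records)
    case errors.Is(err, errOnlyStaleData):
        // Partial result: use what we got, and inspect the joined cause
        // (context.Canceled here) to decide whether to retry later.
        fmt.Println(len(records), errors.Is(err, context.Canceled)) // 1 true
    default:
        fmt.Println("no usable records:", err)
    }
}
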