diff --git a/.changeset/small-rivers-taste.md b/.changeset/small-rivers-taste.md new file mode 100644 index 000000000..150c18a81 --- /dev/null +++ b/.changeset/small-rivers-taste.md @@ -0,0 +1,5 @@ +--- +"open-next": patch +--- + +Fix some cache issue diff --git a/packages/open-next/src/adapters/cache.ts b/packages/open-next/src/adapters/cache.ts index 5e6f4582e..f814a2d14 100644 --- a/packages/open-next/src/adapters/cache.ts +++ b/packages/open-next/src/adapters/cache.ts @@ -1,3 +1,5 @@ +import { DetachedPromise } from "utils/promise.js"; + import { IncrementalCache } from "../cache/incremental/types.js"; import { TagCache } from "../cache/tag/types.js"; import { isBinaryContentType } from "./binary.js"; @@ -144,7 +146,8 @@ export default class S3Cache { value: value, } as CacheHandlerValue; } catch (e) { - error("Failed to get fetch cache", e); + // We can usually ignore errors here as they are usually due to cache not being found + debug("Failed to get fetch cache", e); return null; } } @@ -166,7 +169,7 @@ export default class S3Cache { // If some tags are stale we need to force revalidation return null; } - const requestId = globalThis.__als.getStore() ?? ""; + const requestId = globalThis.__als.getStore()?.requestId ?? ""; globalThis.lastModified[requestId] = _lastModified; if (cacheData?.type === "route") { return { @@ -208,7 +211,8 @@ export default class S3Cache { return null; } } catch (e) { - error("Failed to get body cache", e); + // We can usually ignore errors here as they are usually due to cache not being found + debug("Failed to get body cache", e); return null; } } @@ -221,82 +225,94 @@ export default class S3Cache { if (globalThis.disableIncrementalCache) { return; } - if (data?.kind === "ROUTE") { - const { body, status, headers } = data; - await globalThis.incrementalCache.set( - key, - { - type: "route", - body: body.toString( - isBinaryContentType(String(headers["content-type"])) - ? "base64" - : "utf8", - ), - meta: { - status, - headers, - }, - }, - false, - ); - } else if (data?.kind === "PAGE") { - const { html, pageData } = data; - const isAppPath = typeof pageData === "string"; - if (isAppPath) { - globalThis.incrementalCache.set( + const detachedPromise = new DetachedPromise(); + globalThis.__als.getStore()?.pendingPromises.push(detachedPromise); + try { + if (data?.kind === "ROUTE") { + const { body, status, headers } = data; + await globalThis.incrementalCache.set( key, { - type: "app", - html, - rsc: pageData, + type: "route", + body: body.toString( + isBinaryContentType(String(headers["content-type"])) + ? "base64" + : "utf8", + ), + meta: { + status, + headers, + }, }, false, ); - } else { - globalThis.incrementalCache.set( + } else if (data?.kind === "PAGE") { + const { html, pageData } = data; + const isAppPath = typeof pageData === "string"; + if (isAppPath) { + globalThis.incrementalCache.set( + key, + { + type: "app", + html, + rsc: pageData, + }, + false, + ); + } else { + globalThis.incrementalCache.set( + key, + { + type: "page", + html, + json: pageData, + }, + false, + ); + } + } else if (data?.kind === "FETCH") { + await globalThis.incrementalCache.set(key, data, true); + } else if (data?.kind === "REDIRECT") { + await globalThis.incrementalCache.set( key, { - type: "page", - html, - json: pageData, + type: "redirect", + props: data.props, }, false, ); + } else if (data === null || data === undefined) { + await globalThis.incrementalCache.delete(key); } - } else if (data?.kind === "FETCH") { - await globalThis.incrementalCache.set(key, data, true); - } else if (data?.kind === "REDIRECT") { - await globalThis.incrementalCache.set( - key, - { - type: "redirect", - props: data.props, - }, - false, - ); - } else if (data === null || data === undefined) { - await globalThis.incrementalCache.delete(key); - } - // Write derivedTags to dynamodb - // If we use an in house version of getDerivedTags in build we should use it here instead of next's one - const derivedTags: string[] = - data?.kind === "FETCH" - ? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility - : data?.kind === "PAGE" - ? data.headers?.["x-next-cache-tags"]?.split(",") ?? [] - : []; - debug("derivedTags", derivedTags); - // Get all tags stored in dynamodb for the given key - // If any of the derived tags are not stored in dynamodb for the given key, write them - const storedTags = await globalThis.tagCache.getByPath(key); - const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag)); - if (tagsToWrite.length > 0) { - await globalThis.tagCache.writeTags( - tagsToWrite.map((tag) => ({ - path: key, - tag: tag, - })), + // Write derivedTags to dynamodb + // If we use an in house version of getDerivedTags in build we should use it here instead of next's one + const derivedTags: string[] = + data?.kind === "FETCH" + ? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility + : data?.kind === "PAGE" + ? data.headers?.["x-next-cache-tags"]?.split(",") ?? [] + : []; + debug("derivedTags", derivedTags); + // Get all tags stored in dynamodb for the given key + // If any of the derived tags are not stored in dynamodb for the given key, write them + const storedTags = await globalThis.tagCache.getByPath(key); + const tagsToWrite = derivedTags.filter( + (tag) => !storedTags.includes(tag), ); + if (tagsToWrite.length > 0) { + await globalThis.tagCache.writeTags( + tagsToWrite.map((tag) => ({ + path: key, + tag: tag, + })), + ); + } + debug("Finished setting cache"); + } catch (e) { + error("Failed to set cache", e); + } finally { + // We need to resolve the promise even if there was an error + detachedPromise.resolve(); } } @@ -304,16 +320,20 @@ export default class S3Cache { if (globalThis.disableDynamoDBCache || globalThis.disableIncrementalCache) { return; } - debug("revalidateTag", tag); - // Find all keys with the given tag - const paths = await globalThis.tagCache.getByTag(tag); - debug("Items", paths); - // Update all keys with the given tag with revalidatedAt set to now - await globalThis.tagCache.writeTags( - paths?.map((path) => ({ - path: path, - tag: tag, - })) ?? [], - ); + try { + debug("revalidateTag", tag); + // Find all keys with the given tag + const paths = await globalThis.tagCache.getByTag(tag); + debug("Items", paths); + // Update all keys with the given tag with revalidatedAt set to now + await globalThis.tagCache.writeTags( + paths?.map((path) => ({ + path: path, + tag: tag, + })) ?? [], + ); + } catch (e) { + error("Failed to revalidate tag", e); + } } } diff --git a/packages/open-next/src/adapters/server-adapter.ts b/packages/open-next/src/adapters/server-adapter.ts index 3fa6df50a..34e9b616a 100644 --- a/packages/open-next/src/adapters/server-adapter.ts +++ b/packages/open-next/src/adapters/server-adapter.ts @@ -11,6 +11,12 @@ setNodeEnv(); setBuildIdEnv(); setNextjsServerWorkingDirectory(); +// Because next is messing with fetch, we have to make sure that we use an untouched version of fetch +declare global { + var internalFetch: typeof fetch; +} +globalThis.internalFetch = fetch; + ///////////// // Handler // ///////////// diff --git a/packages/open-next/src/cache/tag/dynamodb-lite.ts b/packages/open-next/src/cache/tag/dynamodb-lite.ts index 30cdd844e..f89f5a72e 100644 --- a/packages/open-next/src/cache/tag/dynamodb-lite.ts +++ b/packages/open-next/src/cache/tag/dynamodb-lite.ts @@ -179,7 +179,7 @@ const tagCache: TagCache = { for (const paramsChunk of toInsert) { await Promise.all( paramsChunk.map(async (params) => { - const response = await awsClient.fetch( + const response = await awsFetch( `https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`, { method: "POST", diff --git a/packages/open-next/src/core/createMainHandler.ts b/packages/open-next/src/core/createMainHandler.ts index c8bec9ef6..6d171b7ff 100644 --- a/packages/open-next/src/core/createMainHandler.ts +++ b/packages/open-next/src/core/createMainHandler.ts @@ -1,6 +1,7 @@ import type { AsyncLocalStorage } from "node:async_hooks"; import type { OpenNextConfig } from "types/open-next"; +import { DetachedPromise } from "utils/promise"; import { debug } from "../adapters/logger"; import { generateUniqueId } from "../adapters/util"; @@ -20,7 +21,10 @@ declare global { var incrementalCache: IncrementalCache; var fnName: string | undefined; var serverId: string; - var __als: AsyncLocalStorage; + var __als: AsyncLocalStorage<{ + requestId: string; + pendingPromises: DetachedPromise[]; + }>; } export async function createMainHandler() { diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index f48912831..b61c3508d 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -6,6 +6,7 @@ import { StreamCreator, } from "http/index.js"; import { InternalEvent, InternalResult } from "types/open-next"; +import { DetachedPromise } from "utils/promise"; import { debug, error, warn } from "../adapters/logger"; import { convertRes, createServerResponse, proxyRequest } from "./routing/util"; @@ -13,7 +14,10 @@ import routingHandler, { MiddlewareOutputEvent } from "./routingHandler"; import { requestHandler, setNextjsPrebundledReact } from "./util"; // This is used to identify requests in the cache -globalThis.__als = new AsyncLocalStorage(); +globalThis.__als = new AsyncLocalStorage<{ + requestId: string; + pendingPromises: DetachedPromise[]; +}>(); export async function openNextHandler( internalEvent: InternalEvent, @@ -81,37 +85,46 @@ export async function openNextHandler( remoteAddress: preprocessedEvent.remoteAddress, }; const requestId = Math.random().toString(36); - const internalResult = await globalThis.__als.run(requestId, async () => { - const preprocessedResult = preprocessResult as MiddlewareOutputEvent; - const req = new IncomingMessage(reqProps); - const res = createServerResponse( - preprocessedEvent, - overwrittenResponseHeaders, - responseStreaming, - ); - - await processRequest( - req, - res, - preprocessedEvent, - preprocessedResult.isExternalRewrite, - ); - - const { statusCode, headers, isBase64Encoded, body } = convertRes(res); - - const internalResult = { - type: internalEvent.type, - statusCode, - headers, - body, - isBase64Encoded, - }; - - // reset lastModified. We need to do this to avoid memory leaks - delete globalThis.lastModified[requestId]; - - return internalResult; - }); + const pendingPromises: DetachedPromise[] = []; + const internalResult = await globalThis.__als.run( + { requestId, pendingPromises }, + async () => { + const preprocessedResult = preprocessResult as MiddlewareOutputEvent; + const req = new IncomingMessage(reqProps); + const res = createServerResponse( + preprocessedEvent, + overwrittenResponseHeaders, + responseStreaming, + ); + + await processRequest( + req, + res, + preprocessedEvent, + preprocessedResult.isExternalRewrite, + ); + + const { statusCode, headers, isBase64Encoded, body } = convertRes(res); + + const internalResult = { + type: internalEvent.type, + statusCode, + headers, + body, + isBase64Encoded, + }; + + // reset lastModified. We need to do this to avoid memory leaks + delete globalThis.lastModified[requestId]; + + // Wait for all promises to resolve + // We are not catching errors here, because they are catched before + // This may need to change in the future + await Promise.all(pendingPromises.map((p) => p.promise)); + + return internalResult; + }, + ); return internalResult; } } diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index ba33fb77a..292b7ab2e 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -7,6 +7,7 @@ import { OpenNextNodeResponse } from "http/openNextResponse.js"; import { parseHeaders } from "http/util.js"; import type { MiddlewareManifest } from "types/next-types"; import { InternalEvent } from "types/open-next.js"; +import { DetachedPromise } from "utils/promise.js"; import { isBinaryContentType } from "../../adapters/binary.js"; import { debug, error } from "../../adapters/logger.js"; @@ -323,7 +324,7 @@ export function addOpenNextHeader(headers: OutgoingHttpHeaders) { headers["X-OpenNext"] = "1"; if (globalThis.openNextDebug) { headers["X-OpenNext-Version"] = globalThis.openNextVersion; - headers["X-OpenNext-RequestId"] = globalThis.__als.getStore(); + headers["X-OpenNext-RequestId"] = globalThis.__als.getStore()?.requestId; } } @@ -355,6 +356,11 @@ export async function revalidateIfRequired( : internalMeta?._nextRewroteUrl : rawPath; + // We want to ensure that the revalidation is done in the background + // But we should still wait for the queue send to be successful + const detachedPromise = new DetachedPromise(); + globalThis.__als.getStore()?.pendingPromises.push(detachedPromise); + // We need to pass etag to the revalidation queue to try to bypass the default 5 min deduplication window. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html // If you need to have a revalidation happen more frequently than 5 minutes, @@ -363,7 +369,7 @@ export async function revalidateIfRequired( try { const hash = (str: string) => crypto.createHash("md5").update(str).digest("hex"); - const requestId = globalThis.__als.getStore() ?? ""; + const requestId = globalThis.__als.getStore()?.requestId ?? ""; const lastModified = globalThis.lastModified[requestId] > 0 @@ -380,8 +386,10 @@ export async function revalidateIfRequired( MessageGroupId: generateMessageGroupId(rawPath), }); } catch (e) { - debug(`Failed to revalidate stale page ${rawPath}`); - debug(e); + error(`Failed to revalidate stale page ${rawPath}`, e); + } finally { + // We don't care if it fails or not, we don't want to block the request + detachedPromise.resolve(); } } } @@ -440,7 +448,7 @@ export function fixISRHeaders(headers: OutgoingHttpHeaders) { "private, no-cache, no-store, max-age=0, must-revalidate"; return; } - const requestId = globalThis.__als.getStore() ?? ""; + const requestId = globalThis.__als.getStore()?.requestId ?? ""; const _lastModified = globalThis.lastModified[requestId] ?? 0; if (headers[CommonHeaders.NEXT_CACHE] === "HIT" && _lastModified > 0) { // calculate age diff --git a/packages/open-next/src/utils/fetch.ts b/packages/open-next/src/utils/fetch.ts index ffad0f30e..8378fafbf 100644 --- a/packages/open-next/src/utils/fetch.ts +++ b/packages/open-next/src/utils/fetch.ts @@ -8,14 +8,17 @@ export function customFetchClient(client: AwsClient) { signed.headers.forEach((value, key) => { headers[key] = value; }); - return fetch(signed.url, { + const response = await globalThis.internalFetch(signed.url, { method: signed.method, headers, body: init.body, - // @ts-expect-error - next: { - internal: true, - }, }); + /** + * Response body must be consumed to avoid socket error. + * This is necessary otherwise we get some error : SocketError: other side closed + * https://github.com/nodejs/undici/issues/583#issuecomment-855384858 + */ + const clonedResponse = response.clone(); + return clonedResponse; }; } diff --git a/packages/open-next/src/utils/promise.ts b/packages/open-next/src/utils/promise.ts new file mode 100644 index 000000000..dffb609f3 --- /dev/null +++ b/packages/open-next/src/utils/promise.ts @@ -0,0 +1,27 @@ +/** + * A `Promise.withResolvers` implementation that exposes the `resolve` and + * `reject` functions on a `Promise`. + * Copied from next https://github.com/vercel/next.js/blob/canary/packages/next/src/lib/detached-promise.ts + * @see https://tc39.es/proposal-promise-with-resolvers/ + */ +export class DetachedPromise { + public readonly resolve: (value: T | PromiseLike) => void; + public readonly reject: (reason: any) => void; + public readonly promise: Promise; + + constructor() { + let resolve: (value: T | PromiseLike) => void; + let reject: (reason: any) => void; + + // Create the promise and assign the resolvers to the object. + this.promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + // We know that resolvers is defined because the Promise constructor runs + // synchronously. + this.resolve = resolve!; + this.reject = reject!; + } +}