Skip to content

Fix some cache issue #444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/small-rivers-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"open-next": patch
---

Fix some cache issue
176 changes: 98 additions & 78 deletions packages/open-next/src/adapters/cache.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DetachedPromise } from "utils/promise.js";

import { IncrementalCache } from "../cache/incremental/types.js";
import { TagCache } from "../cache/tag/types.js";
import { isBinaryContentType } from "./binary.js";
Expand Down Expand Up @@ -144,7 +146,8 @@
value: value,
} as CacheHandlerValue;
} catch (e) {
error("Failed to get fetch cache", e);
// We can usually ignore errors here as they are usually due to cache not being found
debug("Failed to get fetch cache", e);
return null;
}
}
Expand All @@ -166,7 +169,7 @@
// If some tags are stale we need to force revalidation
return null;
}
const requestId = globalThis.__als.getStore() ?? "";
const requestId = globalThis.__als.getStore()?.requestId ?? "";
globalThis.lastModified[requestId] = _lastModified;
if (cacheData?.type === "route") {
return {
Expand Down Expand Up @@ -208,7 +211,8 @@
return null;
}
} catch (e) {
error("Failed to get body cache", e);
// We can usually ignore errors here as they are usually due to cache not being found
debug("Failed to get body cache", e);
return null;
}
}
Expand All @@ -221,99 +225,115 @@
if (globalThis.disableIncrementalCache) {
return;
}
if (data?.kind === "ROUTE") {
const { body, status, headers } = data;
await globalThis.incrementalCache.set(
key,
{
type: "route",
body: body.toString(
isBinaryContentType(String(headers["content-type"]))
? "base64"
: "utf8",
),
meta: {
status,
headers,
},
},
false,
);
} else if (data?.kind === "PAGE") {
const { html, pageData } = data;
const isAppPath = typeof pageData === "string";
if (isAppPath) {
globalThis.incrementalCache.set(
const detachedPromise = new DetachedPromise<void>();
globalThis.__als.getStore()?.pendingPromises.push(detachedPromise);
try {
if (data?.kind === "ROUTE") {
const { body, status, headers } = data;
await globalThis.incrementalCache.set(
key,
{
type: "app",
html,
rsc: pageData,
type: "route",
body: body.toString(
isBinaryContentType(String(headers["content-type"]))
? "base64"
: "utf8",
),
meta: {
status,
headers,
},
},
false,
);
} else {
globalThis.incrementalCache.set(
} else if (data?.kind === "PAGE") {
const { html, pageData } = data;
const isAppPath = typeof pageData === "string";
if (isAppPath) {
globalThis.incrementalCache.set(
key,
{
type: "app",
html,
rsc: pageData,
},
false,
);
} else {
globalThis.incrementalCache.set(
key,
{
type: "page",
html,
json: pageData,
},
false,
);
}
} else if (data?.kind === "FETCH") {
await globalThis.incrementalCache.set<true>(key, data, true);
} else if (data?.kind === "REDIRECT") {
await globalThis.incrementalCache.set(
key,
{
type: "page",
html,
json: pageData,
type: "redirect",
props: data.props,
},
false,
);
} else if (data === null || data === undefined) {

Check warning on line 284 in packages/open-next/src/adapters/cache.ts

View workflow job for this annotation

GitHub Actions / validate

Add the missing "else" clause
await globalThis.incrementalCache.delete(key);
}
} else if (data?.kind === "FETCH") {
await globalThis.incrementalCache.set<true>(key, data, true);
} else if (data?.kind === "REDIRECT") {
await globalThis.incrementalCache.set(
key,
{
type: "redirect",
props: data.props,
},
false,
);
} else if (data === null || data === undefined) {
await globalThis.incrementalCache.delete(key);
}
// Write derivedTags to dynamodb
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
const derivedTags: string[] =
data?.kind === "FETCH"
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
: data?.kind === "PAGE"
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
: [];
debug("derivedTags", derivedTags);
// Get all tags stored in dynamodb for the given key
// If any of the derived tags are not stored in dynamodb for the given key, write them
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
})),
// Write derivedTags to dynamodb
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
const derivedTags: string[] =
data?.kind === "FETCH"
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
: data?.kind === "PAGE"
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
: [];
debug("derivedTags", derivedTags);
// Get all tags stored in dynamodb for the given key
// If any of the derived tags are not stored in dynamodb for the given key, write them
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter(
(tag) => !storedTags.includes(tag),
);
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
})),
);
}
debug("Finished setting cache");
} catch (e) {
error("Failed to set cache", e);
} finally {
// We need to resolve the promise even if there was an error
detachedPromise.resolve();
}
}

public async revalidateTag(tag: string) {
if (globalThis.disableDynamoDBCache || globalThis.disableIncrementalCache) {
return;
}
debug("revalidateTag", tag);
// Find all keys with the given tag
const paths = await globalThis.tagCache.getByTag(tag);
debug("Items", paths);
// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(
paths?.map((path) => ({
path: path,
tag: tag,
})) ?? [],
);
try {
debug("revalidateTag", tag);
// Find all keys with the given tag
const paths = await globalThis.tagCache.getByTag(tag);
debug("Items", paths);
// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(
paths?.map((path) => ({
path: path,
tag: tag,
})) ?? [],
);
} catch (e) {
error("Failed to revalidate tag", e);
}
}
}
6 changes: 6 additions & 0 deletions packages/open-next/src/adapters/server-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ setNodeEnv();
setBuildIdEnv();
setNextjsServerWorkingDirectory();

// Because next is messing with fetch, we have to make sure that we use an untouched version of fetch
declare global {
var internalFetch: typeof fetch;
}
globalThis.internalFetch = fetch;

/////////////
// Handler //
/////////////
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/cache/tag/dynamodb-lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ const tagCache: TagCache = {
for (const paramsChunk of toInsert) {
await Promise.all(
paramsChunk.map(async (params) => {
const response = await awsClient.fetch(
const response = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
Expand Down
6 changes: 5 additions & 1 deletion packages/open-next/src/core/createMainHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AsyncLocalStorage } from "node:async_hooks";

import type { OpenNextConfig } from "types/open-next";
import { DetachedPromise } from "utils/promise";

import { debug } from "../adapters/logger";
import { generateUniqueId } from "../adapters/util";
Expand All @@ -20,7 +21,10 @@ declare global {
var incrementalCache: IncrementalCache;
var fnName: string | undefined;
var serverId: string;
var __als: AsyncLocalStorage<string>;
var __als: AsyncLocalStorage<{
requestId: string;
pendingPromises: DetachedPromise<void>[];
}>;
}

export async function createMainHandler() {
Expand Down
77 changes: 45 additions & 32 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import {
StreamCreator,
} from "http/index.js";
import { InternalEvent, InternalResult } from "types/open-next";
import { DetachedPromise } from "utils/promise";

import { debug, error, warn } from "../adapters/logger";
import { convertRes, createServerResponse, proxyRequest } from "./routing/util";
import routingHandler, { MiddlewareOutputEvent } from "./routingHandler";
import { requestHandler, setNextjsPrebundledReact } from "./util";

// This is used to identify requests in the cache
globalThis.__als = new AsyncLocalStorage<string>();
globalThis.__als = new AsyncLocalStorage<{
requestId: string;
pendingPromises: DetachedPromise<any>[];
}>();

export async function openNextHandler(
internalEvent: InternalEvent,
Expand Down Expand Up @@ -81,37 +85,46 @@ export async function openNextHandler(
remoteAddress: preprocessedEvent.remoteAddress,
};
const requestId = Math.random().toString(36);
const internalResult = await globalThis.__als.run(requestId, async () => {
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
const req = new IncomingMessage(reqProps);
const res = createServerResponse(
preprocessedEvent,
overwrittenResponseHeaders,
responseStreaming,
);

await processRequest(
req,
res,
preprocessedEvent,
preprocessedResult.isExternalRewrite,
);

const { statusCode, headers, isBase64Encoded, body } = convertRes(res);

const internalResult = {
type: internalEvent.type,
statusCode,
headers,
body,
isBase64Encoded,
};

// reset lastModified. We need to do this to avoid memory leaks
delete globalThis.lastModified[requestId];

return internalResult;
});
const pendingPromises: DetachedPromise<void>[] = [];
const internalResult = await globalThis.__als.run(
{ requestId, pendingPromises },
async () => {
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
const req = new IncomingMessage(reqProps);
const res = createServerResponse(
preprocessedEvent,
overwrittenResponseHeaders,
responseStreaming,
);

await processRequest(
req,
res,
preprocessedEvent,
preprocessedResult.isExternalRewrite,
);

const { statusCode, headers, isBase64Encoded, body } = convertRes(res);

const internalResult = {
type: internalEvent.type,
statusCode,
headers,
body,
isBase64Encoded,
};

// reset lastModified. We need to do this to avoid memory leaks
delete globalThis.lastModified[requestId];

// Wait for all promises to resolve
// We are not catching errors here, because they are catched before
// This may need to change in the future
await Promise.all(pendingPromises.map((p) => p.promise));

return internalResult;
},
);
return internalResult;
}
}
Expand Down
Loading
Loading