Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: fetch only new inputs on refresh of inputs history #5315

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions backend/windmill-api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10159,6 +10159,10 @@ paths:
in: query
schema:
type: boolean
- name: since
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a description and format (e.g. ISO8601) for the new 'since' query parameter to improve API documentation clarity.

in: query
schema:
type: string
responses:
"200":
description: Input history for completed jobs
Expand Down
14 changes: 12 additions & 2 deletions backend/windmill-api/src/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use serde_json::Value;
use sqlx::{types::Uuid, FromRow};
use std::{
fmt::{Display, Formatter},
str::FromStr,
vec,
};
use windmill_common::{
db::UserDB,
error::JsonResult,
error::{self, JsonResult},
jobs::JobKind,
scripts::to_i64,
utils::{not_found_if_none, paginate, Pagination},
Expand Down Expand Up @@ -118,6 +119,7 @@ pub struct CompletedJobMini {
#[derive(Deserialize)]
struct GetInputHistory {
include_preview: Option<bool>,
since: Option<String>,
}

async fn get_input_history(
Expand All @@ -135,7 +137,8 @@ async fn get_input_history(
let sql = &format!(
"select id, v2_job.created_at, created_by, 'null'::jsonb as args, status = 'success' as success from v2_job JOIN v2_job_completed USING (id) \
where {} = $1 and kind = any($2) and v2_job.workspace_id = $3 AND v2_job_completed.status != 'skipped' \
order by v2_job.created_at desc limit $4 offset $5",
AND v2_job_completed.started_at >= $4 \
order by v2_job.created_at desc limit $5 offset $6",
r.runnable_type.column_name()
);

Expand All @@ -156,9 +159,16 @@ async fn get_input_history(
kind => vec![kind],
};

let since: DateTime<chrono::Local> = if let Some(date) = g.since {
DateTime::from_str(date.as_str()).map_err(|e| error::Error::BadRequest(format!("{e}")))?
} else {
DateTime::default()
};

let rows = query
.bind(job_kinds)
.bind(&w_id)
.bind(since)
.bind(per_page as i32)
.bind(offset as i32)
.fetch_all(&mut *tx)
Expand Down
35 changes: 30 additions & 5 deletions frontend/src/lib/components/HistoricList.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,40 @@
export let selected: string | undefined = undefined

let infiniteList: InfiniteList | undefined = undefined
let loadInputsPageFn: ((page: number, perPage: number) => Promise<any>) | undefined = undefined
let loadInputsPageFn:
| ((page: number, perPage: number, discovery: boolean) => Promise<any>)
| undefined = undefined

export function refresh() {
if (infiniteList) {
infiniteList.loadData('refresh')
}
}
let cachedArgs: Record<string, any> = {}
let items: any[] = []
let potentialItems: any[] = []
let perPageBind = 0
let lastChecked: string | undefined = undefined

let interval: NodeJS.Timeout | undefined = undefined
function initLoadInputs() {
interval && clearInterval(interval)
interval = setInterval(() => {
refresh()
}, 10000)
loadInputsPageFn = async (page: number, perPage: number) => {
const inputs = await InputService.getInputHistory({
loadInputsPageFn = async (page: number, perPage: number, discovery: boolean) => {
const request = InputService.getInputHistory({
workspace: $workspaceStore!,
runnableId,
runnableType,
page,
perPage,
includePreview: true
includePreview: true,
// If it is discovery, then we would like to fetch all values
since: !discovery ? lastChecked : undefined
})
if (!discovery) lastChecked = new Date().toJSON()
const inputs = await request

const inputsWithPayload = await Promise.all(
inputs.map(async (input) => {
Expand All @@ -58,7 +68,22 @@
}
})
)
return inputsWithPayload
if (!discovery) {
// Add new items to beginning
items.unshift(...inputsWithPayload)
// We need to know when to apply potential items,
// it only happens when InfiniteList decides to expand list
//
// We cannot apply potential items right after fetch,
// because that would trigger expansion in list
// and old items will be loading by 10 every reload
if (perPageBind != perPage) items.push(...potentialItems), (perPageBind = perPage)
return items
} else {
// Save discovered items to buffer and apply later
potentialItems = inputsWithPayload
return potentialItems
}
}
infiniteList?.setLoader(loadInputsPageFn)
}
Expand Down
8 changes: 4 additions & 4 deletions frontend/src/lib/components/InfiniteList.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
let hasAlreadyFailed = false
let hovered: any | undefined = undefined
let initLoad = false
let loadInputs: ((page: number, perPage: number) => Promise<any[]>) | undefined = undefined
let loadInputs: ((page: number, perPage: number, discovery: boolean) => Promise<any[]>) | undefined = undefined
let deleteItemFn: ((id: any) => Promise<any>) | undefined = undefined

export async function loadData(loadOption: 'refresh' | 'forceRefresh' | 'loadMore' = 'loadMore') {
Expand All @@ -33,7 +33,7 @@
}

try {
const newItems = await loadInputs(1, page * perPage)
const newItems = await loadInputs(1, page * perPage, false)

if (
loadOption === 'refresh' &&
Expand Down Expand Up @@ -63,7 +63,7 @@
page = Math.ceil(items.length / perPage)
hasMore = items.length === perPage * page
if (hasMore) {
const potentialNewItems = await loadInputs(page + 1, perPage)
const potentialNewItems = await loadInputs(page + 1, perPage, true)
hasMore = potentialNewItems.length > 0
}
initLoad = true
Expand Down Expand Up @@ -96,7 +96,7 @@
}
}

export async function setLoader(loader: (page: number, perPage: number) => Promise<any[]>) {
export async function setLoader(loader: (page: number, perPage: number, discovery: boolean) => Promise<any[]>) {
loadInputs = loader
loadData('forceRefresh')
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/components/SavedInputsPicker.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
}

function initLoadInputs() {
const loadInputsPageFn = async (page: number, perPage: number) => {
const loadInputsPageFn = async (page: number, perPage: number, _discovery: boolean) => {
const inputs = await InputService.listInputs({
workspace: $workspaceStore!,
runnableId,
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/components/triggers/CaptureTable.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
}

function initLoadCaptures(testKind: 'preprocessor' | 'main' = 'main') {
const loadInputsPageFn = async (page: number, perPage: number) => {
const loadInputsPageFn = async (page: number, perPage: number, _discovery: boolean) => {
const captures = await CaptureService.listCaptures({
workspace: $workspaceStore!,
runnableKind: isFlow ? 'flow' : 'script',
Expand Down