Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hang in bulk helper semaphore when server responses are slower than flushInterval #2027

Merged
merged 5 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ export default class Helpers {
let chunkBytes = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line

// @ts-expect-error datasoruce is an iterable
// @ts-expect-error datasource is an iterable
for await (const chunk of datasource) {
if (shouldAbort) break
timeoutRef.refresh()
Expand Down Expand Up @@ -656,15 +656,16 @@ export default class Helpers {

if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
const send = await semaphore()
send(bulkBodyCopy)
}
}

clearTimeout(timeoutRef)
// In some cases the previos http call does not have finished,
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (!shouldAbort && chunkBytes > 0) {
const send = await semaphore()
Expand Down Expand Up @@ -708,8 +709,8 @@ export default class Helpers {
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual bulk request.
// It also returns a finish function, which returns a promise that is resolved
Expand Down
149 changes: 149 additions & 0 deletions test/unit/helpers/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import { createReadStream } from 'fs'
import * as http from 'http'
import { join } from 'path'
import split from 'split2'
import { Readable } from 'stream'
import { test } from 'tap'
import { Client, errors } from '../../../'
import { buildServer, connection } from '../../utils'
const { sleep } = require('../../integration/helper')

let clientVersion: string = require('../../../package.json').version // eslint-disable-line
if (clientVersion.includes('-')) {
Expand Down Expand Up @@ -1594,3 +1596,150 @@ test('Flush interval', t => {

t.end()
})

test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
  // Flush every 500ms while the server takes twice as long (1000ms) to reply,
  // so a bulk request is still in flight when the next flush timer fires.
  const interval = 500

  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
    // Delay the bulk response well past the helper's flush interval.
    setTimeout(() => {
      res.writeHead(200, { 'content-type': 'application/json' })
      res.end(JSON.stringify({ errors: false, items: [{}] }))
    }, 1000)
  }

  const [{ port }, server] = await buildServer(handler)
  const client = new Client({ node: `http://localhost:${port}` })

  // Emit one document per flush interval so every flush is timer-driven
  // rather than byte-threshold-driven.
  async function * docs () {
    for (const doc of dataset.slice()) {
      await sleep(interval)
      yield doc
    }
  }

  const result = await client.helpers.bulk({
    datasource: Readable.from(docs()),
    flushBytes: 1,
    flushInterval: interval,
    concurrency: 1,
    onDocument (_) {
      return { index: { _index: 'test' } }
    },
    onDrop (_) {
      t.fail('This should never be called')
    }
  })

  // The helper must complete (not hang) and index every document exactly once.
  t.type(result.time, 'number')
  t.type(result.bytes, 'number')
  t.match(result, {
    total: 3,
    successful: 3,
    retry: 0,
    failed: 0,
    aborted: false
  })

  server.stop()
})

test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
  // Flush every 500ms while the server replies in half that time (250ms),
  // so each bulk request completes before the next flush timer fires.
  const interval = 500

  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
    // Respond faster than the helper's flush interval.
    setTimeout(() => {
      res.writeHead(200, { 'content-type': 'application/json' })
      res.end(JSON.stringify({ errors: false, items: [{}] }))
    }, 250)
  }

  const [{ port }, server] = await buildServer(handler)
  const client = new Client({ node: `http://localhost:${port}` })

  // Emit one document per flush interval so every flush is timer-driven
  // rather than byte-threshold-driven.
  async function * docs () {
    for (const doc of dataset.slice()) {
      await sleep(interval)
      yield doc
    }
  }

  const result = await client.helpers.bulk({
    datasource: Readable.from(docs()),
    flushBytes: 1,
    flushInterval: interval,
    concurrency: 1,
    onDocument (_) {
      return { index: { _index: 'test' } }
    },
    onDrop (_) {
      t.fail('This should never be called')
    }
  })

  // The helper must complete (not hang) and index every document exactly once.
  t.type(result.time, 'number')
  t.type(result.bytes, 'number')
  t.match(result, {
    total: 3,
    successful: 3,
    retry: 0,
    failed: 0,
    aborted: false
  })

  server.stop()
})

test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
  // Flush interval and server response delay are identical (500ms),
  // exercising the race between the flush timer and the in-flight request.
  const interval = 500

  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
    // Respond after exactly one flush interval.
    setTimeout(() => {
      res.writeHead(200, { 'content-type': 'application/json' })
      res.end(JSON.stringify({ errors: false, items: [{}] }))
    }, interval)
  }

  const [{ port }, server] = await buildServer(handler)
  const client = new Client({ node: `http://localhost:${port}` })

  // Emit one document per flush interval so every flush is timer-driven
  // rather than byte-threshold-driven.
  async function * docs () {
    for (const doc of dataset.slice()) {
      await sleep(interval)
      yield doc
    }
  }

  const result = await client.helpers.bulk({
    datasource: Readable.from(docs()),
    flushBytes: 1,
    flushInterval: interval,
    concurrency: 1,
    onDocument (_) {
      return { index: { _index: 'test' } }
    },
    onDrop (_) {
      t.fail('This should never be called')
    }
  })

  // The helper must complete (not hang) and index every document exactly once.
  t.type(result.time, 'number')
  t.type(result.bytes, 'number')
  t.match(result, {
    total: 3,
    successful: 3,
    retry: 0,
    failed: 0,
    aborted: false
  })

  server.stop()
})
Loading