Skip to content

Commit

Permalink
Backport missing patch from elastic/elasticsearch-js#2027
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMock committed Apr 3, 2024
1 parent 1caa7f4 commit 6fca2d3
Show file tree
Hide file tree
Showing 2 changed files with 373 additions and 14 deletions.
30 changes: 16 additions & 14 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ export interface BulkStats {
aborted: boolean
}

interface IndexAction {
interface IndexActionOperation {
index: T.BulkIndexOperation
}

interface CreateAction {
interface CreateActionOperation {
create: T.BulkCreateOperation
}

Expand All @@ -90,7 +90,9 @@ interface DeleteAction {
delete: T.BulkDeleteOperation
}

type UpdateAction = [UpdateActionOperation, Record<string, any>]
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction

export interface OnDropDocument<TDocument = unknown> {
Expand Down Expand Up @@ -646,22 +648,21 @@ export default class Helpers {
for await (const chunk of datasource) {
if (shouldAbort) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
const result = onDocument(chunk)
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
const operation = Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
payloadBody = typeof payload === 'string'
? payload
: serializer.serialize(payload)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
// @ts-expect-error in case of update action is an array
actionBody = serializer.serialize(action[0])
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
// @ts-expect-error in case of update action is an array
: serializer.serialize({ doc: chunk, ...action[1] })
: serializer.serialize({ doc: chunk, ...payload })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
Expand All @@ -675,10 +676,11 @@ export default class Helpers {

if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
const send = await semaphore()
send(bulkBodyCopy)
}
}

Expand Down
Loading

0 comments on commit 6fca2d3

Please sign in to comment.