1
1
import { Readable , Transform } from 'node:stream'
2
2
import { promisify } from 'node:util'
3
+ import { DBPatch } from '@naturalcycles/db-lib'
3
4
import {
4
5
BaseCommonDB ,
5
6
CommonDB ,
@@ -15,6 +16,7 @@ import {
15
16
_mapKeys ,
16
17
_mapValues ,
17
18
_Memo ,
19
+ _omit ,
18
20
AnyObjectWithId ,
19
21
CommonLogger ,
20
22
commonLoggerPrefix ,
@@ -34,7 +36,7 @@ import {
34
36
TypeCast ,
35
37
} from 'mysql'
36
38
import * as mysql from 'mysql'
37
- import { dbQueryToSQLDelete , dbQueryToSQLSelect , insertSQL } from './query.util'
39
+ import { dbQueryToSQLDelete , dbQueryToSQLSelect , dbQueryToSQLUpdate , insertSQL } from './query.util'
38
40
import {
39
41
jsonSchemaToMySQLDDL ,
40
42
mapNameFromMySQL ,
@@ -306,7 +308,7 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
306
308
307
309
const verb = opt . saveMethod === 'insert' ? 'INSERT' : 'REPLACE'
308
310
309
- if ( opt . assignGeneratedIds ) {
311
+ if ( opt . assignGeneratedIds && opt . saveMethod !== 'update' ) {
310
312
// Insert rows one-by-one, to get their auto-generated id
311
313
312
314
let i = - 1
@@ -323,10 +325,18 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
323
325
}
324
326
325
327
// inserts are split into multiple sentenses to respect the max_packet_size (1Mb usually)
326
- const sqls = insertSQL ( table , rows , verb , this . cfg . logger )
328
+ if ( opt . saveMethod === 'update' ) {
329
+ for await ( const row of rows ) {
330
+ _assert ( row . id , 'Cannot update without providing an id' )
331
+ const query = new DBQuery ( table ) . filterEq ( 'id' , row . id )
332
+ await this . updateByQuery ( query , _omit ( row , [ 'id' ] ) )
333
+ }
334
+ } else {
335
+ const sqls = insertSQL ( table , rows , verb , this . cfg . logger )
327
336
328
- for await ( const sql of sqls ) {
329
- await this . runSQL ( { sql } )
337
+ for await ( const sql of sqls ) {
338
+ await this . runSQL ( { sql } )
339
+ }
330
340
}
331
341
}
332
342
@@ -393,4 +403,15 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
393
403
394
404
return mysqlTableStatsToJsonSchemaField < ROW > ( table , stats , this . cfg . logger )
395
405
}
406
+
407
+ override async updateByQuery < ROW extends ObjectWithId > (
408
+ q : DBQuery < ROW > ,
409
+ patch : DBPatch < ROW > ,
410
+ ) : Promise < number > {
411
+ const sql = dbQueryToSQLUpdate ( q , patch )
412
+ if ( ! sql ) return 0
413
+
414
+ const { affectedRows } = await this . runSQL < OkPacket > ( { sql } )
415
+ return affectedRows
416
+ }
396
417
}
0 commit comments