-
Notifications
You must be signed in to change notification settings - Fork 118
/
Copy pathsweep_batch.go
2063 lines (1707 loc) · 58.2 KB
/
sweep_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package sweepbatcher
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr/musig2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
sweeppkg "github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
const (
// defaultFeeRateStep is the default value by which the batch tx's
// fee rate is increased when an rbf is attempted.
defaultFeeRateStep = chainfee.SatPerKWeight(100)
// batchConfHeight is the default confirmation height of the batch
// transaction.
batchConfHeight = 3
// maxFeeToSwapAmtRatio is the maximum fee to swap amount ratio that
// we allow for a batch transaction.
maxFeeToSwapAmtRatio = 0.2
// MaxSweepsPerBatch is the maximum number of sweeps in a single batch.
// It is needed to prevent sweep tx from becoming non-standard. Max
// standard transaction is 400k wu, a non-cooperative input is 393 wu.
MaxSweepsPerBatch = 1000
)
var (
ErrBatchShuttingDown = errors.New("batch shutting down")
)
// sweep stores any data related to sweeping a specific outpoint.
type sweep struct {
// swapHash is the hash of the swap that the sweep belongs to.
swapHash lntypes.Hash
// outpoint is the outpoint being swept.
outpoint wire.OutPoint
// value is the value of the outpoint being swept.
value btcutil.Amount
// confTarget is the confirmation target of the sweep.
confTarget int32
// timeout is the timeout of the swap that the sweep belongs to.
timeout int32
// initiationHeight is the height at which the swap was initiated.
initiationHeight int32
// htlc is the HTLC that is being swept.
htlc swap.Htlc
// preimage is the preimage of the HTLC that is being swept.
preimage lntypes.Preimage
// swapInvoicePaymentAddr is the payment address of the swap invoice.
swapInvoicePaymentAddr [32]byte
// htlcKeys is the set of keys used to sign the HTLC.
htlcKeys loopdb.HtlcKeys
// htlcSuccessEstimator is a function that estimates the weight of the
// HTLC success script.
htlcSuccessEstimator func(*input.TxWeightEstimator) error
// protocolVersion is the protocol version of the swap that the sweep
// belongs to.
protocolVersion loopdb.ProtocolVersion
// isExternalAddr is true if the sweep spends to a non-wallet address.
isExternalAddr bool
// destAddr is the destination address of the sweep.
destAddr btcutil.Address
// notifier is a collection of channels used to communicate the status
// of the sweep back to the swap that requested it.
notifier *SpendNotifier
// minFeeRate is minimum fee rate that must be used by a batch of
// the sweep. If it is specified, confTarget is ignored.
minFeeRate chainfee.SatPerKWeight
// nonCoopHint is set, if the sweep can not be spent cooperatively and
// has to be spent using preimage. This is only used in fee estimations
// when selecting a batch for the sweep to minimize fees.
nonCoopHint bool
// coopFailed is set, if we have tried to spend the sweep cooperatively,
// but it failed. We try to spend a sweep cooperatively only once. This
// status is not persisted in the DB.
coopFailed bool
// presigned is set, if the sweep should be handled in presigned mode.
presigned bool
}
// batchState is the state of the batch.
type batchState uint8
const (
// Open is the state in which the batch is able to accept new sweeps.
Open batchState = 0
// Closed is the state in which the batch is no longer able to accept
// new sweeps.
Closed batchState = 1
// Confirmed is the state in which the batch transaction has reached the
// configured conf height.
Confirmed batchState = 2
)
// batchConfig is the configuration for a batch.
type batchConfig struct {
// maxTimeoutDistance is the maximum timeout distance that 2 distinct
// sweeps can have in the same batch.
maxTimeoutDistance int32
// batchConfTarget is the confirmation target of the batch transaction.
batchConfTarget int32
// clock provides methods to work with time and timers.
clock clock.Clock
// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop).
initialDelay time.Duration
// batchPublishDelay is the delay between receiving a new block or
// initial delay completion and publishing the batch transaction.
batchPublishDelay time.Duration
// noBumping instructs sweepbatcher not to fee bump itself and rely on
// external source of fee rates (FeeRateProvider).
noBumping bool
// txLabeler is a function generating a transaction label. It is called
// before publishing a batch transaction. Batch ID is passed to it.
txLabeler func(batchID int32) string
// customMuSig2Signer is a custom signer. If it is set, it is used to
// create musig2 signatures instead of musig2SignSweep and signerClient.
// Note that musig2SignSweep must be nil in this case, however signer
// client must still be provided, as it is used for non-coop spendings.
customMuSig2Signer SignMuSig2
// presignedHelper provides methods used when presigned batches are
// enabled.
presignedHelper PresignedHelper
// chainParams are the chain parameters of the chain that is used by
// batches.
chainParams *chaincfg.Params
}
// rbfCache stores data related to our last fee bump.
type rbfCache struct {
// LastHeight is the last height at which we increased our feerate.
LastHeight int32
// FeeRate is the last used fee rate we used to publish a batch tx.
FeeRate chainfee.SatPerKWeight
// SkipNextBump instructs updateRbfRate to skip one fee bumping.
// It is set upon updating FeeRate externally.
SkipNextBump bool
}
// batch is a collection of sweeps that are published together.
type batch struct {
// id is the primary identifier of this batch.
id int32
// state is the current state of the batch.
state batchState
// primarySweepID is the swap hash of the primary sweep in the batch.
primarySweepID lntypes.Hash
// sweeps store the sweeps that this batch currently contains.
sweeps map[lntypes.Hash]sweep
// currentHeight is the current block height.
currentHeight int32
// spendChan is the channel over which spend notifications are received.
spendChan chan *chainntnfs.SpendDetail
// confChan is the channel over which confirmation notifications are
// received.
confChan chan *chainntnfs.TxConfirmation
// reorgChan is the channel over which reorg notifications are received.
reorgChan chan struct{}
// testReqs is a channel where test requests are received.
// This is used only in unit tests! The reason to have this is to
// avoid data races in require.Eventually calls running in parallel
// to the event loop. See method testRunInEventLoop().
testReqs chan *testRequest
// errChan is the channel over which errors are received.
errChan chan error
// batchTxid is the transaction that is currently being monitored for
// confirmations.
batchTxid *chainhash.Hash
// batchPkScript is the pkScript of the batch transaction's output.
batchPkScript []byte
// batchAddress is the address of the batch transaction's output.
batchAddress btcutil.Address
// rbfCache stores data related to the RBF fee bumping mechanism.
rbfCache rbfCache
// callEnter is used to sequentialize calls to the batch handler's
// main event loop.
callEnter chan struct{}
// callLeave is used to resume the execution flow of the batch handler's
// main event loop.
callLeave chan struct{}
// stopping signals that the batch is stopping.
stopping chan struct{}
// finished signals that the batch has stopped and all child goroutines
// have finished.
finished chan struct{}
// quit is owned by the parent batcher and signals that the batch must
// stop.
quit chan struct{}
// wallet is the wallet client used to create and publish the batch
// transaction.
wallet lndclient.WalletKitClient
// chainNotifier is the chain notifier client used to monitor the
// blockchain for spends and confirmations.
chainNotifier lndclient.ChainNotifierClient
// signerClient is the signer client used to sign the batch transaction.
signerClient lndclient.SignerClient
// muSig2SignSweep includes all the required functionality to collect
// and verify signatures by the swap server in order to cooperatively
// sweep funds.
muSig2SignSweep MuSig2SignSweep
// verifySchnorrSig is a function that verifies a schnorr signature.
verifySchnorrSig VerifySchnorrSig
// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
// purger is a function that can take a sweep which is being purged and
// hand it over to the batcher for further processing.
purger Purger
// store includes all the database interactions that are needed by the
// batch.
store BatcherStore
// cfg is the configuration for this batch.
cfg *batchConfig
// log_ is the logger for this batch.
log_ atomic.Pointer[btclog.Logger]
wg sync.WaitGroup
}
// Purger is a function that takes a sweep request and feeds it back to the
// batcher main entry point. The name is inspired by its purpose, which is to
// purge the batch from sweeps that didn't make it to the confirmed tx.
type Purger func(sweepReq *SweepRequest) error
// batchKit is a kit of dependencies that are used to initialize a batch. This
// struct is only used as a wrapper for the arguments that are required to
// create a new batch.
type batchKit struct {
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
publishErrorHandler PublishErrorHandler
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}
// scheduleNextCall schedules the next call to the batch handler's main event
// loop. It returns a function that must be called when the call is finished.
func (b *batch) scheduleNextCall() (func(), error) {
select {
case b.callEnter <- struct{}{}:
case <-b.quit:
return func() {}, ErrBatcherShuttingDown
case <-b.stopping:
return func() {}, ErrBatchShuttingDown
case <-b.finished:
return func() {}, ErrBatchShuttingDown
}
return func() {
b.callLeave <- struct{}{}
}, nil
}
// NewBatch creates a new batch.
func NewBatch(cfg batchConfig, bk batchKit) *batch {
return &batch{
// We set the ID to a negative value to flag that this batch has
// never been persisted, so it needs to be assigned a new ID.
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
}
}
// NewBatchFromDB creates a new batch that already existed in storage.
func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
// Make sure the batch is not empty.
if len(bk.sweeps) == 0 {
// This should never happen, as this precondition is already
// ensured in spinUpBatchFromDB.
return nil, fmt.Errorf("empty batch is not allowed")
}
// Assign batchConfTarget to primary sweep's confTarget.
for _, sweep := range bk.sweeps {
if sweep.swapHash == bk.primaryID {
cfg.batchConfTarget = sweep.confTarget
break
}
}
b := &batch{
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
}
b.setLog(bk.log)
return b, nil
}
// log returns current logger.
func (b *batch) log() btclog.Logger {
return *b.log_.Load()
}
// setLog atomically replaces the logger.
func (b *batch) setLog(logger btclog.Logger) {
b.log_.Store(&logger)
}
// Debugf logs a message with level DEBUG.
func (b *batch) Debugf(format string, params ...interface{}) {
b.log().Debugf(format, params...)
}
// Infof logs a message with level INFO.
func (b *batch) Infof(format string, params ...interface{}) {
b.log().Infof(format, params...)
}
// Warnf logs a message with level WARN.
func (b *batch) Warnf(format string, params ...interface{}) {
b.log().Warnf(format, params...)
}
// Errorf logs a message with level ERROR.
func (b *batch) Errorf(format string, params ...interface{}) {
b.log().Errorf(format, params...)
}
// addSweep tries to add a sweep to the batch. If this is the first sweep being
// added to the batch then it also sets the primary sweep ID. If presigned mode
// is enabled, the result depends on the outcome of presignedHelper.Presign for
// a non-empty batch. For an empty batch, the input needs to pass PresignSweep.
func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
done, err := b.scheduleNextCall()
defer done()
if err != nil {
return false, err
}
// If the provided sweep is nil, we can't proceed with any checks, so
// we just return early.
if sweep == nil {
b.Infof("the sweep is nil")
return false, nil
}
// Before we run through the acceptance checks, let's just see if this
// sweep is already in our batch. In that case, just update the sweep.
oldSweep, ok := b.sweeps[sweep.swapHash]
if ok {
// Preserve coopFailed value not to forget about cooperative
// spending failure in this sweep.
tmp := *sweep
tmp.coopFailed = oldSweep.coopFailed
// If the sweep was resumed from storage, and the swap requested
// to sweep again, a new sweep notifier will be created by the
// swap. By re-assigning to the batch's sweep we make sure that
// everything, including the notifier, is up to date.
b.sweeps[sweep.swapHash] = tmp
// If this is the primary sweep, we also need to update the
// batch's confirmation target and fee rate.
if b.primarySweepID == sweep.swapHash {
b.cfg.batchConfTarget = sweep.confTarget
b.rbfCache.SkipNextBump = true
}
// Update batch's fee rate to be greater than or equal to
// minFeeRate of the sweep. Make sure batch's fee rate does not
// decrease (otherwise it won't pass RBF rules and won't be
// broadcasted) and that it is not lower that minFeeRate of
// other sweeps (so it is applied).
if b.rbfCache.FeeRate < sweep.minFeeRate {
b.rbfCache.FeeRate = sweep.minFeeRate
}
return true, nil
}
// Enforce MaxSweepsPerBatch. If there are already too many sweeps in
// the batch, do not add another sweep to prevent the tx from becoming
// non-standard.
if len(b.sweeps) >= MaxSweepsPerBatch {
b.Infof("the batch has already too many sweeps %d >= %d",
len(b.sweeps), MaxSweepsPerBatch)
return false, nil
}
// Since all the actions of the batch happen sequentially, we could
// arrive here after the batch got closed because of a spend. In this
// case we cannot add the sweep to this batch.
if b.state != Open {
b.Infof("the batch state (%v) is not open", b.state)
return false, nil
}
// If this batch contains a single sweep that spends to a non-wallet
// address, or the incoming sweep is spending to non-wallet address,
// we cannot add this sweep to the batch.
for _, s := range b.sweeps {
if s.isExternalAddr {
b.Infof("the batch already has a sweep %x with "+
"an external address", s.swapHash[:6])
return false, nil
}
if sweep.isExternalAddr {
b.Infof("the batch is not empty and new sweep %x "+
"has an external address", sweep.swapHash[:6])
return false, nil
}
}
// Check the timeout of the incoming sweep against the timeout of all
// already contained sweeps. If that difference exceeds the configured
// maximum we cannot add this sweep.
for _, s := range b.sweeps {
timeoutDistance :=
int32(math.Abs(float64(sweep.timeout - s.timeout)))
if timeoutDistance > b.cfg.maxTimeoutDistance {
b.Infof("too long timeout distance between the "+
"batch and sweep %x: %d > %d",
sweep.swapHash[:6], timeoutDistance,
b.cfg.maxTimeoutDistance)
return false, nil
}
}
// If presigned mode is enabled, we should first presign the new version
// of batch transaction. Also ensure that all the sweeps in the batch
// use the same mode (presigned or regular).
if sweep.presigned {
// Ensure that all the sweeps in the batch use presigned mode.
for _, s := range b.sweeps {
if !s.presigned {
b.Infof("failed to add presigned sweep %x to "+
"the batch, because the batch has "+
"non-presigned sweep %x",
sweep.swapHash[:6], s.swapHash[:6])
return false, nil
}
}
if len(b.sweeps) != 0 {
if err := b.presign(ctx, sweep); err != nil {
b.Infof("failed to add sweep %x to the "+
"batch, because failed to presign new "+
"version of batch tx: %v",
sweep.swapHash[:6], err)
return false, nil
}
} else {
if err := b.ensurePresigned(ctx, sweep); err != nil {
return false, fmt.Errorf("failed to check "+
"signing of input %x, this means that "+
"batcher.PresignSweep was not called "+
"prior to AddSweep for this input: %w",
sweep.swapHash[:6], err)
}
}
} else {
// Ensure that all the sweeps in the batch don't use presigned.
for _, s := range b.sweeps {
if s.presigned {
b.Infof("failed to add a non-presigned sweep "+
"%x to the batch, because the batch "+
"has presigned sweep %x",
sweep.swapHash[:6], s.swapHash[:6])
return false, nil
}
}
}
// Past this point we know that a new incoming sweep passes the
// acceptance criteria and is now ready to be added to this batch.
// If this is the first sweep being added to the batch, make it the
// primary sweep.
if b.primarySweepID == lntypes.ZeroHash {
b.primarySweepID = sweep.swapHash
b.cfg.batchConfTarget = sweep.confTarget
b.rbfCache.FeeRate = sweep.minFeeRate
b.rbfCache.SkipNextBump = true
// We also need to start the spend monitor for this new primary
// sweep.
err := b.monitorSpend(ctx, *sweep)
if err != nil {
return false, err
}
}
// Add the sweep to the batch's sweeps.
b.Infof("adding sweep %x", sweep.swapHash[:6])
b.sweeps[sweep.swapHash] = *sweep
// Update FeeRate. Max(sweep.minFeeRate) for all the sweeps of
// the batch is the basis for fee bumps.
if b.rbfCache.FeeRate < sweep.minFeeRate {
b.rbfCache.FeeRate = sweep.minFeeRate
b.rbfCache.SkipNextBump = true
}
return true, b.persistSweep(ctx, *sweep, false)
}
// sweepExists returns true if the batch contains the sweep with the given hash.
func (b *batch) sweepExists(hash lntypes.Hash) bool {
done, err := b.scheduleNextCall()
defer done()
if err != nil {
return false
}
_, ok := b.sweeps[hash]
return ok
}
// Wait waits for the batch to gracefully stop.
func (b *batch) Wait() {
b.Infof("Stopping")
<-b.finished
}
// stillWaitingMsg is the format of the message printed if the batch is about
// to publish, but initial delay has not ended yet.
const stillWaitingMsg = "Skipping publishing, initial delay will end at " +
"%v, now is %v."
// Run is the batch's main event loop.
func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.stopping)
// Make sure not to call b.wg.Wait from any other place to avoid
// race condition between b.wg.Add(1) and b.wg.Wait().
b.wg.Wait()
close(b.finished)
}()
if b.muSig2SignSweep == nil && b.cfg.customMuSig2Signer == nil {
return fmt.Errorf("no musig2 signer available")
}
if b.muSig2SignSweep != nil && b.cfg.customMuSig2Signer != nil {
return fmt.Errorf("both musig2 signers provided")
}
// Cache clock variable.
clock := b.cfg.clock
blockChan, blockErrChan, err :=
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
if err != nil {
return err
}
// Set currentHeight here, because it may be needed in monitorSpend.
select {
case b.currentHeight = <-blockChan:
b.Debugf("initial height for the batch is %v", b.currentHeight)
case <-runCtx.Done():
return runCtx.Err()
}
// If a primary sweep exists we immediately start monitoring for its
// spend.
if b.primarySweepID != lntypes.ZeroHash {
sweep := b.sweeps[b.primarySweepID]
err := b.monitorSpend(runCtx, sweep)
if err != nil {
return err
}
}
// skipBefore is the time before which we skip batch publishing.
// This is needed to facilitate better grouping of sweeps.
// For batches loaded from DB initialDelay should be 0.
skipBefore := clock.Now().Add(b.cfg.initialDelay)
// initialDelayChan is a timer which fires upon initial delay end.
// If initialDelay is set to 0, it will not trigger to avoid setting up
// timerChan twice, which could lead to double publishing if
// batchPublishDelay is also 0.
var initialDelayChan <-chan time.Time
if b.cfg.initialDelay > 0 {
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
}
// We use a timer in order to not publish new transactions at the same
// time as the block epoch notification. This is done to prevent
// unnecessary transaction publishments when a spend is detected on that
// block. This timer starts after new block arrives (including the
// current tip which we read from blockChan above) or when initialDelay
// completes.
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)
b.Infof("started, primary %x, total sweeps %v",
b.primarySweepID[0:6], len(b.sweeps))
for {
select {
case <-b.callEnter:
<-b.callLeave
// blockChan provides immediately the current tip.
case height := <-blockChan:
b.Debugf("received block %v", height)
// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
b.currentHeight = height
case <-initialDelayChan:
b.Debugf("initial delay of duration %v has ended",
b.cfg.initialDelay)
// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
case <-timerChan:
// Check that batch is still open.
if b.state != Open {
b.Debugf("Skipping publishing, because "+
"the batch is not open (%v).", b.state)
continue
}
// If the batch became urgent, skipBefore is set to now.
if b.isUrgent(skipBefore) {
skipBefore = clock.Now()
}
// Check that the initial delay has ended. We have also
// batchPublishDelay on top of initialDelay, so if
// initialDelayChan has just fired, this check passes.
now := clock.Now()
if skipBefore.After(now) {
b.Debugf(stillWaitingMsg, skipBefore, now)
continue
}
err := b.publish(ctx)
if err != nil {
return err
}
case spend := <-b.spendChan:
err := b.handleSpend(runCtx, spend.SpendingTx)
if err != nil {
return err
}
case <-b.confChan:
return b.handleConf(runCtx)
case <-b.reorgChan:
b.state = Open
b.Warnf("reorg detected, batch is able to " +
"accept new sweeps")
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
if err != nil {
return err
}
case testReq := <-b.testReqs:
testReq.handler()
close(testReq.quit)
case err := <-blockErrChan:
return err
case err := <-b.errChan:
return err
case <-runCtx.Done():
return runCtx.Err()
}
}
}
// testRunInEventLoop runs a function in the event loop blocking until
// the function returns. For unit tests only!
func (b *batch) testRunInEventLoop(ctx context.Context, handler func()) {
// If the event loop is finished, run the function.
select {
case <-b.stopping:
handler()
return
default:
}
quit := make(chan struct{})
req := &testRequest{
handler: handler,
quit: quit,
}
select {
case b.testReqs <- req:
case <-ctx.Done():
return
}
select {
case <-quit:
case <-ctx.Done():
}
}
// timeout returns minimum timeout as block height among sweeps of the batch.
// If the batch is empty, return -1.
func (b *batch) timeout() int32 {
// Find minimum among sweeps' timeouts.
minTimeout := int32(-1)
for _, sweep := range b.sweeps {
if minTimeout == -1 || minTimeout > sweep.timeout {
minTimeout = sweep.timeout
}
}
return minTimeout
}
// isUrgent checks if the batch became urgent. This is determined by comparing
// the remaining number of blocks until timeout to the initial delay remained,
// given one block is 10 minutes.
func (b *batch) isUrgent(skipBefore time.Time) bool {
timeout := b.timeout()
if timeout <= 0 {
// This may happen if the batch is empty or if SweepInfo.Timeout
// is not set, may be possible in tests or if there is a bug.
b.Warnf("Method timeout() returned %v. Number of "+
"sweeps: %d. It may be an empty batch.",
timeout, len(b.sweeps))
return false
}
if b.currentHeight == 0 {
// currentHeight is not initiated yet.
return false
}
blocksToTimeout := timeout - b.currentHeight
const blockTime = 10 * time.Minute
timeBank := time.Duration(blocksToTimeout) * blockTime
// We want to have at least 2x as much time to be safe.
const safetyFactor = 2
remainingWaiting := skipBefore.Sub(b.cfg.clock.Now())
if timeBank >= safetyFactor*remainingWaiting {
// There is enough time, keep waiting.
return false
}
b.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
"remainingWaiting is %v)", timeBank, remainingWaiting)
// Signal to the caller to cancel initialDelay.
return true
}
// isPresigned returns if the batch uses presigned mode. Currently presigned and
// non-presigned sweeps never appear in the same batch. Fails if the batch is
// empty or contains both presigned and regular sweeps.
func (b *batch) isPresigned() (bool, error) {
var (
hasPresigned bool
hasRegular bool
)
for _, sweep := range b.sweeps {
if sweep.presigned {
hasPresigned = true
} else {
hasRegular = true
}
}
switch {
case hasPresigned && !hasRegular:
return true, nil
case !hasPresigned && hasRegular:
return false, nil
case hasPresigned && hasRegular:
return false, fmt.Errorf("the batch has both presigned and " +
"non-presigned sweeps")
default:
return false, fmt.Errorf("the batch is empty")
}
}
// publish creates and publishes the latest batch transaction to the network.
func (b *batch) publish(ctx context.Context) error {
var (
err error
fee btcutil.Amount
signSuccess bool
)
if len(b.sweeps) == 0 {
b.Debugf("skipping publish: no sweeps in the batch")
return nil
}
// Run the RBF rate update.
err = b.updateRbfRate(ctx)
if err != nil {
return err
}
// logPublishError is a function which logs publish errors.
logPublishError := func(errMsg string, err error) {
b.publishErrorHandler(err, errMsg, b.log())
}
// Determine if we should use presigned mode for the batch.
presigned, err := b.isPresigned()
if err != nil {
return fmt.Errorf("failed to determine if the batch %d uses "+
"presigned mode: %w", b.id, err)
}
if presigned {
fee, err, signSuccess = b.publishPresigned(ctx)
} else {
fee, err, signSuccess = b.publishMixedBatch(ctx)
}
if err != nil {
if signSuccess {