Skip to content

Commit df2db80

Browse files
authored
Merge pull request #634 from GeorgeTsagk/sweep-batcher
Loop Out Sweep Batcher
2 parents e9d374a + 6f75a11 commit df2db80

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+6013
-1812
lines changed

client.go

+30-18
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/lightninglabs/loop/loopdb"
1818
"github.com/lightninglabs/loop/swap"
1919
"github.com/lightninglabs/loop/sweep"
20+
"github.com/lightninglabs/loop/sweepbatcher"
21+
"github.com/lightninglabs/loop/utils"
2022
"github.com/lightningnetwork/lnd/lntypes"
2123
"github.com/lightningnetwork/lnd/routing/route"
2224
"google.golang.org/grpc"
@@ -60,7 +62,7 @@ var (
6062
// probeTimeout is the maximum time until a probe is allowed to take.
6163
probeTimeout = 3 * time.Minute
6264

63-
republishDelay = 10 * time.Second
65+
repushDelay = 1 * time.Second
6466

6567
// MinerFeeEstimationFailed is a magic number that is returned in a
6668
// quote call as the miner fee if the fee estimation in lnd's wallet
@@ -133,7 +135,8 @@ type ClientConfig struct {
133135

134136
// NewClient returns a new instance to initiate swaps with.
135137
func NewClient(dbDir string, loopDB loopdb.SwapStore,
136-
cfg *ClientConfig) (*Client, func(), error) {
138+
sweeperDb sweepbatcher.BatcherStore, cfg *ClientConfig) (
139+
*Client, func(), error) {
137140

138141
lsatStore, err := lsat.NewFileStore(dbDir)
139142
if err != nil {
@@ -161,27 +164,36 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
161164
Lnd: cfg.Lnd,
162165
}
163166

167+
verifySchnorrSig := func(pubKey *btcec.PublicKey, hash, sig []byte) error {
168+
schnorrSig, err := schnorr.ParseSignature(sig)
169+
if err != nil {
170+
return err
171+
}
172+
173+
if !schnorrSig.Verify(hash, pubKey) {
174+
return fmt.Errorf("invalid signature")
175+
}
176+
177+
return nil
178+
}
179+
180+
batcher := sweepbatcher.NewBatcher(
181+
cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer,
182+
swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig,
183+
cfg.Lnd.ChainParams, sweeperDb, loopDB,
184+
)
185+
164186
executor := newExecutor(&executorConfig{
165187
lnd: cfg.Lnd,
166188
store: loopDB,
167189
sweeper: sweeper,
190+
batcher: batcher,
168191
createExpiryTimer: config.CreateExpiryTimer,
169192
loopOutMaxParts: cfg.LoopOutMaxParts,
170193
totalPaymentTimeout: cfg.TotalPaymentTimeout,
171194
maxPaymentRetries: cfg.MaxPaymentRetries,
172195
cancelSwap: swapServerClient.CancelLoopOutSwap,
173-
verifySchnorrSig: func(pubKey *btcec.PublicKey, hash, sig []byte) error {
174-
schnorrSig, err := schnorr.ParseSignature(sig)
175-
if err != nil {
176-
return err
177-
}
178-
179-
if !schnorrSig.Verify(hash, pubKey) {
180-
return fmt.Errorf("invalid signature")
181-
}
182-
183-
return nil
184-
},
196+
verifySchnorrSig: verifySchnorrSig,
185197
})
186198

187199
client := &Client{
@@ -232,7 +244,7 @@ func (s *Client) FetchSwaps(ctx context.Context) ([]*SwapInfo, error) {
232244
LastUpdate: swp.LastUpdateTime(),
233245
}
234246

235-
htlc, err := GetHtlc(
247+
htlc, err := utils.GetHtlc(
236248
swp.Hash, &swp.Contract.SwapContract,
237249
s.lndServices.ChainParams,
238250
)
@@ -265,7 +277,7 @@ func (s *Client) FetchSwaps(ctx context.Context) ([]*SwapInfo, error) {
265277
LastUpdate: swp.LastUpdateTime(),
266278
}
267279

268-
htlc, err := GetHtlc(
280+
htlc, err := utils.GetHtlc(
269281
swp.Hash, &swp.Contract.SwapContract,
270282
s.lndServices.ChainParams,
271283
)
@@ -540,7 +552,7 @@ func (s *Client) getLoopOutSweepFee(ctx context.Context, confTarget int32) (
540552
return 0, err
541553
}
542554

543-
scriptVersion := GetHtlcScriptVersion(
555+
scriptVersion := utils.GetHtlcScriptVersion(
544556
loopdb.CurrentProtocolVersion(),
545557
)
546558

@@ -731,7 +743,7 @@ func (s *Client) estimateFee(ctx context.Context, amt btcutil.Amount,
731743
// Generate a dummy address for fee estimation.
732744
witnessProg := [32]byte{}
733745

734-
scriptVersion := GetHtlcScriptVersion(
746+
scriptVersion := utils.GetHtlcScriptVersion(
735747
loopdb.CurrentProtocolVersion(),
736748
)
737749

client_test.go

+21-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/lightninglabs/loop/loopdb"
1414
"github.com/lightninglabs/loop/swap"
1515
"github.com/lightninglabs/loop/test"
16+
"github.com/lightninglabs/loop/utils"
1617
"github.com/lightningnetwork/lnd/lnrpc"
1718
"github.com/lightningnetwork/lnd/lntypes"
1819
"github.com/stretchr/testify/require"
@@ -146,8 +147,6 @@ func TestLoopOutFailWrongAmount(t *testing.T) {
146147
// TestLoopOutResume tests that swaps in various states are properly resumed
147148
// after a restart.
148149
func TestLoopOutResume(t *testing.T) {
149-
defer test.Guard(t)()
150-
151150
defaultConfs := loopdb.DefaultLoopOutHtlcConfirmations
152151

153152
storedVersion := []loopdb.ProtocolVersion{
@@ -279,7 +278,7 @@ func testLoopOutResume(t *testing.T, confs uint32, expired, preimageRevealed,
279278
preimageRevealed, int32(confs),
280279
)
281280

282-
htlc, err := GetHtlc(
281+
htlc, err := utils.GetHtlc(
283282
hash, &pendingSwap.Contract.SwapContract,
284283
&chaincfg.TestNet3Params,
285284
)
@@ -304,7 +303,7 @@ func testLoopOutResume(t *testing.T, confs uint32, expired, preimageRevealed,
304303
func(r error) {},
305304
func(r error) {},
306305
preimageRevealed,
307-
confIntent, GetHtlcScriptVersion(protocolVersion),
306+
confIntent, utils.GetHtlcScriptVersion(protocolVersion),
308307
)
309308
}
310309

@@ -317,15 +316,28 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,
317316

318317
signalPrepaymentResult(nil)
319318

320-
ctx.AssertRegisterSpendNtfn(confIntent.PkScript)
321-
322319
// Assert that a call to track payment was sent, and respond with status
323320
// in flight so that our swap will push its preimage to the server.
324321
ctx.trackPayment(lnrpc.Payment_IN_FLIGHT)
325322

323+
// We need to notify the height, as the loopout is going to attempt a
324+
// sweep when a new block is received.
325+
err := ctx.Lnd.NotifyHeight(ctx.Lnd.Height + 1)
326+
require.NoError(ctx.Context.T, err)
327+
326328
// Publish tick.
327329
ctx.expiryChan <- testTime
328330

331+
// One spend notifier is registered by batch to watch primary sweep.
332+
ctx.AssertRegisterSpendNtfn(confIntent.PkScript)
333+
334+
ctx.AssertEpochListeners(2)
335+
336+
// Mock the blockheight again as that's when the batch will broadcast
337+
// the tx.
338+
err = ctx.Lnd.NotifyHeight(ctx.Lnd.Height + 1)
339+
require.NoError(ctx.Context.T, err)
340+
329341
// Expect a signing request in the non taproot case.
330342
if scriptVersion != swap.HtlcV3 {
331343
<-ctx.Context.Lnd.SignOutputRawChannel
@@ -340,14 +352,7 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,
340352
// preimage before sweeping in order for the server to trust us with
341353
// our MuSig2 signing attempts.
342354
if scriptVersion == swap.HtlcV3 {
343-
ctx.assertPreimagePush(ctx.store.loopOutSwaps[hash].Preimage)
344-
345-
// Try MuSig2 signing first and fail it so that we go for a
346-
// normal sweep.
347-
for i := 0; i < maxMusigSweepRetries; i++ {
348-
ctx.expiryChan <- testTime
349-
ctx.assertPreimagePush(ctx.store.loopOutSwaps[hash].Preimage)
350-
}
355+
ctx.assertPreimagePush(ctx.store.LoopOutSwaps[hash].Preimage)
351356
<-ctx.Context.Lnd.SignOutputRawChannel
352357
}
353358

@@ -388,6 +393,8 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,
388393

389394
ctx.NotifySpend(sweepTx, 0)
390395

396+
ctx.AssertRegisterConf(true, 3)
397+
391398
ctx.assertStatus(loopdb.StateSuccess)
392399

393400
ctx.assertStoreFinished(loopdb.StateSuccess)

cmd/loop/loopout.go

+1
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func loopOut(ctx *cli.Context) error {
238238
resp, err := client.LoopOut(context.Background(), &looprpc.LoopOutRequest{
239239
Amt: int64(amt),
240240
Dest: destAddr,
241+
IsExternalAddr: destAddr != "",
241242
Account: account,
242243
AccountAddrType: accountAddrType,
243244
MaxMinerFee: int64(limits.maxMinerFee),

executor.go

+23
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/lightninglabs/lndclient"
1414
"github.com/lightninglabs/loop/loopdb"
1515
"github.com/lightninglabs/loop/sweep"
16+
"github.com/lightninglabs/loop/sweepbatcher"
1617
"github.com/lightningnetwork/lnd/lntypes"
1718
"github.com/lightningnetwork/lnd/queue"
1819
)
@@ -23,6 +24,8 @@ type executorConfig struct {
2324

2425
sweeper *sweep.Sweeper
2526

27+
batcher *sweepbatcher.Batcher
28+
2629
store loopdb.SwapStore
2730

2831
createExpiryTimer func(expiry time.Duration) <-chan time.Time
@@ -71,6 +74,7 @@ func (s *executor) run(mainCtx context.Context,
7174
err error
7275
blockEpochChan <-chan int32
7376
blockErrorChan <-chan error
77+
batcherErrChan chan error
7478
)
7579

7680
for {
@@ -121,6 +125,21 @@ func (s *executor) run(mainCtx context.Context,
121125
return mainCtx.Err()
122126
}
123127

128+
batcherErrChan = make(chan error, 1)
129+
130+
s.wg.Add(1)
131+
go func() {
132+
defer s.wg.Done()
133+
134+
err := s.batcher.Run(mainCtx)
135+
if err != nil {
136+
select {
137+
case batcherErrChan <- err:
138+
case <-mainCtx.Done():
139+
}
140+
}
141+
}()
142+
124143
// Start main event loop.
125144
log.Infof("Starting event loop at height %v", height)
126145

@@ -156,6 +175,7 @@ func (s *executor) run(mainCtx context.Context,
156175
err := newSwap.execute(mainCtx, &executeConfig{
157176
statusChan: statusChan,
158177
sweeper: s.sweeper,
178+
batcher: s.batcher,
159179
blockEpochChan: queue.ChanOut(),
160180
timerFactory: s.executorConfig.createExpiryTimer,
161181
loopOutMaxParts: s.executorConfig.loopOutMaxParts,
@@ -211,6 +231,9 @@ func (s *executor) run(mainCtx context.Context,
211231
case err := <-blockErrorChan:
212232
return fmt.Errorf("block error: %v", err)
213233

234+
case err := <-batcherErrChan:
235+
return fmt.Errorf("batcher error: %v", err)
236+
214237
case <-mainCtx.Done():
215238
return mainCtx.Err()
216239
}

interface.go

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ type OutRequest struct {
2020
// Destination address for the swap.
2121
DestAddr btcutil.Address
2222

23+
// IsExternalAddr indicates whether the provided destination address
24+
// does not belong to the underlying wallet. This helps indicate
25+
// whether the sweep of this swap can be batched or not.
26+
IsExternalAddr bool
27+
2328
// MaxSwapRoutingFee is the maximum off-chain fee in msat that may be
2429
// paid for payment to the server. This limit is applied during path
2530
// finding. Typically this value is taken from the response of the

labels/lnd_labels.go

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
// loopInTimeout is the label used for loop in swaps to sweep an HTLC
1818
// that has timed out.
1919
loopInSweepTimeout = "InSweepTimeout"
20+
21+
loopOutBatchSweepSuccess = "BatchOutSweepSuccess -- %d"
2022
)
2123

2224
// LoopOutSweepSuccess returns the label used for loop out swaps to sweep the
@@ -25,6 +27,11 @@ func LoopOutSweepSuccess(swapHash string) string {
2527
return fmt.Sprintf(loopdLabelPattern, loopOutSweepSuccess, swapHash)
2628
}
2729

30+
// LoopOutBatchSweepSuccess returns the label used for loop out sweep batcher.
31+
func LoopOutBatchSweepSuccess(batchID int32) string {
32+
return fmt.Sprintf(loopOutBatchSweepSuccess, batchID)
33+
}
34+
2835
// LoopInHtlcLabel returns the label used for loop in swaps to publish an HTLC.
2936
func LoopInHtlcLabel(swapHash string) string {
3037
return fmt.Sprintf(loopdLabelPattern, loopInHtlc, swapHash)

liquidity/autoloop_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ func TestAutoloopAddress(t *testing.T) {
422422
Amount: amt,
423423
// Define the expected destination address.
424424
DestAddr: addr,
425+
IsExternalAddr: true,
425426
MaxSwapRoutingFee: maxRouteFee,
426427
MaxPrepayRoutingFee: ppmToSat(
427428
quote1.PrepayAmount, prepayFeePPM,
@@ -439,6 +440,7 @@ func TestAutoloopAddress(t *testing.T) {
439440
Amount: amt,
440441
// Define the expected destination address.
441442
DestAddr: addr,
443+
IsExternalAddr: true,
442444
MaxSwapRoutingFee: maxRouteFee,
443445
MaxPrepayRoutingFee: ppmToSat(
444446
quote2.PrepayAmount, routeFeePPM,

liquidity/liquidity.go

+7
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,13 @@ func (m *Manager) autoloop(ctx context.Context) error {
450450
// Create a copy of our range var so that we can reference it.
451451
swap := swap
452452

453+
// Check if the parameter for custom address is defined for loop
454+
// outs.
455+
if m.params.DestAddr != nil {
456+
swap.DestAddr = m.params.DestAddr
457+
swap.IsExternalAddr = true
458+
}
459+
453460
go m.dispatchStickyLoopOut(
454461
ctx, swap, defaultAmountBackoffRetry,
455462
defaultAmountBackoff,

liquidity/loopout_builder.go

+3
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func (b *loopOutBuilder) buildSwap(ctx context.Context, pubkey route.Vertex,
138138
// already validated them.
139139
request := loop.OutRequest{
140140
Amount: amount,
141+
IsExternalAddr: false,
141142
OutgoingChanSet: chanSet,
142143
MaxPrepayRoutingFee: prepayMaxFee,
143144
MaxSwapRoutingFee: routeMaxFee,
@@ -160,9 +161,11 @@ func (b *loopOutBuilder) buildSwap(ctx context.Context, pubkey route.Vertex,
160161
if len(params.Account) > 0 {
161162
account = params.Account
162163
addrType = params.AccountAddrType
164+
request.IsExternalAddr = true
163165
}
164166
if params.DestAddr != nil {
165167
request.DestAddr = params.DestAddr
168+
request.IsExternalAddr = true
166169
} else {
167170
addr, err := b.cfg.Lnd.WalletKit.NextAddr(
168171
ctx, account, addrType, false,

loopd/daemon.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/lightninglabs/loop"
1919
"github.com/lightninglabs/loop/loopd/perms"
2020
"github.com/lightninglabs/loop/loopdb"
21+
"github.com/lightninglabs/loop/sweepbatcher"
2122

2223
"github.com/lightninglabs/loop/instantout/reservation"
2324
loop_looprpc "github.com/lightninglabs/loop/looprpc"
@@ -412,9 +413,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
412413
return err
413414
}
414415

416+
sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)
417+
415418
// Create an instance of the loop client library.
416419
swapClient, clientCleanup, err := getClient(
417-
d.cfg, swapDb, &d.lnd.LndServices,
420+
d.cfg, swapDb, sweeperDb, &d.lnd.LndServices,
418421
)
419422
if err != nil {
420423
return err

loopd/log.go

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/lightninglabs/loop/instantout/reservation"
1010
"github.com/lightninglabs/loop/liquidity"
1111
"github.com/lightninglabs/loop/loopdb"
12+
"github.com/lightninglabs/loop/sweepbatcher"
1213
"github.com/lightningnetwork/lnd"
1314
"github.com/lightningnetwork/lnd/build"
1415
"github.com/lightningnetwork/lnd/signal"
@@ -32,6 +33,7 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {
3233

3334
lnd.SetSubLogger(root, Subsystem, log)
3435
lnd.AddSubLogger(root, "LOOP", intercept, loop.UseLogger)
36+
lnd.AddSubLogger(root, "SWEEP", intercept, sweepbatcher.UseLogger)
3537
lnd.AddSubLogger(root, "LNDC", intercept, lndclient.UseLogger)
3638
lnd.AddSubLogger(root, "STORE", intercept, loopdb.UseLogger)
3739
lnd.AddSubLogger(root, lsat.Subsystem, intercept, lsat.UseLogger)

0 commit comments

Comments
 (0)