diff --git a/deployment/modules/gcp/storage/main.tf b/deployment/modules/gcp/storage/main.tf index a36edd4d..b24b29d2 100644 --- a/deployment/modules/gcp/storage/main.tf +++ b/deployment/modules/gcp/storage/main.tf @@ -60,6 +60,6 @@ resource "google_spanner_database" "dedup_db" { instance = google_spanner_instance.log_spanner.name name = "${var.base_name}-dedup-db" ddl = [ - "CREATE TABLE IDSeq (id INT64 NOT NULL, h BYTES(MAX) NOT NULL, idx INT64 NOT NULL,) PRIMARY KEY (id, h)", + "CREATE TABLE IDSeq (id INT64 NOT NULL, h BYTES(MAX) NOT NULL, idx INT64 NOT NULL, timestamp INT64 NOT NULL,) PRIMARY KEY (id, h)", ] } diff --git a/handlers.go b/handlers.go index 4a2e327e..7375c047 100644 --- a/handlers.go +++ b/handlers.go @@ -34,6 +34,7 @@ import ( "github.com/google/certificate-transparency-go/x509" "github.com/google/certificate-transparency-go/x509util" "github.com/google/trillian/monitoring" + "github.com/transparency-dev/static-ct/modules/dedup" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/ctonly" "k8s.io/klog/v2" @@ -319,13 +320,15 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r } klog.V(2).Infof("%s: %s => storage.GetCertIndex", li.LogOrigin, method) - idx, isDup, err := li.storage.GetCertIndex(ctx, chain[0]) + sctDedupInfo, isDup, err := li.storage.GetCertDedupInfo(ctx, chain[0]) + idx := sctDedupInfo.Idx if err != nil { return http.StatusInternalServerError, fmt.Errorf("couldn't deduplicate the request: %s", err) } if isDup { klog.V(3).Infof("%s: %s - found duplicate entry at index %d", li.LogOrigin, method, idx) + entry.Timestamp = sctDedupInfo.Timestamp } else { if err := li.storage.AddIssuerChain(ctx, chain[1:]); err != nil { return http.StatusInternalServerError, fmt.Errorf("failed to store issuer chain: %s", err) @@ -344,7 +347,7 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r // It might be stored again later, if a local deduplication storage is synced, potentially // with a smaller value. klog.V(2).Infof("%s: %s => storage.AddCertIndex", li.LogOrigin, method) - err := li.storage.AddCertIndex(ctx, chain[0], idx) + err = li.storage.AddCertDedupInfo(ctx, chain[0], dedup.SCTDedupInfo{Idx: idx, Timestamp: entry.Timestamp}) // TODO: block log writes if deduplication breaks if err != nil { klog.Warningf("AddCertIndex(): failed to store certificate index: %v", err) diff --git a/handlers_test.go b/handlers_test.go index 5212ba85..8e3c04e9 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -36,6 +36,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/trillian/monitoring" "github.com/transparency-dev/static-ct/mockstorage" + "github.com/transparency-dev/static-ct/modules/dedup" "github.com/transparency-dev/static-ct/testdata" "github.com/transparency-dev/trillian-tessera/ctonly" "google.golang.org/grpc/codes" @@ -291,10 +292,10 @@ func TestAddChainWhitespace(t *testing.T) { for _, test := range tests { t.Run(test.descr, func(t *testing.T) { if test.want == http.StatusOK { - info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil) + info.storage.EXPECT().GetCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}, false, nil) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, nil }) - info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil) + info.storage.EXPECT().AddCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}, dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}).Return(nil) } recorder := httptest.NewRecorder() @@ -367,11 +368,11 @@ func TestAddChain(t *testing.T) { if len(test.toSign) > 0 { req, leafChain := parseChain(t, false, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) - info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil) + info.storage.EXPECT().GetCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}, false, nil) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) if test.want == http.StatusOK { - info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil) + info.storage.EXPECT().AddCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}, dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}).Return(nil) } } @@ -460,11 +461,11 @@ func TestAddPrechain(t *testing.T) { if len(test.toSign) > 0 { req, leafChain := parseChain(t, true, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) - info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil) + info.storage.EXPECT().GetCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}, false, nil) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) if test.want == http.StatusOK { - info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil) + info.storage.EXPECT().AddCertDedupInfo(deadlineMatcher(), cmpMatcher{leafChain[0]}, dedup.SCTDedupInfo{Idx: uint64(0), Timestamp: fakeTimeMillis}).Return(nil) } } @@ -484,7 +485,7 @@ func TestAddPrechain(t *testing.T) { if got, want := resp.ID, demoLogID[:]; !bytes.Equal(got, want) { t.Errorf("resp.ID=%x; want %x", got, want) } - if got, want := resp.Timestamp, uint64(1469185273000); got != want { + if got, want := resp.Timestamp, fakeTimeMillis; got != want { t.Errorf("resp.Timestamp=%d; want %d", got, want) } if got, want := hex.EncodeToString(resp.Signature), "040300067369676e6564"; got != want { diff --git a/mockstorage/mock_ct_storage.go b/mockstorage/mock_ct_storage.go index 7897fb66..14b0ad1d 100644 --- a/mockstorage/mock_ct_storage.go +++ b/mockstorage/mock_ct_storage.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" x509 "github.com/google/certificate-transparency-go/x509" + dedup "github.com/transparency-dev/static-ct/modules/dedup" tessera "github.com/transparency-dev/trillian-tessera" ctonly "github.com/transparency-dev/trillian-tessera/ctonly" ) @@ -51,18 +52,18 @@ func (mr *MockStorageMockRecorder) Add(arg0, arg1 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockStorage)(nil).Add), arg0, arg1) } -// AddCertIndex mocks base method. -func (m *MockStorage) AddCertIndex(arg0 context.Context, arg1 *x509.Certificate, arg2 uint64) error { +// AddCertDedupInfo mocks base method. +func (m *MockStorage) AddCertDedupInfo(arg0 context.Context, arg1 *x509.Certificate, arg2 dedup.SCTDedupInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddCertIndex", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "AddCertDedupInfo", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// AddCertIndex indicates an expected call of AddCertIndex. -func (mr *MockStorageMockRecorder) AddCertIndex(arg0, arg1, arg2 interface{}) *gomock.Call { +// AddCertDedupInfo indicates an expected call of AddCertDedupInfo. +func (mr *MockStorageMockRecorder) AddCertDedupInfo(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCertIndex", reflect.TypeOf((*MockStorage)(nil).AddCertIndex), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCertDedupInfo", reflect.TypeOf((*MockStorage)(nil).AddCertDedupInfo), arg0, arg1, arg2) } // AddIssuerChain mocks base method. @@ -79,18 +80,18 @@ func (mr *MockStorageMockRecorder) AddIssuerChain(arg0, arg1 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddIssuerChain", reflect.TypeOf((*MockStorage)(nil).AddIssuerChain), arg0, arg1) } -// GetCertIndex mocks base method. -func (m *MockStorage) GetCertIndex(arg0 context.Context, arg1 *x509.Certificate) (uint64, bool, error) { +// GetCertDedupInfo mocks base method. +func (m *MockStorage) GetCertDedupInfo(arg0 context.Context, arg1 *x509.Certificate) (dedup.SCTDedupInfo, bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetCertIndex", arg0, arg1) - ret0, _ := ret[0].(uint64) + ret := m.ctrl.Call(m, "GetCertDedupInfo", arg0, arg1) + ret0, _ := ret[0].(dedup.SCTDedupInfo) ret1, _ := ret[1].(bool) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } -// GetCertIndex indicates an expected call of GetCertIndex. -func (mr *MockStorageMockRecorder) GetCertIndex(arg0, arg1 interface{}) *gomock.Call { +// GetCertDedupInfo indicates an expected call of GetCertDedupInfo. +func (mr *MockStorageMockRecorder) GetCertDedupInfo(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCertIndex", reflect.TypeOf((*MockStorage)(nil).GetCertIndex), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCertDedupInfo", reflect.TypeOf((*MockStorage)(nil).GetCertDedupInfo), arg0, arg1) } diff --git a/modules/dedup/dedup.go b/modules/dedup/dedup.go index bee87674..465b150f 100644 --- a/modules/dedup/dedup.go +++ b/modules/dedup/dedup.go @@ -28,25 +28,31 @@ import ( "k8s.io/klog/v2" ) -// LeafIdx holds a LeafID and an Idx for deduplication -type LeafIdx struct { +// LeafDedupInfo enables building deduplicated add-pre-chain/add-chain responses. +type LeafDedupInfo struct { LeafID []byte - Idx uint64 + SCTDedupInfo +} + +// SCTDedupInfo contains data to build idempotent SCTs. +type SCTDedupInfo struct { + Idx uint64 + Timestamp uint64 } type BEDedupStorage interface { - Add(ctx context.Context, lidxs []LeafIdx) error - Get(ctx context.Context, leafID []byte) (uint64, bool, error) + Add(ctx context.Context, lidxs []LeafDedupInfo) error + Get(ctx context.Context, leafID []byte) (SCTDedupInfo, bool, error) } // TODO: re-architecture to prevent creating a LocaLBEDedupStorage without calling UpdateFromLog type LocalBEDedupStorage interface { - Add(ctx context.Context, lidxs []LeafIdx) error - Get(ctx context.Context, leafID []byte) (uint64, bool, error) + Add(ctx context.Context, lidxs []LeafDedupInfo) error + Get(ctx context.Context, leafID []byte) (SCTDedupInfo, bool, error) LogSize() (uint64, error) } -type ParseBundleFunc func([]byte, uint64) ([]LeafIdx, error) +type ParseBundleFunc func([]byte, uint64) ([]LeafDedupInfo, error) // UpdateFromLog synchronises a local best effort deduplication storage with a log. func UpdateFromLog(ctx context.Context, lds LocalBEDedupStorage, t time.Duration, fcp client.CheckpointFetcherFunc, fb client.EntryBundleFetcherFunc, pb ParseBundleFunc) { @@ -97,12 +103,12 @@ func sync(ctx context.Context, lds LocalBEDedupStorage, pb ParseBundleFunc, fcp } return fmt.Errorf("failed to fetch leaf bundle at index %d: %v", i, err) } - lidxs, err := pb(eRaw, i) + ldis, err := pb(eRaw, i) if err != nil { return fmt.Errorf("parseBundle(): %v", err) } - if err := lds.Add(ctx, lidxs); err != nil { + if err := lds.Add(ctx, ldis); err != nil { return fmt.Errorf("error storing deduplication data for tile %d: %v", i, err) } klog.V(3).Infof("LocalBEDEdup.sync(): stored dedup data for entry bundle %d, %d more bundles to go", i, ckptSize/256-i) diff --git a/serialize.go b/serialize.go index 8ea5282f..a3184222 100644 --- a/serialize.go +++ b/serialize.go @@ -176,15 +176,15 @@ func NewCpSigner(signer crypto.Signer, origin string, logID [32]byte, timeSource return ctSigner } -// DedupFromBundle converts a bundle into an array of {leafID, idx}. +// DedupFromBundle converts a bundle into an array of dedup.LeafDedupInfo. // // The index of a leaf is computed from its position in the log, instead of parsing SCTs. // Greatly inspired by https://github.com/FiloSottile/sunlight/blob/main/tile.go -func DedupFromBundle(bundle []byte, bundleIdx uint64) ([]dedup.LeafIdx, error) { - kvs := []dedup.LeafIdx{} +func DedupFromBundle(bundle []byte, bundleIdx uint64) ([]dedup.LeafDedupInfo, error) { + kvs := []dedup.LeafDedupInfo{} s := cryptobyte.String(bundle) - for len(s) > 0 { + for i := bundleIdx * 256; len(s) > 0; i++ { var timestamp uint64 var entryType uint16 var extensions, fingerprints cryptobyte.String @@ -213,7 +213,8 @@ func DedupFromBundle(bundle []byte, bundleIdx uint64) ([]dedup.LeafIdx, error) { return nil, fmt.Errorf("invalid data tile: unknown type %d", entryType) } k := sha256.Sum256(crt) - kvs = append(kvs, dedup.LeafIdx{LeafID: k[:], Idx: bundleIdx*256 + uint64(len(kvs))}) + sctDedupInfo := dedup.SCTDedupInfo{Idx: uint64(i), Timestamp: timestamp} + kvs = append(kvs, dedup.LeafDedupInfo{LeafID: k[:], SCTDedupInfo: sctDedupInfo}) } return kvs, nil } diff --git a/storage.go b/storage.go index b0772d4c..0f266286 100644 --- a/storage.go +++ b/storage.go @@ -22,9 +22,9 @@ import ( "sync" "github.com/google/certificate-transparency-go/x509" + "github.com/transparency-dev/static-ct/modules/dedup" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/ctonly" - "github.com/transparency-dev/static-ct/modules/dedup" "k8s.io/klog/v2" ) @@ -41,10 +41,10 @@ type Storage interface { Add(context.Context, *ctonly.Entry) tessera.IndexFuture // AddIssuerChain stores every the chain certificate in a content-addressable store under their sha256 hash. AddIssuerChain(context.Context, []*x509.Certificate) error - // AddCertIndex stores the index of certificate in a log under its hash. - AddCertIndex(context.Context, *x509.Certificate, uint64) error - // GetCertIndex gets the index of certificate in a log from its hash. - GetCertIndex(context.Context, *x509.Certificate) (uint64, bool, error) + // AddCertDedupInfo stores the SCTDedupInfo of certificate in a log under its hash. + AddCertDedupInfo(context.Context, *x509.Certificate, dedup.SCTDedupInfo) error + // GetCertDedupInfo gets the SCTDedupInfo of certificate in a log from its hash. + GetCertDedupInfo(context.Context, *x509.Certificate) (dedup.SCTDedupInfo, bool, error) } type KV struct { @@ -131,21 +131,21 @@ func cachedStoreIssuers(s IssuerStorage) func(context.Context, []KV) error { } } -// AddCertIndex stores in the deduplication storage. -func (cts CTStorage) AddCertIndex(ctx context.Context, c *x509.Certificate, idx uint64) error { +// AddCertDedupInfo stores in the deduplication storage. +func (cts CTStorage) AddCertDedupInfo(ctx context.Context, c *x509.Certificate, sctDedupInfo dedup.SCTDedupInfo) error { key := sha256.Sum256(c.Raw) - if err := cts.dedupStorage.Add(ctx, []dedup.LeafIdx{{LeafID: key[:], Idx: idx}}); err != nil { - return fmt.Errorf("error storing index %d of %q: %v", idx, hex.EncodeToString(key[:]), err) + if err := cts.dedupStorage.Add(ctx, []dedup.LeafDedupInfo{{LeafID: key[:], SCTDedupInfo: sctDedupInfo}}); err != nil { + return fmt.Errorf("error storing SCTDedupInfo %+v of \"%x\": %v", sctDedupInfo, key, err) } return nil } -// GetCertIndex fetches the index of a given certificate from the deduplication storage. -func (cts CTStorage) GetCertIndex(ctx context.Context, c *x509.Certificate) (uint64, bool, error) { +// GetCertDedupInfo fetches the SCTDedupInfo of a given certificate from the deduplication storage. +func (cts CTStorage) GetCertDedupInfo(ctx context.Context, c *x509.Certificate) (dedup.SCTDedupInfo, bool, error) { key := sha256.Sum256(c.Raw) - idx, ok, err := cts.dedupStorage.Get(ctx, key[:]) + sctC, ok, err := cts.dedupStorage.Get(ctx, key[:]) if err != nil { - return 0, false, fmt.Errorf("error fetching index of %q: %v", hex.EncodeToString(key[:]), err) + return dedup.SCTDedupInfo{}, false, fmt.Errorf("error fetching index of \"%x\": %v", key, err) } - return idx, ok, nil + return sctC, ok, nil } diff --git a/storage/bbolt/dedup.go b/storage/bbolt/dedup.go index 83f52472..20d998e2 100644 --- a/storage/bbolt/dedup.go +++ b/storage/bbolt/dedup.go @@ -15,12 +15,12 @@ // Package bbolt implements modules/dedup using BBolt. // // It contains two buckets: -// - The dedup bucket stores pairs. Entries can either be added after sequencing, -// by the server that received the request, or later when synchronising the dedup storage with -// the log state. +// - The dedup bucket stores pairs. Entries can either be added after +// sequencing, by the server that received the request, or later when synchronising the dedup +// storage with the log state. // - The size bucket has a single entry: <"size", X>, where X is the largest contiguous index -// from 0 that has been inserted in the dedup bucket. This allows to know what is the next -// to add to the bucket in order to have a full represation of the log. +// from 0 that has been inserted in the dedup bucket. This allows to know at what index +// deduplication synchronisation should start in order to have the full picture of a log. // // Calls to Add will update idx to a smaller value, if possible. package bbolt @@ -28,7 +28,6 @@ package bbolt import ( "context" "encoding/binary" - "encoding/hex" "fmt" "github.com/transparency-dev/static-ct/modules/dedup" @@ -48,13 +47,13 @@ type Storage struct { // NewStorage returns a new BBolt storage instance with a dedup and size bucket. // -// The dedup bucket stores pairs. +// The dedup bucket stores pairs, where idx::timestamp is the +// concatenation of two uint64 8 bytes BigEndian representation. // The size bucket has a single entry: <"size", X>, where X is the largest contiguous index from 0 // that has been inserted in the dedup bucket. // // If a database already exists at the provided path, NewStorage will load it. func NewStorage(path string) (*Storage, error) { - // TODO(better logging message) db, err := bolt.Open(path, 0600, nil) if err != nil { return nil, fmt.Errorf("bolt.Open(): %v", err) @@ -98,27 +97,33 @@ func NewStorage(path string) (*Storage, error) { // Add inserts entries in the dedup bucket and updates the size bucket if need be. // -// If an entry is already stored under a given key, Add only updates it if the new value is smaller. +// If an entry already exists under a key, Add only updates the value if the new idx is smaller. // The context is here for consistency with interfaces, but isn't used by BBolt. -func (s *Storage) Add(_ context.Context, lidxs []dedup.LeafIdx) error { - for _, lidx := range lidxs { +func (s *Storage) Add(_ context.Context, ldis []dedup.LeafDedupInfo) error { + for _, ldi := range ldis { err := s.db.Update(func(tx *bolt.Tx) error { db := tx.Bucket([]byte(dedupBucket)) sb := tx.Bucket([]byte(sizeBucket)) + sizeB := sb.Get([]byte("size")) if sizeB == nil { return fmt.Errorf("can't find log size in bucket %q", sizeBucket) } size := btoi(sizeB) + vB, err := vtob(ldi.Idx, ldi.Timestamp) + if err != nil { + return fmt.Errorf("vtob(): %v", err) + } - if old := db.Get(lidx.LeafID); old != nil && btoi(old) <= lidx.Idx { - klog.V(3).Infof("Add(): bucket %q already contains a smaller index %d < %d for entry %q, not updating", dedupBucket, btoi(old), lidx.Idx, hex.EncodeToString(lidx.LeafID)) - } else if err := db.Put(lidx.LeafID, itob(lidx.Idx)); err != nil { + // old should always be 16 bytes long, but double check + if old := db.Get(ldi.LeafID); len(old) == 16 && btoi(old[:8]) <= ldi.Idx { + klog.V(3).Infof("Add(): bucket %q already contains a smaller index %d < %d for entry \"%x\", not updating", dedupBucket, btoi(old[:8]), ldi.Idx, ldi.LeafID) + } else if err := db.Put(ldi.LeafID, vB); err != nil { return err } - // size is a length, lidx.I an index, so if they're equal, - // lidx is a new entry. - if size == lidx.Idx { + // size is a length, ldi.Idx an index, so if they're equal, + // ldi is a new entry. + if size == ldi.Idx { klog.V(3).Infof("Add(): updating deduped size to %d", size+1) if err := sb.Put([]byte("size"), itob(size+1)); err != nil { return err @@ -127,7 +132,7 @@ func (s *Storage) Add(_ context.Context, lidxs []dedup.LeafIdx) error { return nil }) if err != nil { - return fmt.Errorf("b.Put(): error writing leaf index %d: err", lidx.Idx) + return fmt.Errorf("db.Update(): error writing leaf index %d: err", ldi.Idx) } } return nil @@ -137,21 +142,24 @@ func (s *Storage) Add(_ context.Context, lidxs []dedup.LeafIdx) error { // // If the requested entry is missing from the bucket, returns false ("comma ok" idiom). // The context is here for consistency with interfaces, but isn't used by BBolt. -func (s *Storage) Get(_ context.Context, leafID []byte) (uint64, bool, error) { - var idx []byte +func (s *Storage) Get(_ context.Context, leafID []byte) (dedup.SCTDedupInfo, bool, error) { + var v []byte _ = s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dedupBucket)) - v := b.Get(leafID) - if v != nil { - idx = make([]byte, 8) - copy(idx, v) + if vv := b.Get(leafID); vv != nil { + v = make([]byte, len(vv)) + copy(v, vv) } return nil }) - if idx == nil { - return 0, false, nil + if v == nil { + return dedup.SCTDedupInfo{}, false, nil + } + idx, t, err := btov(v) + if err != nil { + return dedup.SCTDedupInfo{}, false, fmt.Errorf("btov(): %v", err) } - return btoi(idx), true, nil + return dedup.SCTDedupInfo{Idx: idx, Timestamp: t}, true, nil } // LogSize reads the latest entry from the size bucket. @@ -184,3 +192,37 @@ func itob(idx uint64) []byte { func btoi(b []byte) uint64 { return binary.BigEndian.Uint64(b) } + +// vtob concatenates an index and timestamp values into a byte array. +func vtob(idx uint64, timestamp uint64) ([]byte, error) { + b := make([]byte, 0, 16) + var err error + + b, err = binary.Append(b, binary.BigEndian, idx) + if err != nil { + return nil, fmt.Errorf("binary.Append() could not encode idx: %v", err) + } + b, err = binary.Append(b, binary.BigEndian, timestamp) + if err != nil { + return nil, fmt.Errorf("binary.Append() could not encode timestamp: %v", err) + } + + return b, nil +} + +// btov parses a byte array into an index and timestamp values. +func btov(b []byte) (uint64, uint64, error) { + var idx, timestamp uint64 + if l := len(b); l != 16 { + return 0, 0, fmt.Errorf("input value is %d bytes long, expected %d", l, 16) + } + n, err := binary.Decode(b, binary.BigEndian, &idx) + if err != nil { + return 0, 0, fmt.Errorf("binary.Decode() could not decode idx: %v", err) + } + _, err = binary.Decode(b[n:], binary.BigEndian, ×tamp) + if err != nil { + return 0, 0, fmt.Errorf("binary.Decode() could not decode timestamp: %v", err) + } + return idx, timestamp, nil +} diff --git a/storage/gcp/dedup.go b/storage/gcp/dedup.go index b6998d50..721a352f 100644 --- a/storage/gcp/dedup.go +++ b/storage/gcp/dedup.go @@ -37,6 +37,7 @@ func NewDedupeStorage(ctx context.Context, spannerDB string) (*DedupStorage, err id INT64 NOT NULL, h BYTES(MAX) NOT NULL, idx INT64 NOT NULL, + timestamp INT64 NOT NULL, ) PRIMARY KEY (id, h); */ dedupDB, err := spanner.NewClient(ctx, spannerDB) @@ -57,28 +58,31 @@ type DedupStorage struct { var _ dedup.BEDedupStorage = &DedupStorage{} // Get looks up the stored index, if any, for the given identity. -func (d *DedupStorage) Get(ctx context.Context, i []byte) (uint64, bool, error) { - var idx int64 - if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, i}, []string{"idx"}); err != nil { +func (d *DedupStorage) Get(ctx context.Context, i []byte) (dedup.SCTDedupInfo, bool, error) { + var idx, timestamp int64 + if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, i}, []string{"idx", "timestamp"}); err != nil { if c := spanner.ErrCode(err); c == codes.NotFound { - return 0, false, nil + return dedup.SCTDedupInfo{}, false, nil } - return 0, false, err + return dedup.SCTDedupInfo{}, false, err } else { - if err := row.Column(0, &idx); err != nil { - return 0, false, fmt.Errorf("failed to read dedup index: %v", err) + if err := row.Columns(&idx, ×tamp); err != nil { + return dedup.SCTDedupInfo{}, false, fmt.Errorf("failed to read dedup index: %v", err) } idx := uint64(idx) - return idx, true, nil + t := uint64(timestamp) + return dedup.SCTDedupInfo{Idx: idx, Timestamp: t}, true, nil } } // Add stores associations between the passed-in identities and their indices. -func (d *DedupStorage) Add(ctx context.Context, entries []dedup.LeafIdx) error { +func (d *DedupStorage) Add(ctx context.Context, entries []dedup.LeafDedupInfo) error { m := make([]*spanner.MutationGroup, 0, len(entries)) for _, e := range entries { m = append(m, &spanner.MutationGroup{ - Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.LeafID, int64(e.Idx)})}, + Mutations: []*spanner.Mutation{ + spanner.Insert("IDSeq", []string{"id", "h", "idx", "timestamp"}, + []interface{}{0, e.LeafID, int64(e.Idx), int64(e.Timestamp)})}, }) }