Skip to content

Deduplication --> Antispam #293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/aws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
dbPassword = flag.String("db_password", "", "AuroraDB password")
dbMaxConns = flag.Int("db_max_conns", 0, "Maximum connections to the database, defaults to 0, i.e unlimited")
dbMaxIdle = flag.Int("db_max_idle_conns", 2, "Maximum idle database connections in the connection pool, defaults to 2")
inMemoryAntispamCacheSize = flag.Uint("inmemory_antispam_cache_size", 2<<10, "Maximum number of entries to keep in the in-memory antispam cache.")
inMemoryAntispamCacheSize = flag.Uint("inmemory_antispam_cache_size", 256<<10, "Maximum number of entries to keep in the in-memory antispam cache.")
rootsPemFile = flag.String("roots_pem_file", "", "Path to the file containing root certificates that are acceptable to the log. The certs are served through get-roots endpoint.")
rejectExpired = flag.Bool("reject_expired", false, "If true then the certificate validity period will be checked against the current time during the validation of submissions. This will cause expired certificates to be rejected.")
rejectUnexpired = flag.Bool("reject_unexpired", false, "If true then CTFE rejects certificates that are either currently valid or not yet valid.")
Expand Down Expand Up @@ -157,7 +157,7 @@ func newAWSStorage(ctx context.Context, signer note.Signer) (*storage.CTStorage,
}
}

appender, _, _, err := tessera.NewAppender(ctx, driver, tessera.NewAppendOptions().
appender, _, reader, err := tessera.NewAppender(ctx, driver, tessera.NewAppendOptions().
WithCheckpointSigner(signer).
WithCTLayout().
WithAntispam(*inMemoryAntispamCacheSize, antispam))
Expand All @@ -170,7 +170,7 @@ func newAWSStorage(ctx context.Context, signer note.Signer) (*storage.CTStorage,
return nil, fmt.Errorf("failed to initialize AWS issuer storage: %v", err)
}

return storage.NewCTStorage(appender, issuerStorage)
return storage.NewCTStorage(appender, issuerStorage, reader)
}

type timestampFlag struct {
Expand Down
9 changes: 4 additions & 5 deletions cmd/gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ var (
origin = flag.String("origin", "", "Origin of the log, for checkpoints and the monitoring prefix.")
bucket = flag.String("bucket", "", "Name of the bucket to store the log in.")
spannerDB = flag.String("spanner_db_path", "", "Spanner database path: projects/{projectId}/instances/{instanceId}/databases/{databaseId}.")
spannerAntispamDB = flag.String("spanner_antispam_db_path", "", "EXPERIMENTAL: Spanner antispam deduplication database path projects/{projectId}/instances/{instanceId}/databases/{databaseId}.")
inMemoryAntispamCacheSize = flag.Uint("inmemory_antispam_cache_size", 2<<10, "Maximum number of entries to keep in the in-memory antispam cache.")
spannerAntispamDB = flag.String("spanner_antispam_db_path", "", "Spanner antispam deduplication database path projects/{projectId}/instances/{instanceId}/databases/{databaseId}.")
inMemoryAntispamCacheSize = flag.Uint("inmemory_antispam_cache_size", 256<<10, "Maximum number of entries to keep in the in-memory antispam cache.")
rootsPemFile = flag.String("roots_pem_file", "", "Path to the file containing root certificates that are acceptable to the log. The certs are served through get-roots endpoint.")
rejectExpired = flag.Bool("reject_expired", false, "If true then the certificate validity period will be checked against the current time during the validation of submissions. This will cause expired certificates to be rejected.")
rejectUnexpired = flag.Bool("reject_unexpired", false, "If true then CTFE rejects certificates that are either currently valid or not yet valid.")
Expand Down Expand Up @@ -160,7 +160,6 @@ func newGCPStorage(ctx context.Context, signer note.Signer) (*storage.CTStorage,
}

var antispam tessera.Antispam
// Persistent antispam is currently experimental, so there's no terraform or documentation yet!
if *spannerAntispamDB != "" {
antispam, err = gcp_as.NewAntispam(ctx, *spannerAntispamDB, gcp_as.AntispamOpts{})
if err != nil {
Expand All @@ -175,7 +174,7 @@ func newGCPStorage(ctx context.Context, signer note.Signer) (*storage.CTStorage,

// TODO(phbnf): figure out the best way to thread the `shutdown` func NewAppends returns back out to main so we can cleanly close Tessera down
// when it's time to exit.
appender, _, _, err := tessera.NewAppender(ctx, driver, opts)
appender, _, reader, err := tessera.NewAppender(ctx, driver, opts)
if err != nil {
return nil, fmt.Errorf("failed to initialize GCP Tessera appender: %v", err)
}
Expand All @@ -185,7 +184,7 @@ func newGCPStorage(ctx context.Context, signer note.Signer) (*storage.CTStorage,
return nil, fmt.Errorf("failed to initialize GCP issuer storage: %v", err)
}

return storage.NewCTStorage(appender, issuerStorage)
return storage.NewCTStorage(appender, issuerStorage, reader)
}

type timestampFlag struct {
Expand Down
3 changes: 1 addition & 2 deletions internal/ct/ctlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/transparency-dev/static-ct/internal/types/rfc6962"
"github.com/transparency-dev/static-ct/storage"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/ctonly"
"k8s.io/klog/v2"
)
Expand All @@ -35,7 +34,7 @@ type signSCT func(leaf *rfc6962.MerkleTreeLeaf) (*rfc6962.SignedCertificateTimes
// Storage provides functions to store certificates in a static-ct-api log.
type Storage interface {
// Add assigns an index to the provided Entry, stages the entry for integration, and returns a future for the assigned index.
Add(context.Context, *ctonly.Entry) tessera.IndexFuture
Add(context.Context, *ctonly.Entry) (idx uint64, timestamp uint64, err error)
// AddIssuerChain stores every the chain certificate in a content-addressable store under their sha256 hash.
AddIssuerChain(context.Context, []*x509.Certificate) error
}
Expand Down
19 changes: 8 additions & 11 deletions internal/ct/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ func (a appHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
originAttr := originKey.String(a.log.origin)
operationAttr := operationKey.String(a.name)
reqCounter.Add(r.Context(), 1, metric.WithAttributes(originAttr, operationAttr))
startTime := a.opts.TimeSource.Now()
startTime := time.Now()
logCtx := a.opts.RequestLog.start(r.Context())
a.opts.RequestLog.origin(logCtx, a.log.origin)
defer func() {
latency := a.opts.TimeSource.Now().Sub(startTime).Seconds()
latency := time.Since(startTime).Seconds()
reqDuration.Record(r.Context(), latency, metric.WithAttributes(originAttr, operationAttr, codeKey.Int(statusCode)))
}()

Expand All @@ -146,7 +146,7 @@ func (a appHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// impose a deadline on this onward request.
ctx, cancel := context.WithDeadline(logCtx, deadlineTime(a.opts))
ctx, cancel := context.WithTimeout(logCtx, a.opts.Deadline)
defer cancel()

var err error
Expand Down Expand Up @@ -178,6 +178,7 @@ type HandlerOptions struct {
// or returned to the user containing the full error message.
MaskInternalErrors bool
// TimeSource indicated the system time and can be injfected for testing.
// TODO(phbnf): hide inside the log
TimeSource TimeSource
}

Expand Down Expand Up @@ -274,18 +275,19 @@ func addChainInternal(ctx context.Context, opts *HandlerOptions, log *log, w htt
}

klog.V(2).Infof("%s: %s => storage.Add", log.origin, method)
index, err := log.storage.Add(ctx, entry)()
index, dedupedTimeMillis, err := log.storage.Add(ctx, entry)
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
w.Header().Add("Retry-After", "1")
return http.StatusServiceUnavailable, fmt.Errorf("received pushback from Tessera sequencer: %v", err)
}
return http.StatusInternalServerError, fmt.Errorf("couldn't store the leaf: %v", err)
}
entry.Timestamp = dedupedTimeMillis

// Always use the returned leaf as the basis for an SCT.
var loggedLeaf rfc6962.MerkleTreeLeaf
leafValue := entry.MerkleTreeLeaf(index.Index)
leafValue := entry.MerkleTreeLeaf(index)
if rest, err := tls.Unmarshal(leafValue, &loggedLeaf); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to reconstruct MerkleTreeLeaf: %s", err)
} else if len(rest) > 0 {
Expand All @@ -312,7 +314,7 @@ func addChainInternal(ctx context.Context, opts *HandlerOptions, log *log, w htt
klog.V(3).Infof("%s: %s <= SCT", log.origin, method)
if sct.Timestamp == timeMillis {
lastSCTTimestamp.Record(ctx, otel.Clamp64(sct.Timestamp), metric.WithAttributes(originKey.String(log.origin)))
lastSCTIndex.Record(ctx, otel.Clamp64(index.Index), metric.WithAttributes(originKey.String(log.origin)))
lastSCTIndex.Record(ctx, otel.Clamp64(index), metric.WithAttributes(originKey.String(log.origin)))
}

return http.StatusOK, nil
Expand Down Expand Up @@ -355,11 +357,6 @@ func getRoots(ctx context.Context, opts *HandlerOptions, log *log, w http.Respon
return http.StatusOK, nil
}

// deadlineTime calculates the future time a request should expire based on our config.
func deadlineTime(opts *HandlerOptions) time.Time {
return opts.TimeSource.Now().Add(opts.Deadline)
}

// marshalAndWriteAddChainResponse is used by add-chain and add-pre-chain to create and write
// the JSON response to the client
func marshalAndWriteAddChainResponse(sct *rfc6962.SignedCertificateTimestamp, w http.ResponseWriter) error {
Expand Down
Loading
Loading