@@ -33,44 +33,55 @@ type manager struct {
33
33
func (m * manager ) Filter (ctx context.Context , hashes []protocol.ID ) ([]protocol.ID , error ) {
34
34
m .mutex .Lock ()
35
35
defer m .mutex .Unlock ()
36
+
36
37
if m .filter == nil || m .shouldFlush () {
37
38
if flushErr := m .flush (ctx ); flushErr != nil {
38
39
return nil , flushErr
39
40
}
40
41
}
42
+
41
43
var filtered []protocol.ID
44
+
42
45
for _ , hash := range hashes {
43
46
if _ , ok := m .buffer [hash ]; ok {
44
47
continue
45
48
}
49
+
46
50
if m .filter .Test (hash [:]) {
47
51
continue
48
52
}
53
+
49
54
filtered = append (filtered , hash )
50
55
}
56
+
51
57
return filtered , nil
52
58
}
53
59
54
60
func (m * manager ) Block (ctx context.Context , hashes []protocol.ID , flush bool ) error {
55
61
m .mutex .Lock ()
56
62
defer m .mutex .Unlock ()
63
+
57
64
for _ , hash := range hashes {
58
65
m .buffer [hash ] = struct {}{}
59
66
}
67
+
60
68
if flush || m .shouldFlush () {
61
69
if flushErr := m .flush (ctx ); flushErr != nil {
62
70
return flushErr
63
71
}
64
72
}
73
+
65
74
return nil
66
75
}
67
76
68
77
func (m * manager ) Flush (ctx context.Context ) error {
69
78
m .mutex .Lock ()
70
79
defer m .mutex .Unlock ()
80
+
71
81
if len (m .buffer ) == 0 {
72
82
return nil
73
83
}
84
+
74
85
return m .flush (ctx )
75
86
}
76
87
@@ -88,6 +99,7 @@ func (m *manager) flush(ctx context.Context) error {
88
99
if err != nil {
89
100
return fmt .Errorf ("failed to begin transaction: %w" , err )
90
101
}
102
+
91
103
defer func () {
92
104
_ = tx .Rollback (ctx )
93
105
}()
@@ -106,18 +118,24 @@ func (m *manager) flush(ctx context.Context) error {
106
118
found := false
107
119
108
120
var oid uint32
121
+
109
122
var nullOid sql.NullInt32
123
+
110
124
err = tx .QueryRow (ctx , "SELECT oid FROM bloom_filters WHERE key = $1" , blockedTorrentsBloomFilterKey ).Scan (& nullOid )
111
125
if err == nil {
112
126
found = true
127
+
113
128
if nullOid .Valid {
114
129
oid = uint32 (nullOid .Int32 )
130
+
115
131
obj , err := lobs .Open (ctx , oid , pgx .LargeObjectModeRead )
116
132
if err != nil {
117
133
return fmt .Errorf ("failed to open large object for reading: %w" , err )
118
134
}
135
+
119
136
_ , err = bf .ReadFrom (obj )
120
137
obj .Close ()
138
+
121
139
if err != nil {
122
140
return fmt .Errorf ("failed to read current bloom filter: %w" , err )
123
141
}
0 commit comments