27
27
ErrUnknownVersion = fmt .Errorf ("unknown version" )
28
28
//ErrNotFound is returned if an object is not found
29
29
ErrNotFound = fmt .Errorf ("object not found" )
30
+ //ErrClosed is returned if the client is closed
31
+ ErrClosed = fmt .Errorf ("client closed" )
30
32
)
31
33
32
34
// Versioned base for all types
@@ -145,6 +147,9 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {
145
147
return cl , meta , err
146
148
}
147
149
150
+ // connect connects to the next endpoint in roundrobin fashion
151
+ // and replaces the current connection with the new one.
152
+ // need to be called while lock is acquired.
148
153
func (p * mgrImpl ) connect (s * Substrate ) error {
149
154
cl , meta , err := p .Raw ()
150
155
if err != nil {
@@ -176,8 +181,10 @@ func (p *mgrImpl) put(s *Substrate) {
176
181
177
182
// Substrate client
178
183
type Substrate struct {
179
- cl Conn
180
- meta Meta
184
+ mu sync.Mutex
185
+ cl Conn
186
+ meta Meta
187
+ closed bool
181
188
182
189
close func (s * Substrate )
183
190
connect func (s * Substrate ) error
@@ -189,10 +196,23 @@ func newSubstrate(cl Conn, meta Meta, close func(*Substrate), connect func(s *Su
189
196
}
190
197
191
198
func (s * Substrate ) Close () {
199
+ s .mu .Lock ()
200
+ defer s .mu .Unlock ()
201
+ if s .closed {
202
+ return
203
+ }
192
204
s .close (s )
205
+ s .closed = true
193
206
}
194
207
195
208
func (s * Substrate ) GetClient () (Conn , Meta , error ) {
209
+ s .mu .Lock ()
210
+ defer s .mu .Unlock ()
211
+
212
+ if s .closed {
213
+ return nil , nil , ErrClosed
214
+ }
215
+
196
216
// check if connection is healthy
197
217
if _ , err := getTime (s .cl , s .meta ); err != nil {
198
218
log .Info ().Str ("url" , s .cl .Client .URL ()).Msg ("connection unhealthy, attempting failover" )
0 commit comments