Skip to content

Commit 813e806

Browse files
authored
[HOPS-1654] Reconnect to NDB after network failure
1 parent 74e72c7 commit 813e806

File tree

6 files changed

+145
-35
lines changed

6 files changed

+145
-35
lines changed

src/main/java/io/hops/metadata/ndb/ClusterjConnector.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.apache.commons.logging.LogFactory;
5757

5858
import java.sql.SQLException;
59+
import java.util.ArrayList;
60+
import java.util.List;
5961
import java.util.Properties;
6062
import io.hops.metadata.yarn.dal.ReservationStateDataAccess;
6163

@@ -111,12 +113,11 @@ public HopsSession obtainSession() throws StorageException {
111113
}
112114

113115
@Override
114-
public void returnSession(boolean error) throws StorageException {
116+
public void returnSession(Exception... e) throws StorageException {
115117
DBSession dbSession = sessions.get();
116118
if (dbSession != null) {
117119
sessions.remove(); // remove, and return to the pool
118-
dbSessionProvider.returnSession(dbSession,
119-
error); // if there was an error then close the session
120+
dbSessionProvider.returnSession(dbSession, e);
120121
}
121122
}
122123

@@ -143,16 +144,16 @@ public void beginTransaction() throws StorageException {
143144
@Override
144145
public void commit() throws StorageException {
145146
HopsSession session = null;
146-
boolean dbError = false;
147+
Exception dbError = null;
147148
try {
148149
session = obtainSession();
149150
HopsTransaction tx = session.currentTransaction();
150151
if (!tx.isActive()) {
151-
throw new StorageException("The transaction is not began!");
152+
throw new StorageException("Cannot commit, no active transaction");
152153
}
153154
tx.commit();
154155
} catch (StorageException e) {
155-
dbError = true;
156+
dbError = e;
156157
throw e;
157158
} finally {
158159
returnSession(dbError);
@@ -163,20 +164,25 @@ public void commit() throws StorageException {
163164
* It rolls back only when the transaction is active.
164165
*/
165166
@Override
166-
public void rollback() throws StorageException {
167+
public void rollback(Exception exception) throws StorageException {
168+
List<Exception> allExceptions = new ArrayList<>();
169+
if (exception != null) {
170+
allExceptions.add(exception);
171+
}
172+
167173
HopsSession session = null;
168-
boolean dbError = false;
169174
try {
170175
session = obtainSession();
171176
HopsTransaction tx = session.currentTransaction();
172177
if (tx.isActive()) {
173178
tx.rollback();
174179
}
175180
} catch (StorageException e) {
176-
dbError = true;
181+
allExceptions.add(e);
177182
throw e;
178183
} finally {
179-
returnSession(dbError);
184+
LOG.fatal("rolback return session. errors count: "+allExceptions.size());
185+
returnSession(allExceptions.toArray(new Exception[allExceptions.size()]));
180186
}
181187
}
182188

src/main/java/io/hops/metadata/ndb/DBSession.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,30 @@
2020

2121
import io.hops.metadata.ndb.wrapper.HopsSession;
2222

23+
import java.util.UUID;
24+
2325
public class DBSession {
2426

2527
private HopsSession session;
2628
private final int MAX_REUSE_COUNT;
2729
private int sessionUseCount;
30+
private final UUID connectionID;
2831

29-
/**
 * Creates a session wrapper whose use counter starts at zero.
 *
 * @param session the underlying session to wrap
 * @param maxReuseCount value stored as this wrapper's reuse limit
 * @param connectionID identifier stored with this wrapper and returned by
 *                     {@code getConnectionID()}
 */
public DBSession(HopsSession session, int maxReuseCount, UUID connectionID) {
  this.connectionID = connectionID;
  this.MAX_REUSE_COUNT = maxReuseCount;
  this.session = session;
  this.sessionUseCount = 0;
}
3438

3539
public HopsSession getSession() {
3640
return session;
3741
}
3842

43+
public UUID getConnectionID() {
44+
return connectionID;
45+
}
46+
3947
public int getSessionUseCount() {
4048
return sessionUseCount;
4149
}

src/main/java/io/hops/metadata/ndb/DBSessionProvider.java

+84-22
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package io.hops.metadata.ndb;
2020

21+
import com.mysql.clusterj.ClusterJDatastoreException;
2122
import com.mysql.clusterj.ClusterJException;
2223
import com.mysql.clusterj.ClusterJHelper;
24+
import com.mysql.clusterj.ClusterJUserException;
2325
import com.mysql.clusterj.Constants;
2426
import com.mysql.clusterj.LockMode;
2527
import io.hops.exception.StorageException;
@@ -34,6 +36,7 @@
3436
import java.util.NoSuchElementException;
3537
import java.util.Properties;
3638
import java.util.Random;
39+
import java.util.UUID;
3740
import java.util.concurrent.ConcurrentLinkedQueue;
3841
import java.util.concurrent.atomic.AtomicInteger;
3942

@@ -54,6 +57,8 @@ public class DBSessionProvider implements Runnable {
5457
private boolean automaticRefresh = false;
5558
private Thread thread;
5659
private final ClusterJCaching clusterJCaching;
60+
private UUID currentConnectionID;
61+
private final int initialPoolSize;
5762

5863
public DBSessionProvider(Properties conf)
5964
throws StorageException {
@@ -65,7 +70,7 @@ public DBSessionProvider(Properties conf)
6570
(String) conf.get("io.hops.enable.clusterj.session.cache"));
6671
clusterJCaching = new ClusterJCaching(useClusterjDtoCache, useClusterjSessionCache);
6772

68-
int initialPoolSize = Integer.parseInt(
73+
initialPoolSize = Integer.parseInt(
6974
(String) conf.get("io.hops.session.pool.size"));
7075
int reuseCount = Integer.parseInt(
7176
(String) conf.get("io.hops.session.reuse.count"));
@@ -88,6 +93,8 @@ private void start(int initialPoolSize) throws StorageException {
8893
"Database name: " + conf.get(Constants.PROPERTY_CLUSTER_DATABASE));
8994
LOG.info("Max Transactions: " +
9095
conf.get(Constants.PROPERTY_CLUSTER_MAX_TRANSACTIONS));
96+
LOG.info("Reconnect Timeout: " +
97+
conf.get(Constants.PROPERTY_CONNECTION_RECONNECT_TIMEOUT));
9198
LOG.info("Using ClusterJ Session Cache: "+clusterJCaching.useClusterjSessionCache());
9299
LOG.info("Using ClusterJ DTO Cache: "+clusterJCaching.useClusterjDtoCache());
93100
try {
@@ -97,9 +104,7 @@ private void start(int initialPoolSize) throws StorageException {
97104
throw HopsExceptionHelper.wrap(ex);
98105
}
99106

100-
for (int i = 0; i < initialPoolSize; i++) {
101-
sessionPool.add(initSession());
102-
}
107+
createNewSessions();
103108

104109
thread = new Thread(this, "Session Pool Refresh Daemon");
105110
thread.setDaemon(true);
@@ -115,7 +120,7 @@ private DBSession initSession() throws StorageException {
115120
sessionCreationTime;
116121

117122
int reuseCount = rand.nextInt(MAX_REUSE_COUNT) + 1;
118-
DBSession dbSession = new DBSession(session, reuseCount);
123+
DBSession dbSession = new DBSession(session, reuseCount, currentConnectionID);
119124
sessionsCreated.incrementAndGet();
120125
return dbSession;
121126
}
@@ -141,20 +146,38 @@ public DBSession getSession() throws StorageException {
141146
DBSession session = sessionPool.remove();
142147
return session;
143148
} catch (NoSuchElementException e) {
144-
LOG.warn(
145-
"DB Sessino provider cant keep up with the demand for new sessions");
149+
LOG.warn("DB session provider cant keep up with the demand for new sessions");
146150
return initSession();
147151
}
148152
}
149153

150-
public void returnSession(DBSession returnedSession, boolean forceClose) throws StorageException {
154+
public void returnSession(DBSession returnedSession, Exception... exceptions) throws StorageException {
155+
boolean forceClose = false;
156+
if(sessionFactory.isOpen()){
157+
for(Exception e : exceptions){
158+
if (e == null) continue;
159+
Throwable cause = e.getCause();
160+
if (cause instanceof ClusterJDatastoreException) {
161+
forceClose = true;
162+
} else if (cause instanceof ClusterJUserException){
163+
if(cause.getMessage().contains("No more operations can be performed while this Db is " +
164+
"closing")){
165+
forceClose = true;
166+
}
167+
} else if (returnedSession.getConnectionID() != currentConnectionID) {
168+
forceClose = true;
169+
}
170+
}
171+
}
172+
151173
//session has been used, increment the use counter
152174
returnedSession
153175
.setSessionUseCount(returnedSession.getSessionUseCount() + 1);
154176

155-
if ((returnedSession.getSessionUseCount() >=
177+
if (sessionFactory.isOpen() && ((returnedSession.getSessionUseCount() >=
156178
returnedSession.getMaxReuseCount()) ||
157-
forceClose) { // session can be closed even before the reuse count has expired. Close the session incase of database errors.
179+
forceClose)) { // session can be closed even before the reuse count has expired. Close
180+
// the session incase of database errors.
158181
toGC.add(returnedSession);
159182
} else { // increment the count and return it to the pool
160183
returnedSession.getSession().setLockMode(LockMode.READ_COMMITTED);
@@ -179,24 +202,29 @@ public int getAvailableSessions() {
179202
return sessionPool.size();
180203
}
181204

205+
boolean reconnecting = false;
182206
@Override
183207
public void run() {
184208
while (automaticRefresh) {
185209
try {
186-
int toGCSize = toGC.size();
187-
188-
if (toGCSize > 0) {
189-
LOG.debug("Renewing a session(s) " + toGCSize);
190-
for (int i = 0; i < toGCSize; i++) {
191-
DBSession session = toGC.remove();
192-
session.getSession().close();
193-
}
210+
if (!sessionFactory.isOpen()) {
211+
reconnecting = true;
212+
sessionFactory.reconnect();
213+
}
194214

195-
for (int i = 0; i < toGCSize; i++) {
196-
sessionPool.add(initSession());
197-
}
215+
if ( sessionFactory.isOpen() && reconnecting ){
216+
// reconnected after network failure
217+
// close all old sessions and start new sessions
218+
reconnecting = false;
219+
currentConnectionID = UUID.randomUUID();
220+
renewAllSessions();
221+
gcSessions(false);
222+
} else {
223+
// do normal gc
224+
gcSessions(true);
198225
}
199-
Thread.sleep(5);
226+
227+
Thread.sleep(50);
200228
} catch (NoSuchElementException e) {
201229
for (int i = 0; i < 100; i++) {
202230
try {
@@ -214,6 +242,40 @@ public void run() {
214242
}
215243
}
216244

245+
public void renewAllSessions() throws StorageException {
246+
while (!sessionPool.isEmpty()) {
247+
DBSession session = sessionPool.poll();
248+
if (session != null) {
249+
closeSession(session);
250+
}
251+
}
252+
createNewSessions();
253+
}
254+
255+
public void gcSessions(boolean createNewSessions) throws StorageException {
256+
int toGCSize = toGC.size();
257+
258+
if (toGCSize > 0) {
259+
LOG.debug("Renewing a session(s) " + toGCSize);
260+
for (int i = 0; i < toGCSize; i++) {
261+
DBSession session = toGC.remove();
262+
closeSession(session);
263+
}
264+
265+
if(createNewSessions){
266+
for (int i = 0; i < toGCSize; i++) {
267+
sessionPool.add(initSession());
268+
}
269+
}
270+
}
271+
}
272+
273+
public void createNewSessions() throws StorageException {
274+
for (int i = 0; i < initialPoolSize; i++) {
275+
sessionPool.add(initSession());
276+
}
277+
}
278+
217279
public void clearCache() throws StorageException {
218280
Iterator<DBSession> itr = sessionPool.iterator();
219281
while(itr.hasNext()){

src/main/java/io/hops/metadata/ndb/mysqlserver/MysqlServerConnector.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public static void truncateTable(boolean transactional, String tableName,
193193
}
194194

195195
@Override
196-
public void returnSession(boolean error) throws StorageException {
196+
public void returnSession(Exception... error) throws StorageException {
197197
throw new UnsupportedOperationException("Not supported yet.");
198198
}
199199

@@ -208,7 +208,7 @@ public void commit() throws StorageException {
208208
}
209209

210210
@Override
211-
public void rollback() throws StorageException {
211+
public void rollback(Exception e) throws StorageException {
212212
throw new UnsupportedOperationException("Not supported yet.");
213213
}
214214

src/main/java/io/hops/metadata/ndb/wrapper/HopsSessionFactory.java

+17
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mysql.clusterj.ClusterJException;
2222
import com.mysql.clusterj.SessionFactory;
2323
import io.hops.exception.StorageException;
24+
import io.hops.metadata.hdfs.entity.Storage;
2425

2526
import java.util.Map;
2627

@@ -56,4 +57,20 @@ public void close() throws StorageException {
5657
throw HopsExceptionHelper.wrap(e);
5758
}
5859
}
60+
61+
public boolean isOpen() throws StorageException {
62+
try {
63+
return factory.currentState() == SessionFactory.State.Open;
64+
} catch (ClusterJException e) {
65+
throw HopsExceptionHelper.wrap(e);
66+
}
67+
}
68+
69+
public void reconnect() throws StorageException {
70+
try {
71+
factory.reconnect();
72+
} catch (ClusterJException e) {
73+
throw HopsExceptionHelper.wrap(e);
74+
}
75+
}
5976
}

src/main/resources/ndb-config.properties.template

+17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,23 @@ com.mysql.clusterj.database=hops_db
33
com.mysql.clusterj.connection.pool.size=1
44
com.mysql.clusterj.max.transactions=1024
55

6+
#determines the number of seconds to wait until the first “live” node is detected.
7+
#If this amount of time is exceeded with no live nodes detected,
8+
#then the wait (Ndb_cluster_connection::wait_until_ready) immediately returns a negative value. Default=30
9+
com.mysql.clusterj.connect.timeout.before=30
10+
11+
#determines the number of seconds to wait after the first “live” node is
12+
#detected for all nodes to become active. If this amount of time is exceeded
13+
#without all nodes becoming active, then the wait (Ndb_cluster_connection::wait_until_ready) immediately returns a
14+
#value greater than zero. Default=20
15+
com.mysql.clusterj.connect.timeout.after=20
16+
17+
#The number of seconds to wait for all sessions to be closed when reconnecting a SessionFactory
18+
#due to network failures. The default, 0, indicates that the automatic reconnection to the cluster
19+
#due to network failures is disabled. Reconnection can be enabled by using the method
20+
#SessionFactory.reconnect(int timeout) and specifying a new timeout value.
21+
com.mysql.clusterj.connection.reconnect.timeout=5
22+
623
#clusterj caching
724
#set io.hops.enable.clusterj.dto.cache and io.hops.enable.clusterj.session.cache to use dto and session caching provided by clusterj
825
io.hops.enable.clusterj.dto.cache=false

0 commit comments

Comments
 (0)