Skip to content

Commit 6b2b609

Browse files
committed
[HOPS-1635] RonDB 21.04.0 and configurable ClusterJ caching
1 parent c34b785 commit 6b2b609

8 files changed

+157
-97
lines changed

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525

2626
<dependency>
2727
<groupId>com.mysql.ndb</groupId>
28-
<artifactId>clusterj-hops-fix</artifactId>
29-
<version>8.0.21</version>
28+
<artifactId>clusterj-rondb</artifactId>
29+
<version>21.04.0</version>
3030
</dependency>
3131

3232
<dependency>

src/main/java/io/hops/metadata/ndb/ClusterjConnector.java

+59-52
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* modify it under the terms of the GNU General Public License
77
* as published by the Free Software Foundation; either version 2
88
* of the License, or (at your option) any later version.
9-
*
9+
*
1010
* This program is distributed in the hope that it will be useful,
1111
* but WITHOUT ANY WARRANTY; without even the implied warranty of
1212
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
@@ -68,7 +68,7 @@ public class ClusterjConnector implements StorageConnector<DBSession> {
6868
static final Log LOG = LogFactory.getLog(ClusterjConnector.class);
6969
private String clusterConnectString;
7070
private String databaseName;
71-
71+
7272
private ClusterjConnector() {
7373
}
7474

@@ -82,22 +82,16 @@ public void setConfiguration(Properties conf) throws StorageException {
8282
LOG.warn("SessionFactory is already initialized");
8383
return;
8484
}
85-
85+
8686
clusterConnectString = (String) conf.get(Constants.PROPERTY_CLUSTER_CONNECTSTRING);
8787
LOG.info("Database connect string: " +
88-
conf.get(Constants.PROPERTY_CLUSTER_CONNECTSTRING));
88+
conf.get(Constants.PROPERTY_CLUSTER_CONNECTSTRING));
8989
databaseName = (String) conf.get(Constants.PROPERTY_CLUSTER_DATABASE);
9090
LOG.info("Database name: " + conf.get(Constants.PROPERTY_CLUSTER_DATABASE));
9191
LOG.info("Max Transactions: " +
92-
conf.get(Constants.PROPERTY_CLUSTER_MAX_TRANSACTIONS));
93-
94-
int initialPoolSize =
95-
Integer.parseInt((String) conf.get("io.hops.session.pool.size"));
96-
int reuseCount =
97-
Integer.parseInt((String) conf.get("io.hops.session.reuse.count"));
98-
dbSessionProvider =
99-
new DBSessionProvider(conf, reuseCount, initialPoolSize);
100-
92+
conf.get(Constants.PROPERTY_CLUSTER_MAX_TRANSACTIONS));
93+
94+
dbSessionProvider = new DBSessionProvider(conf);
10195
isInitialized = true;
10296
}
10397

@@ -122,7 +116,7 @@ public void returnSession(boolean error) throws StorageException {
122116
if (dbSession != null) {
123117
sessions.remove(); // remove, and return to the pool
124118
dbSessionProvider.returnSession(dbSession,
125-
error); // if there was an error then close the session
119+
error); // if there was an error then close the session
126120
}
127121
}
128122

@@ -164,7 +158,7 @@ public void commit() throws StorageException {
164158
returnSession(dbError);
165159
}
166160
}
167-
161+
168162
/**
169163
* It rolls back only when the transaction is active.
170164
*/
@@ -206,7 +200,7 @@ public boolean formatHDFSStorage() throws StorageException {
206200

207201
@Override
208202
public boolean formatStorage(Class<? extends EntityDataAccess>... das)
209-
throws StorageException {
203+
throws StorageException {
210204
return format(true, das);
211205
}
212206

@@ -241,7 +235,7 @@ public void readCommitted() throws StorageException {
241235

242236
@Override
243237
public void setPartitionKey(Class className, Object key)
244-
throws StorageException {
238+
throws StorageException {
245239
Class cls = null;
246240
if (className == BlockInfoDataAccess.class) {
247241
cls = BlockInfoClusterj.BlockInfoDTO.class;
@@ -316,7 +310,7 @@ public void setPartitionKey(Class className, Object key)
316310
session.flush();
317311
}
318312

319-
@Override
313+
@Override
320314
public boolean formatAllStorageNonTransactional() throws StorageException {
321315
return formatAll(false);
322316
}
@@ -333,39 +327,39 @@ public boolean formatHDFSStorageNonTransactional() throws StorageException {
333327

334328
private boolean formatYarn(boolean transactional) throws StorageException{
335329
return format(transactional, YarnLeDescriptorDataAccess.class, ApplicationStateDataAccess.class,
336-
ApplicationAttemptStateDataAccess.class, DelegationKeyDataAccess.class, DelegationTokenDataAccess.class,
337-
ProjectQuotaDataAccess.class, ProjectsDailyCostDataAccess.class, PriceMultiplicatorDataAccess.class,
338-
ReservationStateDataAccess.class, ConfMutationDataAccess.class, ConfDataAccess.class,
339-
AppProvenanceDataAccess.class);
330+
ApplicationAttemptStateDataAccess.class, DelegationKeyDataAccess.class, DelegationTokenDataAccess.class,
331+
ProjectQuotaDataAccess.class, ProjectsDailyCostDataAccess.class, PriceMultiplicatorDataAccess.class,
332+
ReservationStateDataAccess.class, ConfMutationDataAccess.class, ConfDataAccess.class,
333+
AppProvenanceDataAccess.class);
340334
}
341-
335+
342336
private boolean formatHDFS(boolean transactional) throws StorageException{
343337
return format(transactional,
344-
INodeDataAccess.class, InMemoryInodeDataAccess.class,
345-
SmallOnDiskInodeDataAccess.class, MediumOnDiskInodeDataAccess.class,
346-
LargeOnDiskInodeDataAccess.class,
347-
BlockInfoDataAccess.class, LeaseDataAccess.class,
348-
LeasePathDataAccess.class, ReplicaDataAccess.class,
349-
ReplicaUnderConstructionDataAccess.class,
350-
InvalidateBlockDataAccess.class, ExcessReplicaDataAccess.class,
351-
PendingBlockDataAccess.class, CorruptReplicaDataAccess.class,
352-
UnderReplicatedBlockDataAccess.class, HdfsLeDescriptorDataAccess.class,
353-
DirectoryWithQuotaFeatureDataAccess.class, StorageIdMapDataAccess.class,
354-
BlockLookUpDataAccess.class, SafeBlocksDataAccess.class,
355-
MisReplicatedRangeQueueDataAccess.class, QuotaUpdateDataAccess.class,
356-
EncodingStatusDataAccess.class, BlockChecksumDataAccess.class,
357-
OngoingSubTreeOpsDataAccess.class,
358-
MetadataLogDataAccess.class, EncodingJobsDataAccess.class,
359-
RepairJobsDataAccess.class, UserDataAccess.class, GroupDataAccess.class,
360-
UserGroupDataAccess.class,VariableDataAccess.class,
361-
HashBucketDataAccess.class, StorageDataAccess.class,
362-
AceDataAccess.class, RetryCacheEntryDataAccess.class, CacheDirectiveDataAccess.class,
363-
CachePoolDataAccess.class, CachedBlockDataAccess.class,
364-
ActiveBlockReportsDataAccess.class, XAttrDataAccess.class, EncryptionZoneDataAccess.class,
365-
FileProvenanceDataAccess.class, FileProvXAttrBufferDataAccess.class,
366-
LeaseCreationLocksDataAccess.class);
338+
INodeDataAccess.class, InMemoryInodeDataAccess.class,
339+
SmallOnDiskInodeDataAccess.class, MediumOnDiskInodeDataAccess.class,
340+
LargeOnDiskInodeDataAccess.class,
341+
BlockInfoDataAccess.class, LeaseDataAccess.class,
342+
LeasePathDataAccess.class, ReplicaDataAccess.class,
343+
ReplicaUnderConstructionDataAccess.class,
344+
InvalidateBlockDataAccess.class, ExcessReplicaDataAccess.class,
345+
PendingBlockDataAccess.class, CorruptReplicaDataAccess.class,
346+
UnderReplicatedBlockDataAccess.class, HdfsLeDescriptorDataAccess.class,
347+
DirectoryWithQuotaFeatureDataAccess.class, StorageIdMapDataAccess.class,
348+
BlockLookUpDataAccess.class, SafeBlocksDataAccess.class,
349+
MisReplicatedRangeQueueDataAccess.class, QuotaUpdateDataAccess.class,
350+
EncodingStatusDataAccess.class, BlockChecksumDataAccess.class,
351+
OngoingSubTreeOpsDataAccess.class,
352+
MetadataLogDataAccess.class, EncodingJobsDataAccess.class,
353+
RepairJobsDataAccess.class, UserDataAccess.class, GroupDataAccess.class,
354+
UserGroupDataAccess.class,VariableDataAccess.class,
355+
HashBucketDataAccess.class, StorageDataAccess.class,
356+
AceDataAccess.class, RetryCacheEntryDataAccess.class, CacheDirectiveDataAccess.class,
357+
CachePoolDataAccess.class, CachedBlockDataAccess.class,
358+
ActiveBlockReportsDataAccess.class, XAttrDataAccess.class, EncryptionZoneDataAccess.class,
359+
FileProvenanceDataAccess.class, FileProvXAttrBufferDataAccess.class,
360+
LeaseCreationLocksDataAccess.class);
367361
}
368-
362+
369363
private boolean formatAll(boolean transactional) throws StorageException {
370364
//HDFS
371365
if (!formatHDFS(transactional)) {
@@ -383,9 +377,22 @@ private boolean formatAll(boolean transactional) throws StorageException {
383377
}
384378

385379
private boolean format(boolean transactional,
386-
Class<? extends EntityDataAccess>... das) throws StorageException {
387-
380+
Class<? extends EntityDataAccess>... das) throws StorageException {
381+
388382
final int RETRIES = 5; // in test
383+
384+
// we need to clear the cache objects
385+
if(!transactional) {
386+
// This calls the SQL truncate command which changes the schema ID.
387+
// After calling truncate we reload the schema to avoid schema ID change
388+
// exceptions. However, reloading the schema invalidates the
389+
// objects in the DTO cache in the ClusterJ causing NPEs.
390+
// Wipe the cache before we call truncate and reload the schema
391+
392+
// we clear the cache for all open sessions
393+
dbSessionProvider.clearCache();
394+
}
395+
389396
for (int i = 0; i < RETRIES; i++) {
390397
try {
391398
for (Class e : das) {
@@ -636,9 +643,9 @@ private boolean format(boolean transactional,
636643
} // end retry loop
637644
return false;
638645
}
639-
646+
640647
private void truncate(boolean transactional, String tableName, Class dtoClass)
641-
throws StorageException, SQLException {
648+
throws StorageException, SQLException {
642649
MysqlServerConnector.truncateTable(transactional, tableName);
643650

644651
if(!transactional){ // this means that SQL Truncate is used to empty the tables
@@ -664,5 +671,5 @@ public String getClusterConnectString() {
664671
public String getDatabaseName() {
665672
return databaseName;
666673
}
667-
674+
668675
}

src/main/java/io/hops/metadata/ndb/DBSessionProvider.java

+28-19
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
import com.mysql.clusterj.ClusterJHelper;
2323
import com.mysql.clusterj.Constants;
2424
import com.mysql.clusterj.LockMode;
25-
import com.mysql.clusterj.core.util.LoggerFactory;
2625
import io.hops.exception.StorageException;
26+
import io.hops.metadata.ndb.wrapper.ClusterJCaching;
2727
import io.hops.metadata.ndb.wrapper.HopsExceptionHelper;
2828
import io.hops.metadata.ndb.wrapper.HopsSession;
2929
import io.hops.metadata.ndb.wrapper.HopsSessionFactory;
3030
import org.apache.commons.logging.Log;
3131
import org.apache.commons.logging.LogFactory;
3232

33+
import java.util.Iterator;
3334
import java.util.NoSuchElementException;
3435
import java.util.Properties;
3536
import java.util.Random;
@@ -52,14 +53,28 @@ public class DBSessionProvider implements Runnable {
5253
private AtomicInteger rollingAvgIndex = new AtomicInteger(-1);
5354
private boolean automaticRefresh = false;
5455
private Thread thread;
56+
private final ClusterJCaching clusterJCaching;
5557

56-
public DBSessionProvider(Properties conf, int reuseCount, int initialPoolSize)
58+
public DBSessionProvider(Properties conf)
5759
throws StorageException {
5860
this.conf = conf;
61+
62+
boolean useClusterjDtoCache = Boolean.parseBoolean(
63+
(String) conf.get("io.hops.enable.clusterj.dto.cache"));
64+
boolean useClusterjSessionCache = Boolean.parseBoolean(
65+
(String) conf.get("io.hops.enable.clusterj.session.cache"));
66+
clusterJCaching = new ClusterJCaching(useClusterjDtoCache, useClusterjSessionCache);
67+
68+
int initialPoolSize = Integer.parseInt(
69+
(String) conf.get("io.hops.session.pool.size"));
70+
int reuseCount = Integer.parseInt(
71+
(String) conf.get("io.hops.session.reuse.count"));
72+
5973
if (reuseCount <= 0) {
6074
System.err.println("Invalid value for session reuse count");
6175
System.exit(-1);
6276
}
77+
6378
this.MAX_REUSE_COUNT = reuseCount;
6479
rand = new Random(System.currentTimeMillis());
6580
rollingAvg = new long[initialPoolSize];
@@ -73,9 +88,11 @@ private void start(int initialPoolSize) throws StorageException {
7388
"Database name: " + conf.get(Constants.PROPERTY_CLUSTER_DATABASE));
7489
LOG.info("Max Transactions: " +
7590
conf.get(Constants.PROPERTY_CLUSTER_MAX_TRANSACTIONS));
91+
LOG.info("Using ClusterJ Session Cache: "+clusterJCaching.useClusterjSessionCache());
92+
LOG.info("Using ClusterJ DTO Cache: "+clusterJCaching.useClusterjDtoCache());
7693
try {
7794
sessionFactory =
78-
new HopsSessionFactory(ClusterJHelper.getSessionFactory(conf));
95+
new HopsSessionFactory(ClusterJHelper.getSessionFactory(conf), clusterJCaching);
7996
} catch (ClusterJException ex) {
8097
throw HopsExceptionHelper.wrap(ex);
8198
}
@@ -174,29 +191,13 @@ public void run() {
174191
DBSession session = toGC.remove();
175192
session.getSession().close();
176193
}
177-
//System.out.println("CGed " + toGCSize);
178194

179195
for (int i = 0; i < toGCSize; i++) {
180196
sessionPool.add(initSession());
181197
}
182-
//System.out.println("Created " + toGCSize);
183198
}
184-
// for (int i = 0; i < 100; i++) {
185-
// DBSession session = sessionPool.remove();
186-
// double percent = (((double) session.getSessionUseCount() / (double) session.getMaxReuseCount()) * (double) 100);
187-
// // System.out.print(session.getSessionUseCount()+","+session.getMaxReuseCount()+","+percent+" ");
188-
// if (percent > 80) { // more than 80% used then recyle it
189-
// session.getSession().close();
190-
// System.out.println("Recycled a session");
191-
// //add a new session
192-
// sessionPool.add(initSession());
193-
// } else {
194-
// sessionPool.add(session);
195-
// }
196-
// }
197199
Thread.sleep(5);
198200
} catch (NoSuchElementException e) {
199-
//System.out.print(".");
200201
for (int i = 0; i < 100; i++) {
201202
try {
202203
sessionPool.add(initSession());
@@ -212,4 +213,12 @@ public void run() {
212213
}
213214
}
214215
}
216+
217+
public void clearCache() throws StorageException {
218+
Iterator<DBSession> itr = sessionPool.iterator();
219+
while(itr.hasNext()){
220+
DBSession session = itr.next();
221+
session.getSession().dropInstanceCache();
222+
}
223+
}
215224
}

src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/LeaseCreationLocksClusterj.java

-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public void createLockRows(int count) throws StorageException {
6464
dto.setID(i);
6565
session.savePersistent(dto);
6666
session.release(dto);
67-
LOG.debug("Added lease creation lock row with ID: "+i);
6867
}
6968
}
7069
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.hops.metadata.ndb.wrapper;
2+
3+
public class ClusterJCaching {
4+
private final boolean useClusterjDtoCache;
5+
private final boolean useClusterjSessionCache;
6+
7+
public ClusterJCaching(boolean useClusterjDtoCache, boolean useClusterjSessionCache) {
8+
this.useClusterjDtoCache = useClusterjDtoCache;
9+
this.useClusterjSessionCache = useClusterjSessionCache;
10+
}
11+
12+
public boolean useClusterjDtoCache() {
13+
return useClusterjDtoCache;
14+
}
15+
16+
public boolean useClusterjSessionCache() {
17+
return useClusterjSessionCache;
18+
}
19+
}
20+

0 commit comments

Comments
 (0)