6
6
* modify it under the terms of the GNU General Public License
7
7
* as published by the Free Software Foundation; either version 2
8
8
* of the License, or (at your option) any later version.
9
- *
9
+ *
10
10
* This program is distributed in the hope that it will be useful,
11
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
@@ -68,7 +68,7 @@ public class ClusterjConnector implements StorageConnector<DBSession> {
68
68
static final Log LOG = LogFactory .getLog (ClusterjConnector .class );
69
69
private String clusterConnectString ;
70
70
private String databaseName ;
71
-
71
+
72
72
private ClusterjConnector () {
73
73
}
74
74
@@ -82,22 +82,16 @@ public void setConfiguration(Properties conf) throws StorageException {
82
82
LOG .warn ("SessionFactory is already initialized" );
83
83
return ;
84
84
}
85
-
85
+
86
86
clusterConnectString = (String ) conf .get (Constants .PROPERTY_CLUSTER_CONNECTSTRING );
87
87
LOG .info ("Database connect string: " +
88
- conf .get (Constants .PROPERTY_CLUSTER_CONNECTSTRING ));
88
+ conf .get (Constants .PROPERTY_CLUSTER_CONNECTSTRING ));
89
89
databaseName = (String ) conf .get (Constants .PROPERTY_CLUSTER_DATABASE );
90
90
LOG .info ("Database name: " + conf .get (Constants .PROPERTY_CLUSTER_DATABASE ));
91
91
LOG .info ("Max Transactions: " +
92
- conf .get (Constants .PROPERTY_CLUSTER_MAX_TRANSACTIONS ));
93
-
94
- int initialPoolSize =
95
- Integer .parseInt ((String ) conf .get ("io.hops.session.pool.size" ));
96
- int reuseCount =
97
- Integer .parseInt ((String ) conf .get ("io.hops.session.reuse.count" ));
98
- dbSessionProvider =
99
- new DBSessionProvider (conf , reuseCount , initialPoolSize );
100
-
92
+ conf .get (Constants .PROPERTY_CLUSTER_MAX_TRANSACTIONS ));
93
+
94
+ dbSessionProvider = new DBSessionProvider (conf );
101
95
isInitialized = true ;
102
96
}
103
97
@@ -122,7 +116,7 @@ public void returnSession(boolean error) throws StorageException {
122
116
if (dbSession != null ) {
123
117
sessions .remove (); // remove, and return to the pool
124
118
dbSessionProvider .returnSession (dbSession ,
125
- error ); // if there was an error then close the session
119
+ error ); // if there was an error then close the session
126
120
}
127
121
}
128
122
@@ -164,7 +158,7 @@ public void commit() throws StorageException {
164
158
returnSession (dbError );
165
159
}
166
160
}
167
-
161
+
168
162
/**
169
163
* It rolls back only when the transaction is active.
170
164
*/
@@ -206,7 +200,7 @@ public boolean formatHDFSStorage() throws StorageException {
206
200
207
201
@ Override
208
202
public boolean formatStorage (Class <? extends EntityDataAccess >... das )
209
- throws StorageException {
203
+ throws StorageException {
210
204
return format (true , das );
211
205
}
212
206
@@ -241,7 +235,7 @@ public void readCommitted() throws StorageException {
241
235
242
236
@ Override
243
237
public void setPartitionKey (Class className , Object key )
244
- throws StorageException {
238
+ throws StorageException {
245
239
Class cls = null ;
246
240
if (className == BlockInfoDataAccess .class ) {
247
241
cls = BlockInfoClusterj .BlockInfoDTO .class ;
@@ -316,7 +310,7 @@ public void setPartitionKey(Class className, Object key)
316
310
session .flush ();
317
311
}
318
312
319
- @ Override
313
+ @ Override
320
314
public boolean formatAllStorageNonTransactional () throws StorageException {
321
315
return formatAll (false );
322
316
}
@@ -333,39 +327,39 @@ public boolean formatHDFSStorageNonTransactional() throws StorageException {
333
327
334
328
private boolean formatYarn (boolean transactional ) throws StorageException {
335
329
return format (transactional , YarnLeDescriptorDataAccess .class , ApplicationStateDataAccess .class ,
336
- ApplicationAttemptStateDataAccess .class , DelegationKeyDataAccess .class , DelegationTokenDataAccess .class ,
337
- ProjectQuotaDataAccess .class , ProjectsDailyCostDataAccess .class , PriceMultiplicatorDataAccess .class ,
338
- ReservationStateDataAccess .class , ConfMutationDataAccess .class , ConfDataAccess .class ,
339
- AppProvenanceDataAccess .class );
330
+ ApplicationAttemptStateDataAccess .class , DelegationKeyDataAccess .class , DelegationTokenDataAccess .class ,
331
+ ProjectQuotaDataAccess .class , ProjectsDailyCostDataAccess .class , PriceMultiplicatorDataAccess .class ,
332
+ ReservationStateDataAccess .class , ConfMutationDataAccess .class , ConfDataAccess .class ,
333
+ AppProvenanceDataAccess .class );
340
334
}
341
-
335
+
342
336
private boolean formatHDFS (boolean transactional ) throws StorageException {
343
337
return format (transactional ,
344
- INodeDataAccess .class , InMemoryInodeDataAccess .class ,
345
- SmallOnDiskInodeDataAccess .class , MediumOnDiskInodeDataAccess .class ,
346
- LargeOnDiskInodeDataAccess .class ,
347
- BlockInfoDataAccess .class , LeaseDataAccess .class ,
348
- LeasePathDataAccess .class , ReplicaDataAccess .class ,
349
- ReplicaUnderConstructionDataAccess .class ,
350
- InvalidateBlockDataAccess .class , ExcessReplicaDataAccess .class ,
351
- PendingBlockDataAccess .class , CorruptReplicaDataAccess .class ,
352
- UnderReplicatedBlockDataAccess .class , HdfsLeDescriptorDataAccess .class ,
353
- DirectoryWithQuotaFeatureDataAccess .class , StorageIdMapDataAccess .class ,
354
- BlockLookUpDataAccess .class , SafeBlocksDataAccess .class ,
355
- MisReplicatedRangeQueueDataAccess .class , QuotaUpdateDataAccess .class ,
356
- EncodingStatusDataAccess .class , BlockChecksumDataAccess .class ,
357
- OngoingSubTreeOpsDataAccess .class ,
358
- MetadataLogDataAccess .class , EncodingJobsDataAccess .class ,
359
- RepairJobsDataAccess .class , UserDataAccess .class , GroupDataAccess .class ,
360
- UserGroupDataAccess .class ,VariableDataAccess .class ,
361
- HashBucketDataAccess .class , StorageDataAccess .class ,
362
- AceDataAccess .class , RetryCacheEntryDataAccess .class , CacheDirectiveDataAccess .class ,
363
- CachePoolDataAccess .class , CachedBlockDataAccess .class ,
364
- ActiveBlockReportsDataAccess .class , XAttrDataAccess .class , EncryptionZoneDataAccess .class ,
365
- FileProvenanceDataAccess .class , FileProvXAttrBufferDataAccess .class ,
366
- LeaseCreationLocksDataAccess .class );
338
+ INodeDataAccess .class , InMemoryInodeDataAccess .class ,
339
+ SmallOnDiskInodeDataAccess .class , MediumOnDiskInodeDataAccess .class ,
340
+ LargeOnDiskInodeDataAccess .class ,
341
+ BlockInfoDataAccess .class , LeaseDataAccess .class ,
342
+ LeasePathDataAccess .class , ReplicaDataAccess .class ,
343
+ ReplicaUnderConstructionDataAccess .class ,
344
+ InvalidateBlockDataAccess .class , ExcessReplicaDataAccess .class ,
345
+ PendingBlockDataAccess .class , CorruptReplicaDataAccess .class ,
346
+ UnderReplicatedBlockDataAccess .class , HdfsLeDescriptorDataAccess .class ,
347
+ DirectoryWithQuotaFeatureDataAccess .class , StorageIdMapDataAccess .class ,
348
+ BlockLookUpDataAccess .class , SafeBlocksDataAccess .class ,
349
+ MisReplicatedRangeQueueDataAccess .class , QuotaUpdateDataAccess .class ,
350
+ EncodingStatusDataAccess .class , BlockChecksumDataAccess .class ,
351
+ OngoingSubTreeOpsDataAccess .class ,
352
+ MetadataLogDataAccess .class , EncodingJobsDataAccess .class ,
353
+ RepairJobsDataAccess .class , UserDataAccess .class , GroupDataAccess .class ,
354
+ UserGroupDataAccess .class ,VariableDataAccess .class ,
355
+ HashBucketDataAccess .class , StorageDataAccess .class ,
356
+ AceDataAccess .class , RetryCacheEntryDataAccess .class , CacheDirectiveDataAccess .class ,
357
+ CachePoolDataAccess .class , CachedBlockDataAccess .class ,
358
+ ActiveBlockReportsDataAccess .class , XAttrDataAccess .class , EncryptionZoneDataAccess .class ,
359
+ FileProvenanceDataAccess .class , FileProvXAttrBufferDataAccess .class ,
360
+ LeaseCreationLocksDataAccess .class );
367
361
}
368
-
362
+
369
363
private boolean formatAll (boolean transactional ) throws StorageException {
370
364
//HDFS
371
365
if (!formatHDFS (transactional )) {
@@ -383,9 +377,22 @@ private boolean formatAll(boolean transactional) throws StorageException {
383
377
}
384
378
385
379
private boolean format (boolean transactional ,
386
- Class <? extends EntityDataAccess >... das ) throws StorageException {
387
-
380
+ Class <? extends EntityDataAccess >... das ) throws StorageException {
381
+
388
382
final int RETRIES = 5 ; // in test
383
+
384
+ // we need to clear the cache objects
385
+ if (!transactional ) {
386
+ // This calls the SQL truncate command which changes the schema ID.
387
+ // After calling truncate we reload the schema to avoid schema ID change
388
+ // exceptions. However, reloading the schema invalidates the
389
+ // objects in the DTO cache in the ClusterJ causing NPEs.
390
+ // Wipe the cache before we call truncate and reload the schema
391
+
392
+ // we clear the cache for all open sessions
393
+ dbSessionProvider .clearCache ();
394
+ }
395
+
389
396
for (int i = 0 ; i < RETRIES ; i ++) {
390
397
try {
391
398
for (Class e : das ) {
@@ -636,9 +643,9 @@ private boolean format(boolean transactional,
636
643
} // end retry loop
637
644
return false ;
638
645
}
639
-
646
+
640
647
private void truncate (boolean transactional , String tableName , Class dtoClass )
641
- throws StorageException , SQLException {
648
+ throws StorageException , SQLException {
642
649
MysqlServerConnector .truncateTable (transactional , tableName );
643
650
644
651
if (!transactional ){ // this means that SQL Truncate is used to empty the tables
@@ -664,5 +671,5 @@ public String getClusterConnectString() {
664
671
public String getDatabaseName () {
665
672
return databaseName ;
666
673
}
667
-
674
+
668
675
}
0 commit comments