diff --git a/schema/schema.sql b/schema/schema.sql index 5c75a950..0859b0b7 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -1,3 +1,6 @@ +CREATE DATABASE IF NOT EXISTS `hop_pushparaj`; +USE `hop_pushparaj`; + delimiter $$ CREATE TABLE `hdfs_block_infos` ( @@ -10,6 +13,7 @@ CREATE TABLE `hdfs_block_infos` ( `time_stamp` bigint(20) DEFAULT NULL, `primary_node_index` int(11) DEFAULT NULL, `block_recovery_id` bigint(20) DEFAULT NULL, + `status` int(11) DEFAULT 1, PRIMARY KEY (`inode_id`,`block_id`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1 /*!50100 PARTITION BY KEY (inode_id) */$$ @@ -57,6 +61,7 @@ CREATE TABLE `hdfs_inode_attributes` ( `dsquota` bigint(20) DEFAULT NULL, `nscount` bigint(20) DEFAULT NULL, `diskspace` bigint(20) DEFAULT NULL, + `status` int(11) DEFAULT 1, PRIMARY KEY (`inodeId`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1$$ @@ -82,6 +87,8 @@ CREATE TABLE `hdfs_inodes` ( `subtree_lock_owner` bigint(20) DEFAULT NULL, `meta_enabled` bit(8) DEFAULT b'110000', `size` bigint(20) NOT NULL DEFAULT '0', + `isdeleted` int(11) DEFAULT 0, + `status` int(11) DEFAULT 1, PRIMARY KEY (`parent_id`,`name`), KEY `pidex` (`parent_id`), KEY `inode_idx` (`id`) diff --git a/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java b/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java index 524ea157..46ade867 100644 --- a/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java +++ b/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java @@ -50,6 +50,8 @@ import io.hops.metadata.hdfs.dal.StorageIdMapDataAccess; import io.hops.metadata.hdfs.dal.UnderReplicatedBlockDataAccess; import io.hops.metadata.hdfs.dal.VariableDataAccess; +import io.hops.metadata.rollBack.dal.RollBackAccess; +import io.hops.metadata.ndb.dalimpl.rollBack.RollBackImpl; import io.hops.metadata.ndb.dalimpl.election.HdfsLeaderClusterj; import io.hops.metadata.ndb.dalimpl.election.YarnLeaderClusterj; import io.hops.metadata.ndb.dalimpl.hdfs.AccessTimeLogClusterj; @@ -293,6 +295,7 @@ private void initDataAccessMap() { .put(NextHeartbeatDataAccess.class, new NextHeartbeatClusterJ()); dataAccessMap.put(RMLoadDataAccess.class, new RMLoadClusterJ()); dataAccessMap.put(FullRMNodeDataAccess.class, new FullRMNodeClusterJ()); + dataAccessMap.put(RollBackAccess.class,new RollBackImpl()); dataAccessMap.put(MetadataLogDataAccess.class, new MetadataLogClusterj()); dataAccessMap.put(AccessTimeLogDataAccess.class, new AccessTimeLogClusterj()); diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java index 979c79e0..660d59de 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java @@ -95,8 +95,13 @@ public interface BlockInfoDTO { long getBlockRecoveryId(); void setBlockRecoveryId(long recoveryId); - } - private ClusterjConnector connector = ClusterjConnector.getInstance(); + @Column(name=STATUS) + + int getStatus(); + + void setStatus(int status); + } + private ClusterjConnector connector = ClusterjConnector.getInstance(); private final static int NOT_FOUND_ROW = -1000; @Override @@ -323,7 +328,7 @@ private BlockInfo createBlockInfo(BlockInfoClusterj.BlockInfoDTO bDTO) { new BlockInfo(bDTO.getBlockId(), bDTO.getBlockIndex(), bDTO.getINodeId(), bDTO.getNumBytes(), bDTO.getGenerationStamp(), bDTO.getBlockUCState(), bDTO.getTimestamp(), - bDTO.getPrimaryNodeIndex(), bDTO.getBlockRecoveryId()); + bDTO.getPrimaryNodeIndex(), 
bDTO.getBlockRecoveryId(),bDTO.getStatus()); return hopBlockInfo; } @@ -338,5 +343,6 @@ private void createPersistable(BlockInfo block, persistable.setBlockUCState(block.getBlockUCState()); persistable.setPrimaryNodeIndex(block.getPrimaryNodeIndex()); persistable.setBlockRecoveryId(block.getBlockRecoveryId()); + persistable.setStatus(block.getStatus()); } } diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java index a4b8bc05..66236dbc 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java @@ -65,8 +65,12 @@ public interface INodeAttributesDTO { long getDiskspace(); void setDiskspace(long diskspace); - } + + @Column(name = STATUS) + int getStatus(); + void setStatus(int status); + } private ClusterjConnector connector = ClusterjConnector.getInstance(); @Override @@ -141,6 +145,7 @@ private INodeAttributesDTO createPersistable(INodeAttributes attribute, dto.setNSCount(attribute.getNsCount()); dto.setDSQuota(attribute.getDsQuota()); dto.setDiskspace(attribute.getDiskspace()); + dto.setStatus(attribute.getStatus()); return dto; } @@ -150,7 +155,7 @@ private INodeAttributes makeINodeAttributes(INodeAttributesDTO dto) { } INodeAttributes iNodeAttributes = new INodeAttributes(dto.getId(), dto.getNSQuota(), dto.getNSCount(), - dto.getDSQuota(), dto.getDiskspace()); + dto.getDSQuota(), dto.getDiskspace(),dto.getStatus()); return iNodeAttributes; } } \ No newline at end of file diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java index dbcce289..3e2c3e0a 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java @@ -29,6 +29,7 @@ import io.hops.metadata.hdfs.entity.INode; import io.hops.metadata.hdfs.entity.INodeIdentifier; import io.hops.metadata.hdfs.entity.ProjectedINode; +import io.hops.metadata.hdfs.snapshots.SnapShotConstants; import io.hops.metadata.ndb.ClusterjConnector; import io.hops.metadata.ndb.NdbBoolean; import io.hops.metadata.ndb.mysqlserver.HopsSQLExceptionHelper; @@ -157,6 +158,16 @@ public interface InodeDTO { long getSize(); void setSize(long size); + + @Column(name = ISDELETED) + int getIsDeleted(); + + void setIsDeleted(int isdeleted); + + @Column(name = STATUS) + int getStatus(); + + void setStatus(int Status); } private ClusterjConnector connector = ClusterjConnector.getInstance(); @@ -236,7 +247,7 @@ public List indexScanFindInodesByParentId(int parentId) qb.createQueryDefinition(InodeDTO.class); HopsPredicate pred1 = dobj.get("parentId").equal(dobj.param("parentIDParam")); - dobj.where(pred1); + dobj.where(pred1); HopsQuery query = session.createQuery(dobj); query.setParameter("parentIDParam", parentId); @@ -245,14 +256,74 @@ public List indexScanFindInodesByParentId(int parentId) session.release(results); return inodeList; } - + + @Override + public List indexScanFindInodesByParentIdIncludeDeletes(int parentId) throws StorageException { + try { + + HopsSession session = connector.obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType dobj = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred1 = dobj.get("parentId").equal(dobj.param("parentIDParam1")); + HopsPredicate pred2 = 
dobj.get("parentId").equal(dobj.param("parentIDParam2")); + HopsPredicate pred3 = dobj.get("isdeleted").equal(dobj.param("isdeletedParam")); + dobj.where(pred1.or(pred2.and(pred3))); + + HopsQuery query = session.createQuery(dobj); + query.setParameter("parentIDParam1", parentId); + query.setParameter("parentIDParam2", -parentId); + query.setParameter("parentIDParam3", 1); + + List results = query.getResultList(); + explain(query); + return createInodeList(results); + } catch (Exception e) { + throw new StorageException(e); + } + } + @Override + public List findInodesByParentIdForSubTreeOpsWithReadLockIncludeDeletes( + int parentId) throws StorageException { + final String query = String.format( + "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s FROM %s WHERE %s=%d or (%s=%d and %s=%d) LOCK IN SHARE MODE", + ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED, + UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER,STATUS,ISDELETED,TABLE_NAME, + PARENT_ID,PARENT_ID,ISDELETED, parentId,-parentId,SnapShotConstants.isDeleted); + ArrayList resultList; + try { + Connection conn = mysqlConnector.obtainSession(); + PreparedStatement s = conn.prepareStatement(query); + ResultSet result = s.executeQuery(); + resultList = new ArrayList(); + + while (result.next()) { + resultList.add( + new ProjectedINode(result.getInt(ID), result.getInt(PARENT_ID), + result.getString(NAME), result.getBytes(PERMISSION), + result.getLong(HEADER), + result.getString(SYMLINK) == null ? false : true, + result.getBoolean(QUOTA_ENABLED), + result.getBoolean(UNDER_CONSTRUCTION), + result.getBoolean(SUBTREE_LOCKED), + result.getLong(SUBTREE_LOCK_OWNER), + result.getLong(SIZE), + result.getInt(ISDELETED), + result.getInt(STATUS))); + } + } catch (SQLException ex) { + throw HopsSQLExceptionHelper.wrap(ex); + } finally { + mysqlConnector.closeSession(); + } + return resultList; + } @Override public List findInodesForSubtreeOperationsWithWriteLock( int parentId) throws StorageException { final String query = String.format( - "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s FROM %s WHERE %s=%d FOR UPDATE ", + "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s FROM %s WHERE %s=%d FOR UPDATE ", ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED, - UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, SIZE, TABLE_NAME, + UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, SIZE,ISDELETED,STATUS, TABLE_NAME, PARENT_ID, parentId); ArrayList resultList; try { @@ -271,7 +342,9 @@ public List findInodesForSubtreeOperationsWithWriteLock( result.getBoolean(UNDER_CONSTRUCTION), result.getBoolean(SUBTREE_LOCKED), result.getLong(SUBTREE_LOCK_OWNER), - result.getLong(SIZE))); + result.getLong(SIZE), + result.getInt(ISDELETED), + result.getInt(STATUS))); } } catch (SQLException ex) { throw HopsSQLExceptionHelper.wrap(ex); @@ -433,7 +506,7 @@ private INode createInode(InodeDTO persistable) { NdbBoolean.convert(persistable.getSubtreeLocked()), persistable.getSubtreeLockOwner(), NdbBoolean.convert(persistable.getMetaEnabled()), - persistable.getSize()); + persistable.getSize(), persistable.getIsDeleted(),persistable.getStatus()); return node; } @@ -456,6 +529,8 @@ private void createPersistable(INode inode, InodeDTO persistable) { persistable.setSubtreeLockOwner(inode.getSubtreeLockOwner()); persistable.setMetaEnabled(NdbBoolean.convert(inode.isMetaEnabled())); persistable.setSize(inode.getSize()); + persistable.setIsDeleted(inode.getIsDeleted()); + persistable.setStatus(inode.getStatus()); } private void 
explain(HopsQuery query) {
diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java
new file mode 100644
index 00000000..c3d50e60
--- /dev/null
+++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java
@@ -0,0 +1,500 @@
+package io.hops.metadata.ndb.dalimpl.rollBack;
+
+import io.hops.exception.StorageException;
+import io.hops.metadata.hdfs.snapshots.SnapShotConstants;
+import io.hops.metadata.ndb.ClusterjConnector;
+import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj;
+import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj;
+import io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj;
+import io.hops.metadata.ndb.wrapper.*;
+import io.hops.metadata.rollBack.dal.RemoveSnapshotAccess;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import static io.hops.metadata.ndb.dalimpl.rollBack.RollBackImpl.execMySqlQuery;
+import io.hops.metadata.hdfs.TablesDef.*;
+
+/**
+ * Copyright (c) 01/08/16 DigitalRoute
+ * All rights reserved.
+ * Usage of this program and the accompanying materials is subject to license terms
+ *
+ * @author pushparaj.motamari
+ */
+public class RemoveSnapshotImpl implements RemoveSnapshotAccess {
+  private static final Log LOG = LogFactory.getLog(RemoveSnapshotImpl.class);
+  private static int BUFFER_SIZE = 50000;
+  private int rootId = 2;
+
+  @Override
+  public boolean processInodesPhase1() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId;
+    //Remove all back-up rows, i.e. rows with negative id.
+    try {
+      minId = execMySqlQuery("select min(id) from hdfs_inodes where id<0");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = 0;
+    InodesPhase1Callable mr;
+    while (intervalStart > minId) {
+      mr = new InodesPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE);
+      intervalStart = intervalStart - BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
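Both new implementation files lean on a small set of snapshot status codes. Their numeric values are not part of this diff; the sketch below records what the schema defaults (`status` defaulting to 1, `isdeleted` to 0) and the raw `status =2 or status = 3` literals in the count queries further down imply. It is an inference, not the real class from `io.hops.metadata.hdfs.snapshots`:

```java
// Inferred values only; the actual class may differ.
public final class SnapShotConstants {
  public static final int Original = 1;  // row untouched since the snapshot (schema default)
  public static final int Modified = 2;  // row changed after the snapshot
  public static final int New = 3;       // row created after the snapshot
  public static final int isDeleted = 1;
  public static final int isNotDeleted = 0;
}
```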
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_inodes where id <0 ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class InodesPhase1Callable implements Callable {
+
+    int startId, endId;
+
+    InodesPhase1Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType deleteInodes = qb.createQueryDefinition(INodeClusterj.InodeDTO.class);
+
+      HopsPredicate pred5 = deleteInodes.get(INodeTableDef.ID).lessEqual(deleteInodes.param("startId"));
+      HopsPredicate pred6 = deleteInodes.get(INodeTableDef.ID).greaterEqual(deleteInodes.param("endId"));
+      deleteInodes.where(pred6.and(pred5));
+
+      HopsQuery deleteQuery = session.createQuery(deleteInodes);
+
+      deleteQuery.setParameter("startId", startId);
+      deleteQuery.setParameter("endId", endId);
+
+      deleteQuery.deletePersistentAll();
+
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  @Override
+  public boolean processInodesPhase2() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId, maxId;
+    // Change all new and modified rows back to Original.
+    try {
+      maxId = execMySqlQuery("select max(id) from hdfs_inodes where id>0 "
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+      minId = execMySqlQuery("select min(id) from hdfs_inodes where id>0"
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = minId;
+    InodesPhase2Callable mr;
+    while (intervalStart < maxId) {
+      mr = new InodesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE);
+      intervalStart = intervalStart + BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
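Every phase method in this file repeats one driver shape: query the id bound, carve the range into `BUFFER_SIZE` chunks, submit one `Callable` per chunk, drain the pool, then re-count to confirm, as in the verification step just below. Distilled into a single hypothetical helper (`runPhase` does not exist in the patch; it only restates what each phase inlines):

```java
// Hypothetical helper capturing the driver shape shared by every phase here.
static boolean runPhase(String minIdQuery, String countQuery) throws IOException {
  ExecutorService pool = Executors.newCachedThreadPool();
  int minId = execMySqlQuery(minIdQuery);              // most negative id still present
  for (int start = 0; start > minId; start -= BUFFER_SIZE) {
    pool.submit(new InodesPhase1Callable(start, start - BUFFER_SIZE)); // one id range per task
  }
  RollBackImpl.shutDownPool(pool);                     // block until all chunks finish
  return execMySqlQuery(countQuery) == 0;              // zero leftovers == phase complete
}
```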
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_inodes where id >0 and (status =2 or status = 3) ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class InodesPhase2Callable implements Callable {
+
+    int startId, endId;
+
+    InodesPhase2Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeClusterj.InodeDTO.class);
+
+      HopsPredicate pred3 = updateInodes.get(INodeTableDef.STATUS).equal(updateInodes.param("statusParam1"));
+      HopsPredicate pred4 = updateInodes.get(INodeTableDef.STATUS).equal(updateInodes.param("statusParam2"));
+      HopsPredicate pred5 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("startId"));
+      HopsPredicate pred6 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("endId"));
+      updateInodes.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+      HopsQuery updateQuery = session.createQuery(updateInodes);
+
+      updateQuery.setParameter("statusParam1", SnapShotConstants.Modified);
+      updateQuery.setParameter("statusParam2", SnapShotConstants.New);
+      updateQuery.setParameter("startId", startId);
+      updateQuery.setParameter("endId", endId);
+
+      List resultList = updateQuery.getResultList();
+
+      for (INodeClusterj.InodeDTO inode : resultList) {
+        inode.setStatus(SnapShotConstants.Original);
+      }
+
+      session.updatePersistentAll(resultList);
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  @Override
+  public boolean processInodeAttributesPhase1() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId;
+    //Remove all back-up rows, i.e. rows with negative id.
+    try {
+      minId = execMySqlQuery("select min(" + INodeAttributesTableDef.ID + ") from hdfs_inode_attributes where " + INodeAttributesTableDef.ID + "<0");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = 0;
+    InodeAttributesPhase1Callable mr;
+    while (intervalStart > minId) {
+      mr = new InodeAttributesPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE);
+      intervalStart = intervalStart - BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
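ClusterJ resolves query parameters by name: the string handed to `setParameter` must exactly match the one declared with `param(...)`, which is the contract the `statusParam1`/`statusParam2` pairs above rely on. In miniature, using the same wrapper types (sketch, not a complete method):

```java
// Parameter names are the contract between query definition and execution.
HopsQueryDomainType d = qb.createQueryDefinition(INodeClusterj.InodeDTO.class);
d.where(d.get(INodeTableDef.STATUS).equal(d.param("statusParam")));
HopsQuery q = session.createQuery(d);
q.setParameter("statusParam", SnapShotConstants.Modified); // must match param("statusParam")
List rows = q.getResultList();
```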
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_inode_attributes where " + INodeAttributesTableDef.ID + " <0 ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class InodeAttributesPhase1Callable implements Callable {
+
+    int startId, endId;
+
+    InodeAttributesPhase1Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesClusterj.INodeAttributesDTO.class);
+
+      HopsPredicate pred5 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(deleteInodeAttributes.param("startId"));
+      HopsPredicate pred6 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodeAttributes.param("endId"));
+      deleteInodeAttributes.where(pred6.and(pred5));
+
+      HopsQuery deleteQuery = session.createQuery(deleteInodeAttributes);
+
+      deleteQuery.setParameter("startId", startId);
+      deleteQuery.setParameter("endId", endId);
+
+      deleteQuery.deletePersistentAll();
+
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  @Override
+  public boolean processInodeAttributesPhase2() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId, maxId;
+    // Change all new and modified rows back to Original.
+    try {
+      maxId = execMySqlQuery("select max(" + INodeAttributesTableDef.ID + ") from hdfs_inode_attributes where " + INodeAttributesTableDef.ID + ">0"
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+      minId = execMySqlQuery("select min(" + INodeAttributesTableDef.ID + ") from hdfs_inode_attributes where " + INodeAttributesTableDef.ID + ">0 "
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = minId;
+    InodeAttributesPhase2Callable mr;
+    while (intervalStart < maxId) {
+      mr = new InodeAttributesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE);
+      intervalStart = intervalStart + BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
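The verification step that follows, like every other phase's, boils down to one `COUNT(*)` compared against zero. A tiny helper would capture it (hypothetical; the patch inlines the pattern each time):

```java
// Hypothetical helper; each phase above and below inlines this check.
private static boolean isDone(String countQuery) throws StorageException {
  return execMySqlQuery(countQuery) == 0; // zero matching rows == phase complete
}
// e.g. isDone("select count(*) from hdfs_inode_attributes where inodeId < 0")
```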
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_inode_attributes where " + INodeAttributesTableDef.ID + " >0 and (status =2 or status = 3) ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class InodeAttributesPhase2Callable implements Callable {
+
+    int startId, endId;
+
+    InodeAttributesPhase2Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType updateInodeAttributes = qb.createQueryDefinition(INodeAttributesClusterj.INodeAttributesDTO.class);
+
+      HopsPredicate pred3 = updateInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(updateInodeAttributes.param("statusParam1"));
+      HopsPredicate pred4 = updateInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(updateInodeAttributes.param("statusParam2"));
+      HopsPredicate pred5 = updateInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(updateInodeAttributes.param("startId"));
+      HopsPredicate pred6 = updateInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(updateInodeAttributes.param("endId"));
+      updateInodeAttributes.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+      HopsQuery updateQuery = session.createQuery(updateInodeAttributes);
+
+      updateQuery.setParameter("statusParam1", SnapShotConstants.Modified);
+      updateQuery.setParameter("statusParam2", SnapShotConstants.New);
+      updateQuery.setParameter("startId", startId);
+      updateQuery.setParameter("endId", endId);
+
+      List resultList = updateQuery.getResultList();
+
+      for (INodeAttributesClusterj.INodeAttributesDTO inodeAttribute : resultList) {
+        inodeAttribute.setStatus(SnapShotConstants.Original);
+      }
+
+      session.updatePersistentAll(resultList);
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  public boolean processBlocksPhase1() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId;
+    //Remove all back-up rows, i.e. rows with negative id.
+    try {
+      minId = execMySqlQuery("select min(" + BlockInfoTableDef.BLOCK_ID + ") from hdfs_block_infos where " + BlockInfoTableDef.BLOCK_ID + "<0");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = 0;
+    processBlocksPhase1Callable mr;
+    while (intervalStart > minId) {
+      mr = new processBlocksPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE);
+      intervalStart = intervalStart - BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_block_infos where " + BlockInfoTableDef.BLOCK_ID + " <0 ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class processBlocksPhase1Callable implements Callable {
+
+    int startId, endId;
+
+    processBlocksPhase1Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType deleteBlocks = qb.createQueryDefinition(BlockInfoClusterj.BlockInfoDTO.class);
+
+      HopsPredicate pred5 = deleteBlocks.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteBlocks.param("startId"));
+      HopsPredicate pred6 = deleteBlocks.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteBlocks.param("endId"));
+      deleteBlocks.where(pred6.and(pred5));
+
+      HopsQuery deleteQuery = session.createQuery(deleteBlocks);
+
+      deleteQuery.setParameter("startId", startId);
+      deleteQuery.setParameter("endId", endId);
+
+      deleteQuery.deletePersistentAll();
+
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  @Override
+  public boolean processBlocksPhase2() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int minId, maxId;
+    // Change all new and modified rows back to Original.
+    try {
+      maxId = execMySqlQuery("select max(" + BlockInfoTableDef.BLOCK_ID + ") from hdfs_block_infos where " + BlockInfoTableDef.BLOCK_ID + ">0"
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+      minId = execMySqlQuery("select min(" + BlockInfoTableDef.BLOCK_ID + ") from hdfs_block_infos where " + BlockInfoTableDef.BLOCK_ID + ">0 "
+          + " and (status =" + SnapShotConstants.Modified + " or status = " + SnapShotConstants.New + ")");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = minId;
+    processBlocksPhase2Callable mr;
+    while (intervalStart < maxId) {
+      mr = new processBlocksPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE);
+      intervalStart = intervalStart + BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    RollBackImpl.shutDownPool(pool);
+
+    //Confirm that the task is done.
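Expressed as plain SQL, the two block phases reduce to deleting the backup rows and resetting the status flag; the ClusterJ code above and below does the same work in `BUFFER_SIZE`-sized batches. A plain-JDBC rendering of the net effect, assuming a `conn` obtained the way `execMySqlQuery` obtains one:

```java
// Net effect only; the patch deliberately batches this work through ClusterJ.
try (PreparedStatement drop = conn.prepareStatement(
         "DELETE FROM hdfs_block_infos WHERE block_id < 0");       // phase 1: drop backup rows
     PreparedStatement reset = conn.prepareStatement(
         "UPDATE hdfs_block_infos SET status = 1 "                 // phase 2: back to Original
             + "WHERE block_id > 0 AND status IN (2, 3)")) {
  drop.executeUpdate();
  reset.executeUpdate();
}
```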
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_block_infos where " + BlockInfoTableDef.BLOCK_ID + " >0 "
+          + "and ( " + BlockInfoTableDef.STATUS + " =2 or " + BlockInfoTableDef.STATUS + " = 3) ");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  private static class processBlocksPhase2Callable implements Callable {
+
+    int startId, endId;
+
+    processBlocksPhase2Callable(int startId, int endId) {
+      this.startId = startId;
+      this.endId = endId;
+    }
+
+    @Override
+    public Boolean call() throws StorageException {
+      HopsSession session = ClusterjConnector.getInstance().obtainSession();
+      HopsQueryBuilder qb = session.getQueryBuilder();
+      HopsQueryDomainType updateBlockInfos = qb.createQueryDefinition(BlockInfoClusterj.BlockInfoDTO.class);
+
+      HopsPredicate pred3 = updateBlockInfos.get(BlockInfoTableDef.STATUS).equal(updateBlockInfos.param("statusParam1"));
+      HopsPredicate pred4 = updateBlockInfos.get(BlockInfoTableDef.STATUS).equal(updateBlockInfos.param("statusParam2"));
+      HopsPredicate pred5 = updateBlockInfos.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(updateBlockInfos.param("startId"));
+      HopsPredicate pred6 = updateBlockInfos.get(BlockInfoTableDef.BLOCK_ID).lessEqual(updateBlockInfos.param("endId"));
+      updateBlockInfos.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+      HopsQuery updateQuery = session.createQuery(updateBlockInfos);
+
+      updateQuery.setParameter("statusParam1", SnapShotConstants.Modified);
+      updateQuery.setParameter("statusParam2", SnapShotConstants.New);
+      updateQuery.setParameter("startId", startId);
+      updateQuery.setParameter("endId", endId);
+
+      List resultList = updateQuery.getResultList();
+
+      for (BlockInfoClusterj.BlockInfoDTO blockInfo : resultList) {
+        blockInfo.setStatus(SnapShotConstants.Original);
+      }
+
+      session.updatePersistentAll(resultList);
+      session.flush();
+      session.close();
+      return true;
+    }
+  }
+
+  public boolean waitForSubTreeOperations() throws IOException {
+    String query = "select count(*) from hdfs_on_going_sub_tree_ops";
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 60 * 1000) {
+      int onGoingSubTreeOpsCount = execMySqlQuery(query);
+      if (onGoingSubTreeOpsCount == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public boolean waitForQuotaUpdates() throws IOException {
+    String query = "select count(*) from hdfs_quota_update";
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 60 * 1000) {
+      int quotaUpdatesYetToBeApplied = execMySqlQuery(query);
+      if (quotaUpdatesYetToBeApplied == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}
diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java
new file mode 100644
index 00000000..096bb13d
--- /dev/null
+++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java
@@ -0,0 +1,900 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package io.hops.metadata.ndb.dalimpl.rollBack;
+
+import io.hops.exception.StorageException;
+import io.hops.metadata.hdfs.TablesDef;
+import io.hops.metadata.hdfs.entity.INodeAttributes;
+import io.hops.metadata.hdfs.snapshots.SnapShotConstants;
+import io.hops.metadata.ndb.ClusterjConnector;
+import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj.BlockInfoDTO;
+import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj.INodeAttributesDTO;
+import io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj.InodeDTO;
+import io.hops.metadata.ndb.mysqlserver.MysqlServerConnector;
+import io.hops.metadata.ndb.wrapper.*;
+import io.hops.metadata.rollBack.dal.RollBackAccess;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import io.hops.metadata.hdfs.TablesDef.*;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+/**
+ *
+ * @author pushparaj
+ */
+public class RollBackImpl implements RollBackAccess {
+
+  private static final Log LOG = LogFactory.getLog(RollBackImpl.class);
+  private static int BUFFER_SIZE = 50000;
+  private int rootId = 2;
+
+  @Override
+  public boolean processInodesPhase1() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart;
+    int maxId = 0;
+    int minId;
+    //Delete all rows with status=2 or status=3.
+    try {
+      maxId = execMySqlQuery("select max(id) from hdfs_inodes where status=2 or status=3");
+      minId = execMySqlQuery("select min(id) from hdfs_inodes where status=2 or status=3");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = minId;
+    InodesPhase1Callable dr;
+    while (intervalStart <= maxId) {
+      dr = new InodesPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE);
+      intervalStart = intervalStart + BUFFER_SIZE;
+      pool.submit(dr);
+    }
+
+    //Wait for completion of the above tasks.
+    shutDownPool(pool);
+
+    //Confirm that the task is done.
+    int count = 1;
+    try {
+      count = execMySqlQuery("select count(*) from hdfs_inodes where status=2 or status=3");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    return count == 0;
+  }
+
+  @Override
+  public boolean processInodesPhase2() throws IOException {
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int intervalStart = 0;
+    int minId = 0;
+    //Restore back-up rows (id<0) that were flagged isDeleted=1: InodesPhase2Callable
+    //re-inserts them with positive ids and isDeleted=0.
+    try {
+      minId = execMySqlQuery("select min(id) from hdfs_inodes where id <0 and isDeleted=1");
+    } catch (StorageException ex) {
+      throw new StorageException(ex);
+    }
+
+    intervalStart = 0;
+    InodesPhase2Callable mr;
+    while (intervalStart > minId) {
+      mr = new InodesPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE);
+      intervalStart = intervalStart - BUFFER_SIZE;
+      pool.submit(mr);
+    }
+
+    //Wait for completion of the above tasks.
+    shutDownPool(pool);
+
+    //Confirm that the task is done.
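RollBackImpl undoes a snapshot in ordered steps per table: delete rows created or modified after the snapshot, restore rows flagged as deleted, then re-insert the remaining negative-id backup rows under their original positive ids. A caller would chain the phases roughly like this (sketch; the real orchestration lives outside this module):

```java
// Sketch of the intended ordering for the inode table.
RollBackAccess rb = new RollBackImpl();
boolean ok = rb.waitForSubTreeOperations()   // let in-flight subtree ops drain first
    && rb.processInodesPhase1()              // 1: delete rows with status Modified/New
    && rb.processInodesPhase2()              // 2: re-insert deleted backups, isDeleted cleared
    && rb.processInodesPhase3();             // 3: re-insert remaining backups, ids negated back
```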
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inodes where id <0 and isDeleted=1"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + static int execMySqlQuery(String query) throws StorageException { + MysqlServerConnector connector = MysqlServerConnector.getInstance(); + + try { + Connection conn = connector.obtainSession(); + PreparedStatement s = conn.prepareStatement(query); + ResultSet result = s.executeQuery(); + if (result.next()) { + return result.getInt(1); + } else { + throw new StorageException( + String.format("Count result set is empty. Query: %s", query)); + } + } catch (SQLException ex) { + throw new StorageException(ex); + } finally { + connector.closeSession(); + } + + + } + + static void shutDownPool(ExecutorService pool) { + //Wait for completion of above task. + pool.shutdown(); + try { + //pool.awaitTermination(10, TimeUnit.MINUTES); + while (!pool.isTerminated()) { + Thread.sleep(500); + } + ; + } catch (InterruptedException ex) { + LOG.error(ex, ex); + } + + } + + @Override + public boolean processInodesPhase3() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. + minId = execMySqlQuery("select min(id) from hdfs_inodes where id <0 "); + LOG.debug("The minimum id id " + minId); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + InodesPhase3Callable ir; + + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new InodesPhase3Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + + pool.submit(ir); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inodes where id<0"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processInodeAttributesPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int maxId = 0; + //Delete all rows with status=2 or status=3 + + try { + maxId = execMySqlQuery("select max(inodeId) from hdfs_inode_attributes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + InodeAttributesPhase1Callable dr; + while (intervalStart <= maxId) { + dr = new InodeAttributesPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(dr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. 
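`shutDownPool` above polls `isTerminated()` in a sleep loop. The stock `ExecutorService` API can block for the same effect; an equivalent formulation, assuming an hour is an acceptable upper bound and `java.util.concurrent.TimeUnit` is imported:

```java
// Equivalent to the polling loop in shutDownPool, using the built-in wait.
pool.shutdown();
try {
  if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
    LOG.warn("rollback worker pool did not terminate in time");
  }
} catch (InterruptedException ex) {
  Thread.currentThread().interrupt(); // preserve the interrupt status
  LOG.error(ex, ex);
}
```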
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inode_attributes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processInodeAttributesPhase2() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. + minId = execMySqlQuery("select min(inodeId) from hdfs_inode_attributes where inodeId <0 "); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + InodeAttributesPhase2Callable ir; + + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new InodeAttributesPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + + pool.submit(ir); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inode_attributes where inodeId<0"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + + } + + @Override + public boolean processBlocksPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int maxId = 0; + //Delete all rows with status=2 or status=3 + + try { + maxId = execMySqlQuery("select max(block_id) from hdfs_block_infos where status=2 or status=3"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + BlocksPhase1Callable dr; + while (intervalStart <= maxId) { + dr = new BlocksPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(dr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_block_infos where status=2 or status=3"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processBlocksPhase2() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. 
+ minId = execMySqlQuery("select min(block_id) from hdfs_block_infos where block_id <0 "); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + BlocksPhase2Callable ir; + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new BlocksPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + pool.submit(ir); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_block_infos where block_id<0"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean unSetSubTreeLockedOnRoot() throws IOException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + int maxId; + + maxId = execMySqlQuery("select max(id) from hdfs_inodes where id >0"); + + InodeDTO oldRoot = session.find(InodeDTO.class, new Integer(maxId)); + + InodeDTO newRoot = session.newInstance(InodeDTO.class); + + newRoot.setId(rootId); + newRoot.setName(oldRoot.getName()); + newRoot.setParentId(-oldRoot.getParentId()); + newRoot.setQuotaEnabled(oldRoot.getQuotaEnabled()); + newRoot.setModificationTime(oldRoot.getModificationTime()); + newRoot.setATime(oldRoot.getATime()); + newRoot.setPermission(oldRoot.getPermission()); + newRoot.setUnderConstruction(oldRoot.getUnderConstruction()); + newRoot.setClientName(oldRoot.getClientName()); + newRoot.setClientMachine(oldRoot.getClientMachine()); + newRoot.setClientNode(oldRoot.getClientNode()); + newRoot.setGenerationStamp(oldRoot.getGenerationStamp()); + newRoot.setHeader(oldRoot.getHeader()); + newRoot.setSymlink(oldRoot.getSymlink()); + newRoot.setSubtreeLocked((byte) 0);//UnsetSubTreeLockedToTree. + newRoot.setSubtreeLockOwner(oldRoot.getSubtreeLockOwner()); + newRoot.setMetaEnabled(oldRoot.getMetaEnabled()); + newRoot.setSize(oldRoot.getSize()); + newRoot.setIsDeleted(oldRoot.getIsDeleted()); + newRoot.setStatus(oldRoot.getStatus()); + + session.persist(newRoot);//Persist the new root; + session.deletePersistent(InodeDTO.class, new Integer(maxId));//delete the old Root row. 
+ + session.flush(); + session.close(); + + return true; + } + + @Override + public boolean waitForSubTreeOperations() throws IOException { + String query = "select count(*) from hdfs_on_going_sub_tree_ops"; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 60 * 1000) { + int onGoingSubTreeOpsCount = execMySqlQuery(query); + if (onGoingSubTreeOpsCount == 0) { + return true; + } + } + return false; + } + + @Override + public boolean waitForQuotaUpdates() throws IOException { + String query = "select count(*) from hdfs_quota_update"; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 60 * 1000) { + int quotaUpdatesYetToBeApplied = execMySqlQuery(query); + if (quotaUpdatesYetToBeApplied == 0) { + return true; + } + } + return false; + } + + private static class BlocksPhase1Callable implements Callable { + + long startId, endId; + + BlocksPhase1Callable(long startId, long endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteBlockInfos = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred1 = deleteBlockInfos.get(BlockInfoTableDef.STATUS).equal(deleteBlockInfos.param("statusParam2")); + HopsPredicate pred2 = deleteBlockInfos.get(BlockInfoTableDef.STATUS).equal(deleteBlockInfos.param("statusParam3")); + HopsPredicate pred3 = deleteBlockInfos.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteBlockInfos.param("startId")); + HopsPredicate pred4 = deleteBlockInfos.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteBlockInfos.param("endId")); + HopsPredicate pred5 = pred1.or(pred2); + deleteBlockInfos.where(pred5.and(pred3).and(pred4)); + HopsQuery query = session.createQuery(deleteBlockInfos); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } + + private static class BlocksPhase2Callable implements Callable { + + long startId, endId; + final Lock lock; + + BlocksPhase2Callable(long startId, long endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateBlockNodes = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3 = updateBlockNodes.get(BlockInfoTableDef.BLOCK_ID).lessEqual(updateBlockNodes.param("startId")); + HopsPredicate pred4 = updateBlockNodes.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(updateBlockNodes.param("endId")); + + updateBlockNodes.where(pred3.and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateBlockNodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + + BlockInfoDTO newBlockInfo; + + for (BlockInfoDTO row : results) { + newBlockInfo = session.newInstance(BlockInfoDTO.class); + + newBlockInfo.setBlockId(-row.getBlockId()); + newBlockInfo.setNumBytes(row.getNumBytes()); + newBlockInfo.setGenerationStamp(row.getGenerationStamp()); + newBlockInfo.setINodeId(-row.getINodeId()); + newBlockInfo.setTimestamp(row.getTimestamp()); + newBlockInfo.setBlockIndex(row.getBlockIndex()); + newBlockInfo.setBlockUCState(row.getBlockUCState()); + newBlockInfo.setPrimaryNodeIndex(row.getPrimaryNodeIndex()); + newBlockInfo.setBlockRecoveryId(row.getBlockRecoveryId()); + newBlockInfo.setStatus(row.getStatus()); + + newResults.add(newBlockInfo); + } + + synchronized (lock) { + session.makePersistentAll(newResults); + } + System.gc(); + // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); + session = ClusterjConnector.getInstance().obtainSession(); + + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + session.flush(); + session.close(); + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } + } + + private static class InodeAttributesPhase1Callable implements Callable { + + int startId, endId; + + InodeAttributesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + // Transaction tx = session.currentTransaction(); + //tx.begin(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred1 = deleteInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(deleteInodeAttributes.param("statusParam2")); + HopsPredicate pred2 = deleteInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(deleteInodeAttributes.param("statusParam3")); + HopsPredicate 
pred3 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodeAttributes.param("startId")); + HopsPredicate pred4 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(deleteInodeAttributes.param("endId")); + HopsPredicate pred5 = pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodeAttributes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodeAttributes); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } + + private static class InodeAttributesPhase2Callable implements Callable { + + int startId, endId; + final Lock lock; + + InodeAttributesPhase2Callable(int startId, int endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3 = updateInodes.get(INodeAttributesTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get(INodeAttributesTableDef.ID).greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + + INodeAttributesDTO newInodeAttributes; + + for (INodeAttributesDTO row : results) { + newInodeAttributes = session.newInstance(INodeAttributesDTO.class); + + newInodeAttributes.setId(-row.getId()); + newInodeAttributes.setNSQuota(row.getNSQuota()); + newInodeAttributes.setNSCount(row.getNSCount()); + newInodeAttributes.setDSQuota(row.getDSQuota()); + newInodeAttributes.setDiskspace(row.getDiskspace()); + newInodeAttributes.setStatus(row.getStatus()); + + newResults.add(newInodeAttributes); + } + + synchronized (lock) { + + session.makePersistentAll(newResults); + } + System.gc(); + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get(INodeAttributesTableDef.ID).lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + + session.flush(); + session.close(); + + } catch (Throwable e) { + throw 
new IOException(e); + } + return true; + + } + } + + private static class InodesPhase1Callable implements Callable { + + int startId, endId; + + InodesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + // Transaction tx = session.currentTransaction(); + //tx.begin(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred1 = deleteInodes.get(INodeTableDef.STATUS).equal(deleteInodes.param("statusParam2")); + HopsPredicate pred2 = deleteInodes.get(INodeTableDef.STATUS).equal(deleteInodes.param("statusParam3")); + HopsPredicate pred3 = deleteInodes.get(INodeTableDef.ID).greaterEqual(deleteInodes.param("startId")); + HopsPredicate pred4 = deleteInodes.get(INodeTableDef.ID).lessEqual(deleteInodes.param("endId")); + HopsPredicate pred5 = pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodes); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } + + private static class InodesPhase2Callable implements Callable { + + int startId, endId; + + InodesPhase2Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); + + HopsPredicate pred4 = updateInodes.get(INodeTableDef.ISDELETED).equal(updateInodes.param("isDeletedParam")); + HopsPredicate pred5 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred6 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("endId")); + updateInodes.where(pred6.and(pred5).and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + + UpdateQuery.setParameter("isDeletedParam", SnapShotConstants.isDeleted); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + List results = UpdateQuery.getResultList(); + List newResults = new ArrayList(results.size()); + List tobeDeleted = new ArrayList(results.size()); + + int id; + InodeDTO newInode; + for (InodeDTO row : results) { + if ((id = row.getParentId()) < 0) { + newInode = session.newInstance(InodeDTO.class); + newInode.setId(-row.getId()); + //newInode.setName(row.getName().split("\\$DEL\\$")[0]); + newInode.setName(row.getName().split("\\$DEL:[0-9]+\\$", 2)[0]); + newInode.setParentId(-id); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + 
newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setMetaEnabled(row.getMetaEnabled()); + newInode.setSize(row.getSize()); + newInode.setIsDeleted(SnapShotConstants.isNotDeleted); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + tobeDeleted.add(row); + } + } + + if (!newResults.isEmpty()) + session.makePersistentAll(newResults); + if (!tobeDeleted.isEmpty()) + session.deletePersistentAll(tobeDeleted); + + //System.out.println("SetIsDeleted from id=" + factor * BUFFER_SIZE + " ,to id=" + (factor + 1) * BUFFER_SIZE + ". Time= " + (end - start) / 1000 + "Seconds"); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } + + private static class InodesPhase3Callable implements Callable { + + private final Log LOG = LogFactory.getLog(InodesPhase3Callable.class); + int startId, endId; + final Lock lock; + + InodesPhase3Callable(int startId, int endId, Lock lock) { + System.out.println("The startId=" + startId + ".The endId=" + endId); + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + System.out.println("call method is called"); + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + System.out.println("Query is executed"); + InodeDTO newInode; + + for (InodeDTO row : results) { + newInode = session.newInstance(InodeDTO.class); + + newInode.setId(-row.getId()); + newInode.setName(row.getName()); + newInode.setParentId(-row.getParentId()); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setMetaEnabled(row.getMetaEnabled()); + newInode.setSize(row.getSize()); + newInode.setIsDeleted(row.getIsDeleted()); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + } + System.gc(); + synchronized (lock) { + // 
System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size());
+          session.makePersistentAll(newResults);
+        }
+
+        HopsQueryBuilder qbd = session.getQueryBuilder();
+        HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(InodeDTO.class);
+        HopsPredicate pred3d = deleteInodesWithNegativeId.get(INodeTableDef.ID).lessEqual(deleteInodesWithNegativeId.param("startId"));
+        HopsPredicate pred4d = deleteInodesWithNegativeId.get(INodeTableDef.ID).greaterEqual(deleteInodesWithNegativeId.param("endId"));
+
+        deleteInodesWithNegativeId.where(pred3d.and(pred4d));
+        HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId);
+        queryToDeleteNegative.setParameter("startId", startId);
+        queryToDeleteNegative.setParameter("endId", endId);
+        System.out.println("Deleting all the negative rows");
+        synchronized (lock) {
+          queryToDeleteNegative.deletePersistentAll();
+        }
+        System.out.println("Deleted all the negative rows");
+        session.flush();
+        session.close();
+
+      } catch (Throwable e) {
+        throw new IOException(e);
+      }
+      return true;
+    }
+  }
+
+  private static class Lock {
+  }
+}
\ No newline at end of file
diff --git a/src/main/resources/ndb-config.properties b/src/main/resources/ndb-config.properties
old mode 100644
new mode 100755
index 608b7080..d4c352d3
--- a/src/main/resources/ndb-config.properties
+++ b/src/main/resources/ndb-config.properties
@@ -1,23 +1,26 @@
-#
-# Do not add spaces in the file. it is also used by some deployment scripts that fail if there are redundant spaces
-#
-com.mysql.clusterj.connectstring=
-com.mysql.clusterj.database=
-com.mysql.clusterj.connection.pool.size=1
-com.mysql.clusterj.max.transactions=1024
-#com.mysql.clusterj.connection.pool.nodeids=
+com.mysql.clusterj.connectstring = cloud1.sics.se
+com.mysql.clusterj.database = hop_pushparaj
+com.mysql.clusterj.connection.pool.size = 1
+com.mysql.clusterj.max.transactions = 1024
 io.hops.metadata.ndb.mysqlserver.data_source_class_name = com.mysql.jdbc.jdbc2.optional.MysqlDataSource
-io.hops.metadata.ndb.mysqlserver.host=
-io.hops.metadata.ndb.mysqlserver.port=3306
-io.hops.metadata.ndb.mysqlserver.username=
-io.hops.metadata.ndb.mysqlserver.password=
-io.hops.metadata.ndb.mysqlserver.connection_pool_size=10
+io.hops.metadata.ndb.mysqlserver.host = cloud1.sics.se
+io.hops.metadata.ndb.mysqlserver.port = 3307
+io.hops.metadata.ndb.mysqlserver.username = hop
+io.hops.metadata.ndb.mysqlserver.password = hop
+io.hops.metadata.ndb.mysqlserver.connection_pool_size = 10
 
-#size of the session pool. should be altreat as big as the number of active RPC handling Threads in the system
-io.hops.session.pool.size=1000
+#size of the session pool. should be at least as big as the number of active Tx in the system
+io.hops.session.pool.size = 1200
+#Session is reused n times and then it is GCed
+io.hops.session.reuse.count = 500
+###Old Start
+#com.mysql.clusterj.connectstring = cloud11.sics.se
+#com.mysql.clusterj.database = hop_pushparaj_new
+#com.mysql.clusterj.connection.pool.size = 1
+#com.mysql.clusterj.max.transactions = 1024
 
-#Session is reused Random.getNextInt(0,io.hops.session.reuse.count) times and then it is GCed
-#use smaller values if using java 6.
-#if you use java 7 or higer then use G1GC and there is no need to close sessions.
use Int.MAX_VALUE -io.hops.session.reuse.count=2147483647 +#com.mysql.clusterj.jdbc.username=hop +#com.mysql.clusterj.jdbc.password=hop +#com.mysql.clusterj.jdbc.url=jdbc:mysql://cloud11.sics.se:3306/hop_pushparaj_new +####Old Finish
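With `RollBackAccess` now registered in `NdbStorageFactory`'s `dataAccessMap`, callers are expected to resolve the implementation through the factory rather than instantiate `RollBackImpl` directly. Roughly, assuming the `getDataAccess` lookup that the map backs (the accessor itself is not shown in this diff):

```java
// Assumed accessor; the diff only shows the map registration.
NdbStorageFactory factory = new NdbStorageFactory();
RollBackAccess rollBack =
    (RollBackAccess) factory.getDataAccess(RollBackAccess.class);
// Quiesce pending work before touching the tables, then run the phases.
if (rollBack.waitForSubTreeOperations() && rollBack.waitForQuotaUpdates()) {
  rollBack.processInodesPhase1();
}
```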