From 29e6061ac624e717ba2d8aa643f699b9f532cc6b Mon Sep 17 00:00:00 2001 From: raj Date: Sat, 9 May 2015 04:41:58 +0530 Subject: [PATCH 1/4] Compile successful --- schema/schema.sql | 4 + .../hops/metadata/ndb/NdbStorageFactory.java | 3 + .../ndb/dalimpl/hdfs/BlockInfoClusterj.java | 13 +- .../dalimpl/hdfs/INodeAttributesClusterj.java | 9 +- .../ndb/dalimpl/hdfs/INodeClusterj.java | 87 +- .../ndb/dalimpl/rollBack/RollBackImpl.java | 897 ++++++++++++++++++ src/main/resources/ndb-config.properties | 20 +- 7 files changed, 1016 insertions(+), 17 deletions(-) create mode 100644 src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java diff --git a/schema/schema.sql b/schema/schema.sql index 208cb4ed..cebbd467 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -10,6 +10,7 @@ CREATE TABLE `hdfs_block_infos` ( `time_stamp` bigint(20) DEFAULT NULL, `primary_node_index` int(11) DEFAULT NULL, `block_recovery_id` bigint(20) DEFAULT NULL, + `status` int(11) DEFAULT 0, PRIMARY KEY (`inode_id`,`block_id`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1 /*!50100 PARTITION BY KEY (inode_id) */$$ @@ -57,6 +58,7 @@ CREATE TABLE `hdfs_inode_attributes` ( `dsquota` bigint(20) DEFAULT NULL, `nscount` bigint(20) DEFAULT NULL, `diskspace` bigint(20) DEFAULT NULL, + `status` int(11) DEFAULT 0, PRIMARY KEY (`inodeId`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1$$ @@ -80,6 +82,8 @@ CREATE TABLE `hdfs_inodes` ( `under_construction` bit(8) NOT NULL, `subtree_locked` bit(8) DEFAULT NULL, `subtree_lock_owner` bigint(20) DEFAULT NULL, + `isdeleted` int(11) DEFAULT 0, + `status` int(11) DEFAULT 0, PRIMARY KEY (`parent_id`,`name`), KEY `inode_idx` (`id`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1 diff --git a/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java b/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java index 2e18f5a8..2c4c46f1 100644 --- a/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java +++ b/src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java @@ -44,6 +44,8 @@ import io.hops.metadata.hdfs.dal.StorageIdMapDataAccess; import io.hops.metadata.hdfs.dal.UnderReplicatedBlockDataAccess; import io.hops.metadata.hdfs.dal.VariableDataAccess; +import io.hops.metadata.rollBack.dal.RollBackAccess; +import io.hops.metadata.ndb.dalimpl.rollBack.RollBackImpl; import io.hops.metadata.ndb.dalimpl.election.HdfsLeaderClusterj; import io.hops.metadata.ndb.dalimpl.election.YarnLeaderClusterj; import io.hops.metadata.ndb.dalimpl.hdfs.BlockChecksumClusterj; @@ -280,6 +282,7 @@ private void initDataAccessMap() { .put(NextHeartbeatDataAccess.class, new NextHeartbeatClusterJ()); dataAccessMap.put(RMLoadDataAccess.class, new RMLoadClusterJ()); dataAccessMap.put(FullRMNodeDataAccess.class, new FullRMNodeClusterJ()); + dataAccessMap.put(RollBackAccess.class,new RollBackImpl()); } @Override diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java index 09663fe5..19026111 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/BlockInfoClusterj.java @@ -95,9 +95,13 @@ public interface BlockInfoDTO { long getBlockRecoveryId(); void setBlockRecoveryId(long recoveryId); - } - - private ClusterjConnector connector = ClusterjConnector.getInstance(); + + @Column(name=STATUS) + int getStatus(); + + void setStatus(int status); + } + private ClusterjConnector connector = ClusterjConnector.getInstance(); private final static int 
NOT_FOUND_ROW = -1000; @Override @@ -310,7 +314,7 @@ private BlockInfo createBlockInfo(BlockInfoClusterj.BlockInfoDTO bDTO) { new BlockInfo(bDTO.getBlockId(), bDTO.getBlockIndex(), bDTO.getINodeId(), bDTO.getNumBytes(), bDTO.getGenerationStamp(), bDTO.getBlockUCState(), bDTO.getTimestamp(), - bDTO.getPrimaryNodeIndex(), bDTO.getBlockRecoveryId()); + bDTO.getPrimaryNodeIndex(), bDTO.getBlockRecoveryId(),bDTO.getStatus()); return hopBlockInfo; } @@ -325,5 +329,6 @@ private void createPersistable(BlockInfo block, persistable.setBlockUCState(block.getBlockUCState()); persistable.setPrimaryNodeIndex(block.getPrimaryNodeIndex()); persistable.setBlockRecoveryId(block.getBlockRecoveryId()); + persistable.setStatus(block.getStatus()); } } diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java index 7633e698..9a1c67e3 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeAttributesClusterj.java @@ -65,8 +65,12 @@ public interface INodeAttributesDTO { long getDiskspace(); void setDiskspace(long diskspace); - } + + @Column(name = STATUS) + int getStatus(); + void setStatus(int status); + } private ClusterjConnector connector = ClusterjConnector.getInstance(); @Override @@ -133,6 +137,7 @@ private INodeAttributesDTO createPersistable(INodeAttributes attribute, dto.setNSCount(attribute.getNsCount()); dto.setDSQuota(attribute.getDsQuota()); dto.setDiskspace(attribute.getDiskspace()); + dto.setStatus(attribute.getStatus()); return dto; } @@ -142,7 +147,7 @@ private INodeAttributes makeINodeAttributes(INodeAttributesDTO dto) { } INodeAttributes iNodeAttributes = new INodeAttributes(dto.getId(), dto.getNSQuota(), dto.getNSCount(), - dto.getDSQuota(), dto.getDiskspace()); + dto.getDSQuota(), dto.getDiskspace(),dto.getStatus()); return iNodeAttributes; } } \ No newline at end of file diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java index 3230543e..d3f38821 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java @@ -29,6 +29,7 @@ import io.hops.metadata.hdfs.entity.INode; import io.hops.metadata.hdfs.entity.INodeIdentifier; import io.hops.metadata.hdfs.entity.ProjectedINode; +import io.hops.metadata.hdfs.snapshots.SnapShotConstants; import io.hops.metadata.ndb.ClusterjConnector; import io.hops.metadata.ndb.NdbBoolean; import io.hops.metadata.ndb.mysqlserver.HopsSQLExceptionHelper; @@ -149,6 +150,16 @@ public interface InodeDTO { long getSubtreeLockOwner(); void setSubtreeLockOwner(long leaderId); + + @Column(name = ISDELETED) + int getIsDeleted(); + + void setIsDeleted(int isdeleted); + + @Column(name = STATUS) + int getStatus(); + + void setStatus(int Status); } private ClusterjConnector connector = ClusterjConnector.getInstance(); @@ -222,7 +233,7 @@ public List indexScanFindInodesByParentId(int parentId) qb.createQueryDefinition(InodeDTO.class); HopsPredicate pred1 = dobj.get("parentId").equal(dobj.param("parentIDParam")); - dobj.where(pred1); + dobj.where(pred1); HopsQuery query = session.createQuery(dobj); query.setParameter("parentIDParam", parentId); @@ -230,14 +241,73 @@ public List indexScanFindInodesByParentId(int parentId) explain(query); return createInodeList(results); } - + + @Override + public 
List indexScanFindInodesByParentIdIncludeDeletes(int parentId) throws StorageException { + try { + + HopsSession session = connector.obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType dobj = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred1 = dobj.get("parentId").equal(dobj.param("parentIDParam1")); + HopsPredicate pred2 = dobj.get("parentId").equal(dobj.param("parentIDParam2")); + HopsPredicate pred3 = dobj.get("isdeleted").equal(dobj.param("isdeletedParam")); + dobj.where(pred1.or(pred2.and(pred3))); + + HopsQuery query = session.createQuery(dobj); + query.setParameter("parentIDParam1", parentId); + query.setParameter("parentIDParam2", -parentId); + query.setParameter("parentIDParam3", 1); + + List results = query.getResultList(); + explain(query); + return createInodeList(results); + } catch (Exception e) { + throw new StorageException(e); + } + } + @Override + public List findInodesByParentIdForSubTreeOpsWithReadLockIncludeDeletes( + int parentId) throws StorageException { + final String query = String.format( + "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s FROM %s WHERE %s=%d or (%s=%d and %s=%d) LOCK IN SHARE MODE", + ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED, + UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER,STATUS,ISDELETED,TABLE_NAME, + PARENT_ID,PARENT_ID,ISDELETED, parentId,-parentId,SnapShotConstants.isDeleted); + ArrayList resultList; + try { + Connection conn = mysqlConnector.obtainSession(); + PreparedStatement s = conn.prepareStatement(query); + ResultSet result = s.executeQuery(); + resultList = new ArrayList(); + + while (result.next()) { + resultList.add( + new ProjectedINode(result.getInt(ID), result.getInt(PARENT_ID), + result.getString(NAME), result.getBytes(PERMISSION), + result.getLong(HEADER), + result.getString(SYMLINK) == null ? 
false : true, + result.getBoolean(QUOTA_ENABLED), + result.getBoolean(UNDER_CONSTRUCTION), + result.getBoolean(SUBTREE_LOCKED), + result.getLong(SUBTREE_LOCK_OWNER), + result.getInt(STATUS), + result.getInt(ISDELETED))); + } + } catch (SQLException ex) { + throw HopsSQLExceptionHelper.wrap(ex); + } finally { + mysqlConnector.closeSession(); + } + return resultList; + } @Override public List findInodesForSubtreeOperationsWithReadLock( int parentId) throws StorageException { final String query = String.format( - "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s FROM %s WHERE %s=%d LOCK IN SHARE MODE", + "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s FROM %s WHERE %s=%d LOCK IN SHARE MODE", ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED, - UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, TABLE_NAME, + UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER,STATUS,ISDELETED,TABLE_NAME, PARENT_ID, parentId); ArrayList resultList; try { @@ -255,7 +325,9 @@ public List findInodesForSubtreeOperationsWithReadLock( result.getBoolean(QUOTA_ENABLED), result.getBoolean(UNDER_CONSTRUCTION), result.getBoolean(SUBTREE_LOCKED), - result.getLong(SUBTREE_LOCK_OWNER))); + result.getLong(SUBTREE_LOCK_OWNER), + result.getInt(STATUS), + result.getInt(ISDELETED))); } } catch (SQLException ex) { throw HopsSQLExceptionHelper.wrap(ex); @@ -377,7 +449,8 @@ private INode createInode(InodeDTO persistable) { persistable.getClientNode(), persistable.getGenerationStamp(), persistable.getHeader(), persistable.getSymlink(), NdbBoolean.convert(persistable.getSubtreeLocked()), - persistable.getSubtreeLockOwner()); + persistable.getSubtreeLockOwner(), + persistable.getIsDeleted(),persistable.getStatus()); } private void createPersistable(INode inode, InodeDTO persistable) { @@ -397,6 +470,8 @@ private void createPersistable(INode inode, InodeDTO persistable) { persistable.setSymlink(inode.getSymlink()); persistable.setSubtreeLocked(NdbBoolean.convert(inode.isSubtreeLocked())); persistable.setSubtreeLockOwner(inode.getSubtreeLockOwner()); + persistable.setIsDeleted(inode.getIsDeleted()); + persistable.setStatus(inode.getStatus()); } private void explain(HopsQuery query) { diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java new file mode 100644 index 00000000..3db1afea --- /dev/null +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java @@ -0,0 +1,897 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. 
+ */ +package io.hops.metadata.ndb.dalimpl.rollBack; + +import com.mysql.clusterj.Query; +import com.mysql.clusterj.Session; +import com.mysql.clusterj.query.Predicate; +import com.mysql.clusterj.query.QueryBuilder; +import com.mysql.clusterj.query.QueryDomainType; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.hops.metadata.ndb.wrapper.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import io.hops.exception.StorageException; +import io.hops.metadata.ndb.ClusterjConnector; +import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj.BlockInfoDTO; +import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj.INodeAttributesDTO; +import io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj.InodeDTO; +import io.hops.metadata.ndb.mysqlserver.MysqlServerConnector; +import io.hops.metadata.rollBack.dal.RollBackAccess; +import io.hops.metadata.ndb.wrapper.HopsSession; +/** + * + * @author pushparaj + */ +public class RollBackImpl implements RollBackAccess { + + private static final Log LOG = LogFactory.getLog(RollBackImpl.class); + private static int BUFFER_SIZE = 50000; + private int rootId = 2; + + @Override + public boolean processInodesPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int maxId = 0; + //Delete all rows with status=2 or status=3 + + try { + maxId = execMySqlQuery("select max(id) from inodes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + InodesPhase1Callable dr; + while (intervalStart <= maxId) { + dr = new InodesPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(dr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from inodes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processInodesPhase2() throws IOException { + + + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int maxId = 0; + //Update all rows with id>0 and isDeleted=1 to isDeleted=0 + + try { + maxId = execMySqlQuery("select max(id) from inodes where id >0 and isDeleted=1"); + } catch (StorageException ex) { + //LOG.error(ex, ex); + throw new StorageException(ex); + } + + + intervalStart = 0; + InodesPhase2Callable mr; + while (intervalStart <= maxId) { + mr = new InodesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(mr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. 
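+        // (phase 2 is considered successful only if no rows with id>0 and isDeleted=1 remain)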
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from inodes where id >0 and isDeleted=1"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + int execMySqlQuery(String query) throws StorageException { + MysqlServerConnector connector = MysqlServerConnector.getInstance(); + + try { + Connection conn = connector.obtainSession(); + PreparedStatement s = conn.prepareStatement(query); + ResultSet result = s.executeQuery(); + if (result.next()) { + return result.getInt(1); + } else { + throw new StorageException( + String.format("Count result set is empty. Query: %s", query)); + } + } catch (SQLException ex) { + throw new StorageException(ex); + } finally { + connector.closeSession(); + } + + + } + + void shutDownPool(ExecutorService pool) { + //Wait for completion of above task. + pool.shutdown(); + try { + //pool.awaitTermination(10, TimeUnit.MINUTES); + while (!pool.isTerminated()) { + Thread.sleep(500); + }; + } catch (InterruptedException ex) { + LOG.error(ex, ex); + } + + } + + @Override + public boolean processInodesPhase3() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. + minId = execMySqlQuery("select min(id) from inodes where id <0 "); + LOG.debug("The minimum id id " + minId); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + InodesPhase3Callable ir; + + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new InodesPhase3Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + + pool.submit(ir); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from inodes where id<0"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processInodeAttributesPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int maxId = 0; + //Delete all rows with status=2 or status=3 + + try { + maxId = execMySqlQuery("select max(inodeId) from inode_attributes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + InodeAttributesPhase1Callable dr; + while (intervalStart <= maxId) { + dr = new InodeAttributesPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(dr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. 
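+        // (phase 1 for inode_attributes succeeds only if no rows with status=2 or status=3 remain)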
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from inode_attributes where status=2 or status=3"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processInodeAttributesPhase2() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. + minId = execMySqlQuery("select min(inodeId) from inode_attributes where inodeId <0 "); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + InodeAttributesPhase2Callable ir; + + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new InodeAttributesPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + + pool.submit(ir); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from inode_attributes where inodeId<0"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + + } + + @Override + public boolean processBlocksPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int maxId = 0; + //Delete all rows with status=2 or status=3 + + try { + maxId = execMySqlQuery("select max(block_id) from block_infos where status=2 or status=3"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + BlocksPhase1Callable dr; + while (intervalStart <= maxId) { + dr = new BlocksPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(dr); + } + + //Wait for completion of above task. + shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from block_infos where status=2 or status=3"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processBlocksPhase2() throws IOException { + //Insert new row for each backup row with [new row's id]=-[back-up row's id] and [new row's parentid]=-[back-up row's parentid] + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart = 0; + int minId = 0; + + try { + //Get total number of backup records. + minId = execMySqlQuery("select min(block_id) from block_infos where block_id <0 "); + //System.out.println(backUpRowsCnt); + } catch (StorageException ex) { + LOG.error(ex, ex); + } + + BlocksPhase2Callable ir; + Lock lock = new Lock(); + + while (intervalStart >= minId) { + ir = new BlocksPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE, lock); + //doInsert(intervalStart, intervalStart-BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + pool.submit(ir); + } + + //Wait for completion of above task. 
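+        // shutDownPool blocks here, polling the executor until all submitted callables have terminated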
+ shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from block_infos where block_id<0"); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + + @Override + public boolean processRootAndSetSubTreeLocked(long nameNodeId) throws IOException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3 = deleteInodes.get("Id").equal(deleteInodes.param("RootId")); + deleteInodes.where(pred3); + HopsQuery query = session.createQuery(deleteInodes); + query.setParameter("Id", rootId); + //delete the root. + query.deletePersistentAll(); + //Get the back-Up row. + InodeDTO backUpRow = session.find(InodeDTO.class, new Integer(-rootId)); + + int maxId; + + maxId = execMySqlQuery("select max(id) from inodes where id >0"); + + InodeDTO newInode = session.newInstance(InodeDTO.class); + + newInode.setId(maxId + 1); + newInode.setName(backUpRow.getName()); + newInode.setParentId(-backUpRow.getParentId()); + newInode.setQuotaEnabled(backUpRow.getQuotaEnabled()); + newInode.setModificationTime(backUpRow.getModificationTime()); + newInode.setATime(backUpRow.getATime()); + newInode.setPermission(backUpRow.getPermission()); + newInode.setUnderConstruction(backUpRow.getUnderConstruction()); + newInode.setClientName(backUpRow.getClientName()); + newInode.setClientMachine(backUpRow.getClientMachine()); + newInode.setClientNode(backUpRow.getClientNode()); + newInode.setGenerationStamp(backUpRow.getGenerationStamp()); + newInode.setHeader(backUpRow.getHeader()); + newInode.setSymlink(backUpRow.getSymlink()); + newInode.setSubtreeLocked((byte)1);//SetSubTreeLockedToTree. + newInode.setSubtreeLockOwner(nameNodeId); + newInode.setIsDeleted(backUpRow.getIsDeleted()); + newInode.setStatus(backUpRow.getStatus()); + + session.persist(newInode);//Save the newRow of root with MaxId; + session.deletePersistent(InodeDTO.class, new Integer(-rootId));//delete the backUp row. + + session.flush(); + session.close(); + + return true; + + } + + @Override + public boolean unSetSubTreeLockedOnRoot() throws IOException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + int maxId; + + maxId = execMySqlQuery("select max(id) from inodes where id >0"); + + InodeDTO oldRoot = session.find(InodeDTO.class, new Integer(maxId)); + + InodeDTO newRoot = session.newInstance(InodeDTO.class); + + newRoot.setId(rootId); + newRoot.setName(oldRoot.getName()); + newRoot.setParentId(-oldRoot.getParentId()); + newRoot.setQuotaEnabled(oldRoot.getQuotaEnabled()); + newRoot.setModificationTime(oldRoot.getModificationTime()); + newRoot.setATime(oldRoot.getATime()); + newRoot.setPermission(oldRoot.getPermission()); + newRoot.setUnderConstruction(oldRoot.getUnderConstruction()); + newRoot.setClientName(oldRoot.getClientName()); + newRoot.setClientMachine(oldRoot.getClientMachine()); + newRoot.setClientNode(oldRoot.getClientNode()); + newRoot.setGenerationStamp(oldRoot.getGenerationStamp()); + newRoot.setHeader(oldRoot.getHeader()); + newRoot.setSymlink(oldRoot.getSymlink()); + newRoot.setSubtreeLocked((byte)0);//UnsetSubTreeLockedToTree. 
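+        // copy the remaining fields from the old root row, then persist the replacement and drop the old row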
+ newRoot.setSubtreeLockOwner(oldRoot.getSubtreeLockOwner()); + newRoot.setIsDeleted(oldRoot.getIsDeleted()); + newRoot.setStatus(oldRoot.getStatus()); + + session.persist(newRoot);//Persist the new root; + session.deletePersistent(InodeDTO.class, new Integer(maxId));//delete the old Root row. + + session.flush(); + session.close(); + + return true; + } +} + +class BlocksPhase1Callable implements Callable { + + long startId, endId; + + BlocksPhase1Callable(long startId, long endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteBlockInfos = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred1 = deleteBlockInfos.get("status").equal(deleteBlockInfos.param("statusParam2")); + HopsPredicate pred2 = deleteBlockInfos.get("status").equal(deleteBlockInfos.param("statusParam3")); + HopsPredicate pred3 = deleteBlockInfos.get("blockId").greaterEqual(deleteBlockInfos.param("startId")); + HopsPredicate pred4 = deleteBlockInfos.get("blockId").lessEqual(deleteBlockInfos.param("endId")); + HopsPredicate pred5 = pred1.or(pred2); + deleteBlockInfos.where(pred5.and(pred3).and(pred4)); + HopsQuery query = session.createQuery(deleteBlockInfos); + query.setParameter("statusParam2", 2); + query.setParameter("statusParam3", 3); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } +} + +class BlocksPhase2Callable implements Callable { + + long startId, endId; + final Lock lock; + + BlocksPhase2Callable(long startId, long endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3 = updateInodes.get("block_Id").lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get("block_Id").greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + + BlockInfoDTO newBlockInfo; + + for (BlockInfoDTO row : results) { + newBlockInfo = session.newInstance(BlockInfoDTO.class); + + newBlockInfo.setBlockId(-row.getBlockId()); + newBlockInfo.setNumBytes(row.getNumBytes()); + newBlockInfo.setGenerationStamp(row.getGenerationStamp()); + newBlockInfo.setINodeId(-row.getINodeId()); + newBlockInfo.setTimestamp(row.getTimestamp()); + newBlockInfo.setBlockIndex(row.getBlockIndex()); + newBlockInfo.setBlockUCState(row.getBlockUCState()); + newBlockInfo.setPrimaryNodeIndex(row.getPrimaryNodeIndex()); + newBlockInfo.setBlockRecoveryId(row.getBlockRecoveryId()); + newBlockInfo.setStatus(row.getStatus()); + + newResults.add(newBlockInfo); + } + + synchronized (lock) { + session.makePersistentAll(newResults); + } + System.gc(); + // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); + session = ClusterjConnector.getInstance().obtainSession(); + + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get("blockId").lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get("blockId").greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + session.flush(); + session.close(); + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } +} + +class InodeAttributesPhase1Callable implements Callable { + + int startId, endId; + + InodeAttributesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + // Transaction tx = session.currentTransaction(); + //tx.begin(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred1 = deleteInodes.get("status").equal(deleteInodes.param("statusParam2")); + HopsPredicate pred2 = deleteInodes.get("status").equal(deleteInodes.param("statusParam3")); + HopsPredicate pred3 = deleteInodes.get("inodeId").greaterEqual(deleteInodes.param("startId")); + HopsPredicate pred4 = deleteInodes.get("inodeId").lessEqual(deleteInodes.param("endId")); + HopsPredicate pred5 = 
pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodes); + query.setParameter("statusParam2", 2); + query.setParameter("statusParam3", 3); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } +} + +class InodeAttributesPhase2Callable implements Callable { + + int startId, endId; + final Lock lock; + + InodeAttributesPhase2Callable(int startId, int endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3 = updateInodes.get("inodeId").lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get("inodeId").greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + + INodeAttributesDTO newInodeAttributes; + + for (INodeAttributesDTO row : results) { + newInodeAttributes = session.newInstance(INodeAttributesDTO.class); + + newInodeAttributes.setId(-row.getId()); + newInodeAttributes.setNSQuota(row.getNSQuota()); + newInodeAttributes.setNSCount(row.getNSCount()); + newInodeAttributes.setDSQuota(row.getDSQuota()); + newInodeAttributes.setDiskspace(row.getDiskspace()); + newInodeAttributes.setStatus(row.getStatus()); + + newResults.add(newInodeAttributes); + } + + synchronized (lock) { + + session.makePersistentAll(newResults); + } + System.gc(); + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get("inodeId").lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get("inodeId").greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + + session.flush(); + session.close(); + + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } +} + +class InodesPhase1Callable implements Callable { + + int startId, endId; + + InodesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + // Transaction tx = 
session.currentTransaction(); + //tx.begin(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred1 = deleteInodes.get("status").equal(deleteInodes.param("statusParam2")); + HopsPredicate pred2 = deleteInodes.get("status").equal(deleteInodes.param("statusParam3")); + HopsPredicate pred3 = deleteInodes.get("id").greaterEqual(deleteInodes.param("startId")); + HopsPredicate pred4 = deleteInodes.get("id").lessEqual(deleteInodes.param("endId")); + HopsPredicate pred5 = pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodes); + query.setParameter("statusParam2", 2); + query.setParameter("statusParam3", 3); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } +} + +class InodesPhase2Callable implements Callable { + + int startId, endId; + + InodesPhase2Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + + + + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); + + HopsPredicate pred4 = updateInodes.get("isDeleted").equal(updateInodes.param("isDeletedParam")); + HopsPredicate pred5 = updateInodes.get("id").greaterEqual(updateInodes.param("startId")); + HopsPredicate pred6 = updateInodes.get("id").lessEqual(updateInodes.param("endId")); + updateInodes.where(pred6.and(pred5).and(pred4)); + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + + UpdateQuery.setParameter("isDeletedParam", 1); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + List results = UpdateQuery.getResultList(); + + for (InodeDTO row : results) { + row.setIsDeleted(0); + } + + session.updatePersistentAll(results); + + //System.out.println("SetIsDeleted from id=" + factor * BUFFER_SIZE + " ,to id=" + (factor + 1) * BUFFER_SIZE + ". Time= " + (end - start) / 1000 + "Seconds"); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } +} + +class InodesPhase3Callable implements Callable { + + private static final Log LOG = LogFactory.getLog(InodesPhase3Callable.class); + int startId, endId; + final Lock lock; + + InodesPhase3Callable(int startId, int endId, Lock lock) { + System.out.println("The startId=" + startId + ".The endId=" + endId); + this.startId = startId; + this.endId = endId; + this.lock = lock; + } + + @Override + public Boolean call() throws StorageException, IOException { + try { + System.out.println("call method is called"); + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + System.out.println("Query is executed"); + InodeDTO newInode; + + for (InodeDTO row : results) { + newInode = session.newInstance(InodeDTO.class); + + newInode.setId(-row.getId()); + newInode.setName(row.getName()); + newInode.setParentId(-row.getParentId()); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setIsDeleted(row.getIsDeleted()); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + } + System.gc(); + synchronized (lock) { + // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. 
rows to write "+newResults.size()); + session.makePersistentAll(newResults); + } + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get("id").lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get("id").greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + System.out.println("Deleting all the negative rows"); + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + System.out.println("Deleted all the negative rows"); + session.flush(); + session.close(); + + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } +} + +class Lock { +} diff --git a/src/main/resources/ndb-config.properties b/src/main/resources/ndb-config.properties index 21e16da0..57ae62e1 100644 --- a/src/main/resources/ndb-config.properties +++ b/src/main/resources/ndb-config.properties @@ -1,16 +1,26 @@ -com.mysql.clusterj.connectstring = ndb1.example.com -com.mysql.clusterj.database = example +com.mysql.clusterj.connectstring = cloud11.sics.se +com.mysql.clusterj.database = hop_pushparaj_new com.mysql.clusterj.connection.pool.size = 1 com.mysql.clusterj.max.transactions = 1024 io.hops.metadata.ndb.mysqlserver.data_source_class_name = com.mysql.jdbc.jdbc2.optional.MysqlDataSource -io.hops.metadata.ndb.mysqlserver.host = mysql1.example.com +io.hops.metadata.ndb.mysqlserver.host = cloud11.sics.se io.hops.metadata.ndb.mysqlserver.port = 3306 -io.hops.metadata.ndb.mysqlserver.username = username -io.hops.metadata.ndb.mysqlserver.password = password +io.hops.metadata.ndb.mysqlserver.username = hop +io.hops.metadata.ndb.mysqlserver.password = hop io.hops.metadata.ndb.mysqlserver.connection_pool_size = 10 #size of the session pool. 
should be altreat as big as the number of active Tx in the system io.hops.session.pool.size = 1200 #Session is reused n times and then it is GCed io.hops.session.reuse.count = 500 +###Old Start +#com.mysql.clusterj.connectstring = cloud11.sics.se +#com.mysql.clusterj.database = hop_pushparaj_new +#com.mysql.clusterj.connection.pool.size = 1 +#com.mysql.clusterj.max.transactions = 1024 + +#com.mysql.clusterj.jdbc.username=hop +#com.mysql.clusterj.jdbc.password=hop +#com.mysql.clusterj.jdbc.url=jdbc:mysql://cloud11.sics.se:3306/hop_pushparaj_new +####Old Finish From 30e9673b39cce6fd5a0beff74169cdfdf914ff2b Mon Sep 17 00:00:00 2001 From: Pushparaj Motamari Date: Thu, 9 Jun 2016 11:20:57 +0200 Subject: [PATCH 2/4] 8June2016 --- schema/schema.sql | 5 +- .../ndb/dalimpl/rollBack/RollBackImpl.java | 126 +++++++++++------- src/main/resources/ndb-config.properties | 8 +- 3 files changed, 86 insertions(+), 53 deletions(-) mode change 100644 => 100755 src/main/resources/ndb-config.properties diff --git a/schema/schema.sql b/schema/schema.sql index cebbd467..24bb3a1f 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -1,3 +1,6 @@ +CREATE DATABASE IF NOT EXISTS `hop_pushparaj`; +USE `hop_pushparaj`; + delimiter $$ CREATE TABLE `hdfs_block_infos` ( @@ -827,4 +830,4 @@ CREATE TABLE `yarn_pendingevents` ( `status` TINYINT NULL, `last_hb` INT NULL, PRIMARY KEY (`id`, `rmnodeid`)) -ENGINE = ndbcluster$$ \ No newline at end of file +ENGINE = ndbcluster$$ diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java index 3db1afea..5bfca895 100644 --- a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java @@ -22,6 +22,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import io.hops.metadata.hdfs.snapshots.SnapShotConstants; import io.hops.metadata.ndb.wrapper.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,17 +49,20 @@ public boolean processInodesPhase1() throws IOException { ExecutorService pool = Executors.newCachedThreadPool(); int intervalStart; int maxId = 0; + int minId; //Delete all rows with status=2 or status=3 try { - maxId = execMySqlQuery("select max(id) from inodes where status=2 or status=3"); + maxId = execMySqlQuery("select max(id) from hdfs_inodes where status=2 or status=3"); + //An inode created after taking snapshot and deleted will have negative_id and isDeleted=1. + minId = execMySqlQuery("select min(id) from hdfs_inodes where status=2 or status=3"); } catch (StorageException ex) { // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); throw new StorageException(ex); } - intervalStart = 0; + intervalStart = minId; InodesPhase1Callable dr; while (intervalStart <= maxId) { dr = new InodesPhase1Callable(intervalStart, intervalStart + BUFFER_SIZE); @@ -72,7 +76,7 @@ public boolean processInodesPhase1() throws IOException { //Confirm that the task has done. 
int count = 1; try { - count = execMySqlQuery("select count(*) from inodes where status=2 or status=3"); + count = execMySqlQuery("select count(*) from hdfs_inodes where status=2 or status=3"); } catch (StorageException ex) { // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -84,15 +88,13 @@ public boolean processInodesPhase1() throws IOException { @Override public boolean processInodesPhase2() throws IOException { - - ExecutorService pool = Executors.newCachedThreadPool(); int intervalStart = 0; - int maxId = 0; + int minId = 0; //Update all rows with id>0 and isDeleted=1 to isDeleted=0 try { - maxId = execMySqlQuery("select max(id) from inodes where id >0 and isDeleted=1"); + minId = execMySqlQuery("select min(id) from hdfs_inodes where id <0 and isDeleted=1"); } catch (StorageException ex) { //LOG.error(ex, ex); throw new StorageException(ex); @@ -101,9 +103,9 @@ public boolean processInodesPhase2() throws IOException { intervalStart = 0; InodesPhase2Callable mr; - while (intervalStart <= maxId) { - mr = new InodesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE); - intervalStart = intervalStart + BUFFER_SIZE; + while (intervalStart > minId) { + mr = new InodesPhase2Callable(intervalStart, intervalStart - BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; pool.submit(mr); } @@ -113,7 +115,7 @@ public boolean processInodesPhase2() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from inodes where id >0 and isDeleted=1"); + count = execMySqlQuery("select count(*) from hdfs_inodes where id <0 and isDeleted=1"); } catch (StorageException ex) { //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -169,7 +171,7 @@ public boolean processInodesPhase3() throws IOException { try { //Get total number of backup records. - minId = execMySqlQuery("select min(id) from inodes where id <0 "); + minId = execMySqlQuery("select min(id) from hdfs_inodes where id <0 "); LOG.debug("The minimum id id " + minId); //System.out.println(backUpRowsCnt); } catch (StorageException ex) { @@ -194,7 +196,7 @@ public boolean processInodesPhase3() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from inodes where id<0"); + count = execMySqlQuery("select count(*) from hdfs_inodes where id<0"); } catch (StorageException ex) { // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -212,7 +214,7 @@ public boolean processInodeAttributesPhase1() throws IOException { //Delete all rows with status=2 or status=3 try { - maxId = execMySqlQuery("select max(inodeId) from inode_attributes where status=2 or status=3"); + maxId = execMySqlQuery("select max(inodeId) from hdfs_inode_attributes where status=2 or status=3"); } catch (StorageException ex) { // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -233,7 +235,7 @@ public boolean processInodeAttributesPhase1() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from inode_attributes where status=2 or status=3"); + count = execMySqlQuery("select count(*) from hdfs_inode_attributes where status=2 or status=3"); } catch (StorageException ex) { // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -253,7 +255,7 @@ public boolean processInodeAttributesPhase2() throws IOException { try { //Get total number of backup records. 
- minId = execMySqlQuery("select min(inodeId) from inode_attributes where inodeId <0 "); + minId = execMySqlQuery("select min(inodeId) from hdfs_inode_attributes where inodeId <0 "); //System.out.println(backUpRowsCnt); } catch (StorageException ex) { LOG.error(ex, ex); @@ -277,7 +279,7 @@ public boolean processInodeAttributesPhase2() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from inode_attributes where inodeId<0"); + count = execMySqlQuery("select count(*) from hdfs_inode_attributes where inodeId<0"); } catch (StorageException ex) { //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -296,7 +298,7 @@ public boolean processBlocksPhase1() throws IOException { //Delete all rows with status=2 or status=3 try { - maxId = execMySqlQuery("select max(block_id) from block_infos where status=2 or status=3"); + maxId = execMySqlQuery("select max(block_id) from hdfs_block_infos where status=2 or status=3"); } catch (StorageException ex) { //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -317,7 +319,7 @@ public boolean processBlocksPhase1() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from block_infos where status=2 or status=3"); + count = execMySqlQuery("select count(*) from hdfs_block_infos where status=2 or status=3"); } catch (StorageException ex) { //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -336,7 +338,7 @@ public boolean processBlocksPhase2() throws IOException { try { //Get total number of backup records. - minId = execMySqlQuery("select min(block_id) from block_infos where block_id <0 "); + minId = execMySqlQuery("select min(block_id) from hdfs_block_infos where block_id <0 "); //System.out.println(backUpRowsCnt); } catch (StorageException ex) { LOG.error(ex, ex); @@ -358,7 +360,7 @@ public boolean processBlocksPhase2() throws IOException { //Confirm that the task has done. int count = 1; try { - count = execMySqlQuery("select count(*) from block_infos where block_id<0"); + count = execMySqlQuery("select count(*) from hdfs_block_infos where block_id<0"); } catch (StorageException ex) { //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); @@ -386,7 +388,7 @@ public boolean processRootAndSetSubTreeLocked(long nameNodeId) throws IOExceptio int maxId; - maxId = execMySqlQuery("select max(id) from inodes where id >0"); + maxId = execMySqlQuery("select max(id) from hdfs_inodes where id >0"); InodeDTO newInode = session.newInstance(InodeDTO.class); @@ -424,7 +426,7 @@ public boolean unSetSubTreeLockedOnRoot() throws IOException { HopsSession session = ClusterjConnector.getInstance().obtainSession(); int maxId; - maxId = execMySqlQuery("select max(id) from inodes where id >0"); + maxId = execMySqlQuery("select max(id) from hdfs_inodes where id >0"); InodeDTO oldRoot = session.find(InodeDTO.class, new Integer(maxId)); @@ -519,13 +521,13 @@ public Boolean call() throws StorageException, IOException { //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); HopsQueryBuilder qb = session.getQueryBuilder(); - HopsQueryDomainType updateInodes = qb.createQueryDefinition(BlockInfoDTO.class); - HopsPredicate pred3 = updateInodes.get("block_Id").lessEqual(updateInodes.param("startId")); - HopsPredicate pred4 = updateInodes.get("block_Id").greaterEqual(updateInodes.param("endId")); + HopsQueryDomainType updateBlockNodes = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3 = updateBlockNodes.get("block_Id").lessEqual(updateBlockNodes.param("startId")); + HopsPredicate pred4 = updateBlockNodes.get("block_Id").greaterEqual(updateBlockNodes.param("endId")); - updateInodes.where(pred3.and(pred4)); + updateBlockNodes.where(pred3.and(pred4)); - HopsQuery UpdateQuery = session.createQuery(updateInodes); + HopsQuery UpdateQuery = session.createQuery(updateBlockNodes); UpdateQuery.setParameter("startId", startId); UpdateQuery.setParameter("endId", endId); @@ -601,15 +603,15 @@ public Boolean call() throws StorageException { HopsQueryBuilder qb = session.getQueryBuilder(); //Delete inodes with status=2 or status=3 - HopsQueryDomainType deleteInodes = qb.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred1 = deleteInodes.get("status").equal(deleteInodes.param("statusParam2")); - HopsPredicate pred2 = deleteInodes.get("status").equal(deleteInodes.param("statusParam3")); - HopsPredicate pred3 = deleteInodes.get("inodeId").greaterEqual(deleteInodes.param("startId")); - HopsPredicate pred4 = deleteInodes.get("inodeId").lessEqual(deleteInodes.param("endId")); + HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred1 = deleteInodeAttributes.get("status").equal(deleteInodeAttributes.param("statusParam2")); + HopsPredicate pred2 = deleteInodeAttributes.get("status").equal(deleteInodeAttributes.param("statusParam3")); + HopsPredicate pred3 = deleteInodeAttributes.get("id").greaterEqual(deleteInodeAttributes.param("startId")); + HopsPredicate pred4 = deleteInodeAttributes.get("id").lessEqual(deleteInodeAttributes.param("endId")); HopsPredicate pred5 = pred3.and(pred4); HopsPredicate pred6 = pred1.or(pred2); - deleteInodes.where(pred5.and(pred6)); - HopsQuery query = session.createQuery(deleteInodes); + deleteInodeAttributes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodeAttributes); query.setParameter("statusParam2", 2); query.setParameter("statusParam3", 3); query.setParameter("startId", startId); @@ -646,8 +648,8 @@ public Boolean call() throws StorageException, IOException { HopsQueryBuilder qb = session.getQueryBuilder(); HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred3 = updateInodes.get("inodeId").lessEqual(updateInodes.param("startId")); - HopsPredicate pred4 = updateInodes.get("inodeId").greaterEqual(updateInodes.param("endId")); + HopsPredicate pred3 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); updateInodes.where(pred3.and(pred4)); @@ -683,8 +685,8 @@ public Boolean call() throws StorageException, IOException { HopsQueryBuilder qbd = session.getQueryBuilder(); HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred3d = deleteInodesWithNegativeId.get("inodeId").lessEqual(deleteInodesWithNegativeId.param("startId")); - HopsPredicate pred4d = 
deleteInodesWithNegativeId.get("inodeId").greaterEqual(deleteInodesWithNegativeId.param("endId")); + HopsPredicate pred3d = deleteInodesWithNegativeId.get("id").lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get("id").greaterEqual(deleteInodesWithNegativeId.param("endId")); deleteInodesWithNegativeId.where(pred3d.and(pred4d)); HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); @@ -760,18 +762,13 @@ class InodesPhase2Callable implements Callable { @Override public Boolean call() throws StorageException { - - HopsSession session = ClusterjConnector.getInstance().obtainSession(); -// Transaction tx = session.currentTransaction(); -// tx.begin(); - HopsQueryBuilder qb = session.getQueryBuilder(); HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); HopsPredicate pred4 = updateInodes.get("isDeleted").equal(updateInodes.param("isDeletedParam")); - HopsPredicate pred5 = updateInodes.get("id").greaterEqual(updateInodes.param("startId")); - HopsPredicate pred6 = updateInodes.get("id").lessEqual(updateInodes.param("endId")); + HopsPredicate pred5 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); + HopsPredicate pred6 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); updateInodes.where(pred6.and(pred5).and(pred4)); HopsQuery UpdateQuery = session.createQuery(updateInodes); @@ -781,12 +778,45 @@ public Boolean call() throws StorageException { UpdateQuery.setParameter("endId", endId); List results = UpdateQuery.getResultList(); + List newResults = new ArrayList(results.size()); + List tobeDeleted = new ArrayList(results.size()); + int id; + InodeDTO newInode; for (InodeDTO row : results) { - row.setIsDeleted(0); + if((id=row.getParentId())<0) { + newInode = session.newInstance(InodeDTO.class); + newInode.setId(-row.getId()); + //newInode.setName(row.getName().split("\\$DEL\\$")[0]); + newInode.setName(row.getName().split("\\$DEL:[0-9]+\\$",2)[0]); + newInode.setParentId(-id); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setIsDeleted(SnapShotConstants.isNotDeleted); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + tobeDeleted.add(row); + + + } } - session.updatePersistentAll(results); + if(!newResults.isEmpty()) + session.makePersistentAll(newResults); + if(!tobeDeleted.isEmpty()) + session.deletePersistentAll(tobeDeleted); //System.out.println("SetIsDeleted from id=" + factor * BUFFER_SIZE + " ,to id=" + (factor + 1) * BUFFER_SIZE + ". 
Time= " + (end - start) / 1000 + "Seconds"); diff --git a/src/main/resources/ndb-config.properties b/src/main/resources/ndb-config.properties old mode 100644 new mode 100755 index 57ae62e1..d4c352d3 --- a/src/main/resources/ndb-config.properties +++ b/src/main/resources/ndb-config.properties @@ -1,11 +1,11 @@ -com.mysql.clusterj.connectstring = cloud11.sics.se -com.mysql.clusterj.database = hop_pushparaj_new +com.mysql.clusterj.connectstring = cloud1.sics.se +com.mysql.clusterj.database = hop_pushparaj com.mysql.clusterj.connection.pool.size = 1 com.mysql.clusterj.max.transactions = 1024 io.hops.metadata.ndb.mysqlserver.data_source_class_name = com.mysql.jdbc.jdbc2.optional.MysqlDataSource -io.hops.metadata.ndb.mysqlserver.host = cloud11.sics.se -io.hops.metadata.ndb.mysqlserver.port = 3306 +io.hops.metadata.ndb.mysqlserver.host = cloud1.sics.se +io.hops.metadata.ndb.mysqlserver.port = 3307 io.hops.metadata.ndb.mysqlserver.username = hop io.hops.metadata.ndb.mysqlserver.password = hop io.hops.metadata.ndb.mysqlserver.connection_pool_size = 10 From fd4cc0a82a134a4eee62844ffad5a145daf27186 Mon Sep 17 00:00:00 2001 From: Pushparaj Motamari Date: Sun, 19 Jun 2016 15:44:18 +0200 Subject: [PATCH 3/4] Changes to Schema --- schema/schema.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/schema/schema.sql b/schema/schema.sql index d2869dce..0859b0b7 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -13,7 +13,7 @@ CREATE TABLE `hdfs_block_infos` ( `time_stamp` bigint(20) DEFAULT NULL, `primary_node_index` int(11) DEFAULT NULL, `block_recovery_id` bigint(20) DEFAULT NULL, - `status` int(11) DEFAULT 0, + `status` int(11) DEFAULT 1, PRIMARY KEY (`inode_id`,`block_id`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1 /*!50100 PARTITION BY KEY (inode_id) */$$ @@ -61,7 +61,7 @@ CREATE TABLE `hdfs_inode_attributes` ( `dsquota` bigint(20) DEFAULT NULL, `nscount` bigint(20) DEFAULT NULL, `diskspace` bigint(20) DEFAULT NULL, - `status` int(11) DEFAULT 0, + `status` int(11) DEFAULT 1, PRIMARY KEY (`inodeId`) ) ENGINE=ndbcluster DEFAULT CHARSET=latin1$$ @@ -88,7 +88,7 @@ CREATE TABLE `hdfs_inodes` ( `meta_enabled` bit(8) DEFAULT b'110000', `size` bigint(20) NOT NULL DEFAULT '0', `isdeleted` int(11) DEFAULT 0, - `status` int(11) DEFAULT 0, + `status` int(11) DEFAULT 1, PRIMARY KEY (`parent_id`,`name`), KEY `pidex` (`parent_id`), KEY `inode_idx` (`id`) From 2a89516ab96b1c71e0c5547189ad1f3e3d571d19 Mon Sep 17 00:00:00 2001 From: Pushparaj Motamari Date: Sun, 12 Mar 2017 16:03:39 +0100 Subject: [PATCH 4/4] RollBackManager --- .../dalimpl/rollBack/RemoveSnapshotImpl.java | 500 ++++++++++ .../ndb/dalimpl/rollBack/RollBackImpl.java | 871 +++++++++--------- 2 files changed, 918 insertions(+), 453 deletions(-) create mode 100644 src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java new file mode 100644 index 00000000..c3d50e60 --- /dev/null +++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RemoveSnapshotImpl.java @@ -0,0 +1,500 @@ +package io.hops.metadata.ndb.dalimpl.rollBack; + +import io.hops.exception.StorageException; +import io.hops.metadata.hdfs.snapshots.SnapShotConstants; +import io.hops.metadata.ndb.ClusterjConnector; +import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj; +import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj; +import 
io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj; +import io.hops.metadata.ndb.wrapper.*; +import io.hops.metadata.rollBack.dal.RemoveSnapshotAccess; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import static io.hops.metadata.ndb.dalimpl.rollBack.RollBackImpl.execMySqlQuery; +import io.hops.metadata.hdfs.TablesDef.*; + + +/** + * Copyright (c) 01/08/16 DigitalRoute + * All rights reserved. + * Usage of this program and the accompanying materials is subject to license terms + * + * @author pushparaj.motamari + */ +public class RemoveSnapshotImpl implements RemoveSnapshotAccess { + private static final Log LOG = LogFactory.getLog(RollBackImpl.class); + private static int BUFFER_SIZE = 50000; + private int rootId = 2; + + @Override + public boolean processInodesPhase1() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int minId; + //Remove all back-up rows..i.e ..rows with negative id. + try { + minId = execMySqlQuery("select min(id) from hdfs_inodes where id<0"); + + } catch (StorageException ex) { + // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + intervalStart = 0; + InodesPhase1Callable mr; + while (intervalStart > minId) { + mr = new InodesPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE); + intervalStart = intervalStart - BUFFER_SIZE; + pool.submit(mr); + } + + //Wait for completion of above task. + RollBackImpl.shutDownPool(pool); + + //Confirm that the task has done. + int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inodes where id <0 "); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + private static class InodesPhase1Callable implements Callable { + + int startId, endId; + + InodesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(INodeClusterj.InodeDTO.class); + + HopsPredicate pred5 = deleteInodes.get(INodeTableDef.ID).lessEqual(deleteInodes.param("startId")); + HopsPredicate pred6 = deleteInodes.get(INodeTableDef.ID).greaterEqual(deleteInodes.param("endId")); + deleteInodes.where(pred6.and(pred5)); + + HopsQuery deleteQuery = session.createQuery(deleteInodes); + + deleteQuery.setParameter("startId", startId); + deleteQuery.setParameter("endId", endId); + + deleteQuery.deletePersistentAll(); + + session.flush(); + session.close(); + return true; + } + } + + @Override + public boolean processInodesPhase2() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int minId,maxId; + // Change all new and modified rows to Original. 
+        try {
+            maxId = execMySqlQuery("select max(id) from hdfs_inodes where id>0 " +
+                    " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")");
+            minId = execMySqlQuery("select min(id) from hdfs_inodes where id>0" +
+                    " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")");
+
+        } catch (StorageException ex) {
+            throw new StorageException(ex);
+        }
+
+        intervalStart = minId;
+        InodesPhase2Callable mr;
+        while (intervalStart < maxId) {
+            mr = new InodesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE);
+            intervalStart = intervalStart + BUFFER_SIZE;
+            pool.submit(mr);
+        }
+
+        //Wait for completion of above task.
+        RollBackImpl.shutDownPool(pool);
+
+        //Confirm that the task has done.
+        int count = 1;
+        try {
+            count = execMySqlQuery("select count(*) from hdfs_inodes where id >0 and (status =2 or status = 3) ");
+
+        } catch (StorageException ex) {
+            throw new StorageException(ex);
+        }
+
+        return count == 0;
+    }
+    private static class InodesPhase2Callable implements Callable {
+
+        int startId, endId;
+
+        InodesPhase2Callable(int startId, int endId) {
+            this.startId = startId;
+            this.endId = endId;
+        }
+
+        @Override
+        public Boolean call() throws StorageException {
+            HopsSession session = ClusterjConnector.getInstance().obtainSession();
+            HopsQueryBuilder qb = session.getQueryBuilder();
+            HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeClusterj.InodeDTO.class);
+
+            HopsPredicate pred3 = updateInodes.get(INodeTableDef.STATUS).equal(updateInodes.param("statusParam1"));
+            HopsPredicate pred4 = updateInodes.get(INodeTableDef.STATUS).equal(updateInodes.param("statusParam2"));
+            HopsPredicate pred5 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("startId"));
+            HopsPredicate pred6 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("endId"));
+            updateInodes.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+            HopsQuery updateQuery = session.createQuery(updateInodes);
+
+            updateQuery.setParameter("statusParam1",SnapShotConstants.Modified);
+            updateQuery.setParameter("statusParam2",SnapShotConstants.New);
+            updateQuery.setParameter("startId", startId);
+            updateQuery.setParameter("endId", endId);
+
+            List resultList = updateQuery.getResultList();
+
+            for(INodeClusterj.InodeDTO inode : resultList){
+                inode.setStatus(SnapShotConstants.Original);
+            }
+
+            session.updatePersistentAll(resultList);
+            session.flush();
+            session.close();
+            return true;
+        }
+    }
+
+    @Override
+    public boolean processInodeAttributesPhase1() throws IOException {
+        ExecutorService pool = Executors.newCachedThreadPool();
+        int intervalStart;
+        int minId;
+        //Remove all back-up rows..i.e ..rows with negative id.
+        try {
+            minId = execMySqlQuery("select min("+INodeAttributesTableDef.ID +") from hdfs_inode_attributes where "+INodeAttributesTableDef.ID +"<0");
+
+        } catch (StorageException ex) {
+            // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex);
+            throw new StorageException(ex);
+        }
+
+        intervalStart = 0;
+        InodeAttributesPhase1Callable mr;
+        while (intervalStart > minId) {
+            mr = new InodeAttributesPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE);
+            intervalStart = intervalStart - BUFFER_SIZE;
+            pool.submit(mr);
+        }
+
+        //Wait for completion of above task.
+        RollBackImpl.shutDownPool(pool);
+
+        //Confirm that the task has done.
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_inode_attributes where "+INodeAttributesTableDef.ID +" <0 "); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + private static class InodeAttributesPhase1Callable implements Callable { + + int startId, endId; + + InodeAttributesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesClusterj.INodeAttributesDTO.class); + + HopsPredicate pred5 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(deleteInodeAttributes.param("startId")); + HopsPredicate pred6 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodeAttributes.param("endId")); + deleteInodeAttributes.where(pred6.and(pred5)); + + HopsQuery deleteQuery = session.createQuery(deleteInodeAttributes); + + deleteQuery.setParameter("startId", startId); + deleteQuery.setParameter("endId", endId); + + deleteQuery.deletePersistentAll(); + + session.flush(); + session.close(); + return true; + } + } + + @Override + public boolean processInodeAttributesPhase2() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int minId,maxId; + // Change all new and modified rows to Original. + try { + maxId = execMySqlQuery("select max("+INodeAttributesTableDef.ID +") from hdfs_inode_attributes where "+INodeAttributesTableDef.ID +">0" + + " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")"); + minId = execMySqlQuery("select min("+INodeAttributesTableDef.ID +") from hdfs_inode_attributes where "+INodeAttributesTableDef.ID +">0 " + + " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")"); + } catch (StorageException ex) { + throw new StorageException(ex); + } + + intervalStart = minId; + InodeAttributesPhase2Callable mr; + while (intervalStart < maxId) { + mr = new InodeAttributesPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(mr); + } + + //Wait for completion of above task. + RollBackImpl.shutDownPool(pool); + + //Confirm that the task has done. 
+        int count = 1;
+        try {
+            count = execMySqlQuery("select count(*) from hdfs_inode_attributes where "+INodeAttributesTableDef.ID +" >0 and (status =2 or status = 3) ");
+
+        } catch (StorageException ex) {
+            throw new StorageException(ex);
+        }
+
+        return count == 0;
+    }
+
+    private static class InodeAttributesPhase2Callable implements Callable {
+
+        int startId, endId;
+
+        InodeAttributesPhase2Callable(int startId, int endId) {
+            this.startId = startId;
+            this.endId = endId;
+        }
+
+        @Override
+        public Boolean call() throws StorageException {
+            HopsSession session = ClusterjConnector.getInstance().obtainSession();
+            HopsQueryBuilder qb = session.getQueryBuilder();
+            HopsQueryDomainType updateInodeAttributes = qb.createQueryDefinition(INodeAttributesClusterj.INodeAttributesDTO.class);
+
+            HopsPredicate pred3 = updateInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(updateInodeAttributes.param("statusParam1"));
+            HopsPredicate pred4 = updateInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(updateInodeAttributes.param("statusParam2"));
+            HopsPredicate pred5 = updateInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(updateInodeAttributes.param("startId"));
+            HopsPredicate pred6 = updateInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(updateInodeAttributes.param("endId"));
+            updateInodeAttributes.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+            HopsQuery updateQuery = session.createQuery(updateInodeAttributes);
+
+            updateQuery.setParameter("statusParam1",SnapShotConstants.Modified);
+            updateQuery.setParameter("statusParam2",SnapShotConstants.New);
+            updateQuery.setParameter("startId", startId);
+            updateQuery.setParameter("endId", endId);
+
+            List resultList = updateQuery.getResultList();
+
+            for(INodeAttributesClusterj.INodeAttributesDTO inodeAttribute : resultList){
+                inodeAttribute.setStatus(SnapShotConstants.Original);
+            }
+
+            session.updatePersistentAll(resultList);
+            session.flush();
+            session.close();
+            return true;
+        }
+    }
+
+    public boolean processBlocksPhase1() throws IOException {
+        ExecutorService pool = Executors.newCachedThreadPool();
+        int intervalStart;
+        int minId;
+        //Remove all back-up rows..i.e ..rows with negative id.
+        try {
+            minId = execMySqlQuery("select min("+BlockInfoTableDef.BLOCK_ID +") from hdfs_block_infos where "+BlockInfoTableDef.BLOCK_ID +"<0");
+        } catch (StorageException ex) {
+            // Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex);
+            throw new StorageException(ex);
+        }
+
+        intervalStart = 0;
+        processBlocksPhase1Callable mr;
+        while (intervalStart > minId) {
+            mr = new processBlocksPhase1Callable(intervalStart, intervalStart - BUFFER_SIZE);
+            intervalStart = intervalStart - BUFFER_SIZE;
+            pool.submit(mr);
+        }
+
+        //Wait for completion of above task.
+        RollBackImpl.shutDownPool(pool);
+
+        //Confirm that the task has done.
+ int count = 1; + try { + count = execMySqlQuery("select count(*) from hdfs_block_infos where "+BlockInfoTableDef.BLOCK_ID +" <0 "); + + } catch (StorageException ex) { + //Logger.getLogger(RollBackImpl.class.getName()).log(Level.SEVERE, null, ex); + throw new StorageException(ex); + } + + return count == 0; + } + private static class processBlocksPhase1Callable implements Callable { + + int startId, endId; + + processBlocksPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } + + @Override + public Boolean call() throws StorageException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType deleteBlocks = qb.createQueryDefinition(BlockInfoClusterj.BlockInfoDTO.class); + + HopsPredicate pred5 = deleteBlocks.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteBlocks.param("startId")); + HopsPredicate pred6 = deleteBlocks.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteBlocks.param("endId")); + deleteBlocks.where(pred6.and(pred5)); + + HopsQuery deleteQuery = session.createQuery(deleteBlocks); + + deleteQuery.setParameter("startId", startId); + deleteQuery.setParameter("endId", endId); + + deleteQuery.deletePersistentAll(); + + session.flush(); + session.close(); + return true; + } + } + + @Override + public boolean processBlocksPhase2() throws IOException { + ExecutorService pool = Executors.newCachedThreadPool(); + int intervalStart; + int minId,maxId; + // Change all new and modified rows to Original. + try { + maxId = execMySqlQuery("select max("+BlockInfoTableDef.BLOCK_ID +") from hdfs_block_infos where "+BlockInfoTableDef.BLOCK_ID +">0" + + " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")"); + minId = execMySqlQuery("select min("+BlockInfoTableDef.BLOCK_ID +") from hdfs_block_infos where "+BlockInfoTableDef.BLOCK_ID +">0 " + + " and (status ="+SnapShotConstants.Modified+" or status = "+SnapShotConstants.New+")"); + } catch (StorageException ex) { + throw new StorageException(ex); + } + + intervalStart = minId; + processBlocksPhase2Callable mr; + while (intervalStart < maxId) { + mr = new processBlocksPhase2Callable(intervalStart, intervalStart + BUFFER_SIZE); + intervalStart = intervalStart + BUFFER_SIZE; + pool.submit(mr); + } + + //Wait for completion of above task. + RollBackImpl.shutDownPool(pool); + + //Confirm that the task has done. 
+        int count = 1;
+        try {
+            count = execMySqlQuery("select count(*) from hdfs_block_infos where "+BlockInfoTableDef.BLOCK_ID +" >0 " +
+                    "and ( "+BlockInfoTableDef.STATUS +" =2 or "+BlockInfoTableDef.STATUS +" = 3) ");
+
+        } catch (StorageException ex) {
+            throw new StorageException(ex);
+        }
+
+        return count == 0;
+    }
+    private static class processBlocksPhase2Callable implements Callable {
+
+        int startId, endId;
+
+        processBlocksPhase2Callable(int startId, int endId) {
+            this.startId = startId;
+            this.endId = endId;
+        }
+
+        @Override
+        public Boolean call() throws StorageException {
+            HopsSession session = ClusterjConnector.getInstance().obtainSession();
+            HopsQueryBuilder qb = session.getQueryBuilder();
+            HopsQueryDomainType updateBlockInfos = qb.createQueryDefinition(BlockInfoClusterj.BlockInfoDTO.class);
+
+            HopsPredicate pred3 = updateBlockInfos.get(BlockInfoTableDef.STATUS).equal(updateBlockInfos.param("statusParam1"));
+            HopsPredicate pred4 = updateBlockInfos.get(BlockInfoTableDef.STATUS).equal(updateBlockInfos.param("statusParam2"));
+            HopsPredicate pred5 = updateBlockInfos.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(updateBlockInfos.param("startId"));
+            HopsPredicate pred6 = updateBlockInfos.get(BlockInfoTableDef.BLOCK_ID).lessEqual(updateBlockInfos.param("endId"));
+            updateBlockInfos.where(pred6.and(pred5).and(pred3.or(pred4)));
+
+            HopsQuery updateQuery = session.createQuery(updateBlockInfos);
+
+            updateQuery.setParameter("statusParam1",SnapShotConstants.Modified);
+            updateQuery.setParameter("statusParam2",SnapShotConstants.New);
+            updateQuery.setParameter("startId", startId);
+            updateQuery.setParameter("endId", endId);
+
+            List resultList = updateQuery.getResultList();
+
+            for(BlockInfoClusterj.BlockInfoDTO inodeAttribute : resultList){
+                inodeAttribute.setStatus(SnapShotConstants.Original);
+            }
+
+            session.updatePersistentAll(resultList);
+            session.flush();
+            session.close();
+            return true;
+        }
+    }
+    public boolean waitForSubTreeOperations() throws IOException {
+        String query = "select count(*) from hdfs_on_going_sub_tree_ops";
+        long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime < 60 * 1000) {
+            int onGoingSubTreeOpsCount = execMySqlQuery(query);
+            if (onGoingSubTreeOpsCount == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean waitForQuotaUpdates() throws IOException {
+        String query = "select count(*) from hdfs_quota_update";
+        long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime < 60 * 1000) {
+            int quotaUpdatesYetToBeApplied = execMySqlQuery(query);
+            if (quotaUpdatesYetToBeApplied == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java
index 291845f2..096bb13d 100644
--- a/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java
+++ b/src/main/java/io/hops/metadata/ndb/dalimpl/rollBack/RollBackImpl.java
@@ -4,11 +4,20 @@
  */
 package io.hops.metadata.ndb.dalimpl.rollBack;
 
-import com.mysql.clusterj.Query;
-import com.mysql.clusterj.Session;
-import com.mysql.clusterj.query.Predicate;
-import com.mysql.clusterj.query.QueryBuilder;
-import com.mysql.clusterj.query.QueryDomainType;
+import io.hops.exception.StorageException;
+import io.hops.metadata.hdfs.TablesDef;
+import io.hops.metadata.hdfs.entity.INodeAttributes;
+import io.hops.metadata.hdfs.snapshots.SnapShotConstants;
+import 
io.hops.metadata.ndb.ClusterjConnector; +import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj.BlockInfoDTO; +import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj.INodeAttributesDTO; +import io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj.InodeDTO; +import io.hops.metadata.ndb.mysqlserver.MysqlServerConnector; +import io.hops.metadata.ndb.wrapper.*; +import io.hops.metadata.rollBack.dal.RollBackAccess; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import io.hops.metadata.hdfs.TablesDef.*; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -19,21 +28,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -import io.hops.metadata.hdfs.snapshots.SnapShotConstants; -import io.hops.metadata.ndb.wrapper.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import io.hops.exception.StorageException; -import io.hops.metadata.ndb.ClusterjConnector; -import io.hops.metadata.ndb.dalimpl.hdfs.BlockInfoClusterj.BlockInfoDTO; -import io.hops.metadata.ndb.dalimpl.hdfs.INodeAttributesClusterj.INodeAttributesDTO; -import io.hops.metadata.ndb.dalimpl.hdfs.INodeClusterj.InodeDTO; -import io.hops.metadata.ndb.mysqlserver.MysqlServerConnector; -import io.hops.metadata.rollBack.dal.RollBackAccess; -import io.hops.metadata.ndb.wrapper.HopsSession; /** * * @author pushparaj @@ -54,7 +48,6 @@ public boolean processInodesPhase1() throws IOException { try { maxId = execMySqlQuery("select max(id) from hdfs_inodes where status=2 or status=3"); - //An inode created after taking snapshot and deleted will have negative_id and isDeleted=1. minId = execMySqlQuery("select min(id) from hdfs_inodes where status=2 or status=3"); } catch (StorageException ex) { @@ -125,7 +118,7 @@ public boolean processInodesPhase2() throws IOException { return count == 0; } - int execMySqlQuery(String query) throws StorageException { + static int execMySqlQuery(String query) throws StorageException { MysqlServerConnector connector = MysqlServerConnector.getInstance(); try { @@ -147,14 +140,15 @@ int execMySqlQuery(String query) throws StorageException { } - void shutDownPool(ExecutorService pool) { + static void shutDownPool(ExecutorService pool) { //Wait for completion of above task. pool.shutdown(); try { //pool.awaitTermination(10, TimeUnit.MINUTES); while (!pool.isTerminated()) { Thread.sleep(500); - }; + } + ; } catch (InterruptedException ex) { LOG.error(ex, ex); } @@ -370,59 +364,6 @@ public boolean processBlocksPhase2() throws IOException { return count == 0; } - @Override - public boolean processRootAndSetSubTreeLocked(long nameNodeId) throws IOException { - - HopsSession session = ClusterjConnector.getInstance().obtainSession(); - - HopsQueryBuilder qb = session.getQueryBuilder(); - HopsQueryDomainType deleteInodes = qb.createQueryDefinition(InodeDTO.class); - HopsPredicate pred3 = deleteInodes.get("Id").equal(deleteInodes.param("RootId")); - deleteInodes.where(pred3); - HopsQuery query = session.createQuery(deleteInodes); - query.setParameter("Id", rootId); - //delete the root. - query.deletePersistentAll(); - //Get the back-Up row. 
- InodeDTO backUpRow = session.find(InodeDTO.class, new Integer(-rootId)); - - int maxId; - - maxId = execMySqlQuery("select max(id) from hdfs_inodes where id >0"); - - InodeDTO newInode = session.newInstance(InodeDTO.class); - - newInode.setId(maxId + 1); - newInode.setName(backUpRow.getName()); - newInode.setParentId(-backUpRow.getParentId()); - newInode.setQuotaEnabled(backUpRow.getQuotaEnabled()); - newInode.setModificationTime(backUpRow.getModificationTime()); - newInode.setATime(backUpRow.getATime()); - newInode.setPermission(backUpRow.getPermission()); - newInode.setUnderConstruction(backUpRow.getUnderConstruction()); - newInode.setClientName(backUpRow.getClientName()); - newInode.setClientMachine(backUpRow.getClientMachine()); - newInode.setClientNode(backUpRow.getClientNode()); - newInode.setGenerationStamp(backUpRow.getGenerationStamp()); - newInode.setHeader(backUpRow.getHeader()); - newInode.setSymlink(backUpRow.getSymlink()); - newInode.setSubtreeLocked((byte)1);//SetSubTreeLockedToTree. - newInode.setSubtreeLockOwner(nameNodeId); - newInode.setMetaEnabled(backUpRow.getMetaEnabled()); - newInode.setSize(backUpRow.getSize()); - newInode.setIsDeleted(backUpRow.getIsDeleted()); - newInode.setStatus(backUpRow.getStatus()); - - session.persist(newInode);//Save the newRow of root with MaxId; - session.deletePersistent(InodeDTO.class, new Integer(-rootId));//delete the backUp row. - - session.flush(); - session.close(); - - return true; - - } - @Override public boolean unSetSubTreeLockedOnRoot() throws IOException { HopsSession session = ClusterjConnector.getInstance().obtainSession(); @@ -448,7 +389,7 @@ public boolean unSetSubTreeLockedOnRoot() throws IOException { newRoot.setGenerationStamp(oldRoot.getGenerationStamp()); newRoot.setHeader(oldRoot.getHeader()); newRoot.setSymlink(oldRoot.getSymlink()); - newRoot.setSubtreeLocked((byte)0);//UnsetSubTreeLockedToTree. + newRoot.setSubtreeLocked((byte) 0);//UnsetSubTreeLockedToTree. 
newRoot.setSubtreeLockOwner(oldRoot.getSubtreeLockOwner()); newRoot.setMetaEnabled(oldRoot.getMetaEnabled()); newRoot.setSize(oldRoot.getSize()); @@ -463,473 +404,497 @@ public boolean unSetSubTreeLockedOnRoot() throws IOException { return true; } -} -class BlocksPhase1Callable implements Callable { - - long startId, endId; - - BlocksPhase1Callable(long startId, long endId) { - this.startId = startId; - this.endId = endId; + @Override + public boolean waitForSubTreeOperations() throws IOException { + String query = "select count(*) from hdfs_on_going_sub_tree_ops"; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 60 * 1000) { + int onGoingSubTreeOpsCount = execMySqlQuery(query); + if (onGoingSubTreeOpsCount == 0) { + return true; + } + } + return false; } @Override - public Boolean call() throws StorageException { - - HopsSession session = ClusterjConnector.getInstance().obtainSession(); - - HopsQueryBuilder qb = session.getQueryBuilder(); - //Delete inodes with status=2 or status=3 - HopsQueryDomainType deleteBlockInfos = qb.createQueryDefinition(BlockInfoDTO.class); - HopsPredicate pred1 = deleteBlockInfos.get("status").equal(deleteBlockInfos.param("statusParam2")); - HopsPredicate pred2 = deleteBlockInfos.get("status").equal(deleteBlockInfos.param("statusParam3")); - HopsPredicate pred3 = deleteBlockInfos.get("blockId").greaterEqual(deleteBlockInfos.param("startId")); - HopsPredicate pred4 = deleteBlockInfos.get("blockId").lessEqual(deleteBlockInfos.param("endId")); - HopsPredicate pred5 = pred1.or(pred2); - deleteBlockInfos.where(pred5.and(pred3).and(pred4)); - HopsQuery query = session.createQuery(deleteBlockInfos); - query.setParameter("statusParam2", 2); - query.setParameter("statusParam3", 3); - query.setParameter("startId", startId); - query.setParameter("endId", endId); - - - query.deletePersistentAll(); - - // tx.commit(); - session.flush(); - session.close(); - return true; + public boolean waitForQuotaUpdates() throws IOException { + String query = "select count(*) from hdfs_quota_update"; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 60 * 1000) { + int quotaUpdatesYetToBeApplied = execMySqlQuery(query); + if (quotaUpdatesYetToBeApplied == 0) { + return true; + } + } + return false; } -} -class BlocksPhase2Callable implements Callable { + private static class BlocksPhase1Callable implements Callable { - long startId, endId; - final Lock lock; + long startId, endId; - BlocksPhase2Callable(long startId, long endId, Lock lock) { - this.startId = startId; - this.endId = endId; - this.lock = lock; - } + BlocksPhase1Callable(long startId, long endId) { + this.startId = startId; + this.endId = endId; + } - @Override - public Boolean call() throws StorageException, IOException { - try { + @Override + public Boolean call() throws StorageException { HopsSession session = ClusterjConnector.getInstance().obtainSession(); -// Transaction tx = session.currentTransaction(); -// tx.begin(); - //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); HopsQueryBuilder qb = session.getQueryBuilder(); - HopsQueryDomainType updateBlockNodes = qb.createQueryDefinition(BlockInfoDTO.class); - HopsPredicate pred3 = updateBlockNodes.get("block_Id").lessEqual(updateBlockNodes.param("startId")); - HopsPredicate pred4 = updateBlockNodes.get("block_Id").greaterEqual(updateBlockNodes.param("endId")); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteBlockInfos = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred1 = deleteBlockInfos.get(BlockInfoTableDef.STATUS).equal(deleteBlockInfos.param("statusParam2")); + HopsPredicate pred2 = deleteBlockInfos.get(BlockInfoTableDef.STATUS).equal(deleteBlockInfos.param("statusParam3")); + HopsPredicate pred3 = deleteBlockInfos.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteBlockInfos.param("startId")); + HopsPredicate pred4 = deleteBlockInfos.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteBlockInfos.param("endId")); + HopsPredicate pred5 = pred1.or(pred2); + deleteBlockInfos.where(pred5.and(pred3).and(pred4)); + HopsQuery query = session.createQuery(deleteBlockInfos); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } - updateBlockNodes.where(pred3.and(pred4)); + private static class BlocksPhase2Callable implements Callable { - HopsQuery UpdateQuery = session.createQuery(updateBlockNodes); - UpdateQuery.setParameter("startId", startId); - UpdateQuery.setParameter("endId", endId); + long startId, endId; + final Lock lock; + BlocksPhase2Callable(long startId, long endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } - List results = UpdateQuery.getResultList(); - //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); - List newResults = new ArrayList(results.size()); + @Override + public Boolean call() throws StorageException, IOException { + try { - BlockInfoDTO newBlockInfo; + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); - for (BlockInfoDTO row : results) { - newBlockInfo = session.newInstance(BlockInfoDTO.class); + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateBlockNodes = qb.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3 = updateBlockNodes.get(BlockInfoTableDef.BLOCK_ID).lessEqual(updateBlockNodes.param("startId")); + HopsPredicate pred4 = updateBlockNodes.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(updateBlockNodes.param("endId")); - newBlockInfo.setBlockId(-row.getBlockId()); - newBlockInfo.setNumBytes(row.getNumBytes()); - newBlockInfo.setGenerationStamp(row.getGenerationStamp()); - newBlockInfo.setINodeId(-row.getINodeId()); - newBlockInfo.setTimestamp(row.getTimestamp()); - newBlockInfo.setBlockIndex(row.getBlockIndex()); - newBlockInfo.setBlockUCState(row.getBlockUCState()); - newBlockInfo.setPrimaryNodeIndex(row.getPrimaryNodeIndex()); - newBlockInfo.setBlockRecoveryId(row.getBlockRecoveryId()); - newBlockInfo.setStatus(row.getStatus()); + updateBlockNodes.where(pred3.and(pred4)); - newResults.add(newBlockInfo); - } + HopsQuery UpdateQuery = session.createQuery(updateBlockNodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); - synchronized (lock) { - session.makePersistentAll(newResults); - } - System.gc(); - // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); - session = ClusterjConnector.getInstance().obtainSession(); - - - HopsQueryBuilder qbd = session.getQueryBuilder(); - HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(BlockInfoDTO.class); - HopsPredicate pred3d = deleteInodesWithNegativeId.get("blockId").lessEqual(deleteInodesWithNegativeId.param("startId")); - HopsPredicate pred4d = deleteInodesWithNegativeId.get("blockId").greaterEqual(deleteInodesWithNegativeId.param("endId")); - - deleteInodesWithNegativeId.where(pred3d.and(pred4d)); - HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); - queryToDeleteNegative.setParameter("startId", startId); - queryToDeleteNegative.setParameter("endId", endId); - synchronized (lock) { - queryToDeleteNegative.deletePersistentAll(); - } - session.flush(); - session.close(); - } catch (Throwable e) { - throw new IOException(e); - } - return true; - } -} + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); -class InodeAttributesPhase1Callable implements Callable { + BlockInfoDTO newBlockInfo; - int startId, endId; + for (BlockInfoDTO row : results) { + newBlockInfo = session.newInstance(BlockInfoDTO.class); - InodeAttributesPhase1Callable(int startId, int endId) { - this.startId = startId; - this.endId = endId; - } + newBlockInfo.setBlockId(-row.getBlockId()); + newBlockInfo.setNumBytes(row.getNumBytes()); + newBlockInfo.setGenerationStamp(row.getGenerationStamp()); + newBlockInfo.setINodeId(-row.getINodeId()); + newBlockInfo.setTimestamp(row.getTimestamp()); + newBlockInfo.setBlockIndex(row.getBlockIndex()); + newBlockInfo.setBlockUCState(row.getBlockUCState()); + newBlockInfo.setPrimaryNodeIndex(row.getPrimaryNodeIndex()); + newBlockInfo.setBlockRecoveryId(row.getBlockRecoveryId()); + newBlockInfo.setStatus(row.getStatus()); - @Override - public Boolean call() throws StorageException { + newResults.add(newBlockInfo); + } - HopsSession session = 
ClusterjConnector.getInstance().obtainSession(); - // Transaction tx = session.currentTransaction(); - //tx.begin(); - - HopsQueryBuilder qb = session.getQueryBuilder(); - //Delete inodes with status=2 or status=3 - HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred1 = deleteInodeAttributes.get("status").equal(deleteInodeAttributes.param("statusParam2")); - HopsPredicate pred2 = deleteInodeAttributes.get("status").equal(deleteInodeAttributes.param("statusParam3")); - HopsPredicate pred3 = deleteInodeAttributes.get("id").greaterEqual(deleteInodeAttributes.param("startId")); - HopsPredicate pred4 = deleteInodeAttributes.get("id").lessEqual(deleteInodeAttributes.param("endId")); - HopsPredicate pred5 = pred3.and(pred4); - HopsPredicate pred6 = pred1.or(pred2); - deleteInodeAttributes.where(pred5.and(pred6)); - HopsQuery query = session.createQuery(deleteInodeAttributes); - query.setParameter("statusParam2", 2); - query.setParameter("statusParam3", 3); - query.setParameter("startId", startId); - query.setParameter("endId", endId); - - query.deletePersistentAll(); - - // tx.commit(); - session.flush(); - session.close(); - return true; + synchronized (lock) { + session.makePersistentAll(newResults); + } + System.gc(); + // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); + session = ClusterjConnector.getInstance().obtainSession(); + + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(BlockInfoDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get(BlockInfoTableDef.BLOCK_ID).lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get(BlockInfoTableDef.BLOCK_ID).greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + session.flush(); + session.close(); + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } } -} -class InodeAttributesPhase2Callable implements Callable { + private static class InodeAttributesPhase1Callable implements Callable { - int startId, endId; - final Lock lock; + int startId, endId; - InodeAttributesPhase2Callable(int startId, int endId, Lock lock) { - this.startId = startId; - this.endId = endId; - this.lock = lock; - } + InodeAttributesPhase1Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } - @Override - public Boolean call() throws StorageException, IOException { - try { + @Override + public Boolean call() throws StorageException { HopsSession session = ClusterjConnector.getInstance().obtainSession(); -// Transaction tx = session.currentTransaction(); -// tx.begin(); - //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + // Transaction tx = session.currentTransaction(); + //tx.begin(); HopsQueryBuilder qb = session.getQueryBuilder(); - HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred3 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); - HopsPredicate pred4 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); - - updateInodes.where(pred3.and(pred4)); - - HopsQuery UpdateQuery = session.createQuery(updateInodes); - UpdateQuery.setParameter("startId", startId); - UpdateQuery.setParameter("endId", endId); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodeAttributes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred1 = deleteInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(deleteInodeAttributes.param("statusParam2")); + HopsPredicate pred2 = deleteInodeAttributes.get(INodeAttributesTableDef.STATUS).equal(deleteInodeAttributes.param("statusParam3")); + HopsPredicate pred3 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodeAttributes.param("startId")); + HopsPredicate pred4 = deleteInodeAttributes.get(INodeAttributesTableDef.ID).lessEqual(deleteInodeAttributes.param("endId")); + HopsPredicate pred5 = pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodeAttributes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodeAttributes); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } + } + private static class InodeAttributesPhase2Callable implements Callable { - List results = UpdateQuery.getResultList(); - //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); - List newResults = new ArrayList(results.size()); + int startId, endId; + final Lock lock; - INodeAttributesDTO newInodeAttributes; + InodeAttributesPhase2Callable(int startId, int endId, Lock lock) { + this.startId = startId; + this.endId = endId; + this.lock = lock; + } - for (INodeAttributesDTO row : results) { - newInodeAttributes = session.newInstance(INodeAttributesDTO.class); + @Override + public Boolean call() throws StorageException, IOException { + try { - newInodeAttributes.setId(-row.getId()); - newInodeAttributes.setNSQuota(row.getNSQuota()); - newInodeAttributes.setNSCount(row.getNSCount()); - newInodeAttributes.setDSQuota(row.getDSQuota()); - newInodeAttributes.setDiskspace(row.getDiskspace()); - newInodeAttributes.setStatus(row.getStatus()); + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); - newResults.add(newInodeAttributes); - } + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3 = updateInodes.get(INodeAttributesTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get(INodeAttributesTableDef.ID).greaterEqual(updateInodes.param("endId")); - synchronized (lock) { + updateInodes.where(pred3.and(pred4)); - session.makePersistentAll(newResults); - } - System.gc(); + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); - HopsQueryBuilder qbd = session.getQueryBuilder(); - HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(INodeAttributesDTO.class); - HopsPredicate pred3d = deleteInodesWithNegativeId.get("id").lessEqual(deleteInodesWithNegativeId.param("startId")); - HopsPredicate pred4d = deleteInodesWithNegativeId.get("id").greaterEqual(deleteInodesWithNegativeId.param("endId")); - deleteInodesWithNegativeId.where(pred3d.and(pred4d)); - HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); - queryToDeleteNegative.setParameter("startId", startId); - queryToDeleteNegative.setParameter("endId", endId); + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); - synchronized (lock) { - queryToDeleteNegative.deletePersistentAll(); - } + INodeAttributesDTO newInodeAttributes; - session.flush(); - session.close(); + for (INodeAttributesDTO row : results) { + newInodeAttributes = session.newInstance(INodeAttributesDTO.class); - } catch (Throwable e) { - throw new IOException(e); - } - return true; + newInodeAttributes.setId(-row.getId()); + newInodeAttributes.setNSQuota(row.getNSQuota()); + newInodeAttributes.setNSCount(row.getNSCount()); + newInodeAttributes.setDSQuota(row.getDSQuota()); + newInodeAttributes.setDiskspace(row.getDiskspace()); + newInodeAttributes.setStatus(row.getStatus()); - } -} + newResults.add(newInodeAttributes); + } -class InodesPhase1Callable implements Callable { + synchronized (lock) { - int startId, endId; + session.makePersistentAll(newResults); + } + System.gc(); - InodesPhase1Callable(int startId, int endId) { - this.startId = startId; - this.endId = endId; - } + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(INodeAttributesDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get(INodeAttributesTableDef.ID).lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get(INodeAttributesTableDef.ID).greaterEqual(deleteInodesWithNegativeId.param("endId")); - @Override - public Boolean call() throws StorageException { + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); - HopsSession session = ClusterjConnector.getInstance().obtainSession(); - // Transaction tx = session.currentTransaction(); - //tx.begin(); - - HopsQueryBuilder qb = session.getQueryBuilder(); - //Delete inodes with status=2 or status=3 - HopsQueryDomainType deleteInodes = 
qb.createQueryDefinition(InodeDTO.class); - HopsPredicate pred1 = deleteInodes.get("status").equal(deleteInodes.param("statusParam2")); - HopsPredicate pred2 = deleteInodes.get("status").equal(deleteInodes.param("statusParam3")); - HopsPredicate pred3 = deleteInodes.get("id").greaterEqual(deleteInodes.param("startId")); - HopsPredicate pred4 = deleteInodes.get("id").lessEqual(deleteInodes.param("endId")); - HopsPredicate pred5 = pred3.and(pred4); - HopsPredicate pred6 = pred1.or(pred2); - deleteInodes.where(pred5.and(pred6)); - HopsQuery query = session.createQuery(deleteInodes); - query.setParameter("statusParam2", 2); - query.setParameter("statusParam3", 3); - query.setParameter("startId", startId); - query.setParameter("endId", endId); - - - query.deletePersistentAll(); - - // tx.commit(); - session.flush(); - session.close(); - return true; - } -} + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } -class InodesPhase2Callable implements Callable { + session.flush(); + session.close(); - int startId, endId; + } catch (Throwable e) { + throw new IOException(e); + } + return true; - InodesPhase2Callable(int startId, int endId) { - this.startId = startId; - this.endId = endId; + } } - @Override - public Boolean call() throws StorageException { - - HopsSession session = ClusterjConnector.getInstance().obtainSession(); - HopsQueryBuilder qb = session.getQueryBuilder(); - HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); - - HopsPredicate pred4 = updateInodes.get("isDeleted").equal(updateInodes.param("isDeletedParam")); - HopsPredicate pred5 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); - HopsPredicate pred6 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); - updateInodes.where(pred6.and(pred5).and(pred4)); - - HopsQuery UpdateQuery = session.createQuery(updateInodes); - - UpdateQuery.setParameter("isDeletedParam", 1); - UpdateQuery.setParameter("startId", startId); - UpdateQuery.setParameter("endId", endId); - - List results = UpdateQuery.getResultList(); - List newResults = new ArrayList(results.size()); - List tobeDeleted = new ArrayList(results.size()); - - int id; - InodeDTO newInode; - for (InodeDTO row : results) { - if((id=row.getParentId())<0) { - newInode = session.newInstance(InodeDTO.class); - newInode.setId(-row.getId()); - //newInode.setName(row.getName().split("\\$DEL\\$")[0]); - newInode.setName(row.getName().split("\\$DEL:[0-9]+\\$",2)[0]); - newInode.setParentId(-id); - newInode.setQuotaEnabled(row.getQuotaEnabled()); - newInode.setModificationTime(row.getModificationTime()); - newInode.setATime(row.getATime()); - newInode.setPermission(row.getPermission()); - newInode.setUnderConstruction(row.getUnderConstruction()); - newInode.setClientName(row.getClientName()); - newInode.setClientMachine(row.getClientMachine()); - newInode.setClientNode(row.getClientNode()); - newInode.setGenerationStamp(row.getGenerationStamp()); - newInode.setHeader(row.getHeader()); - newInode.setSymlink(row.getSymlink()); - newInode.setSubtreeLocked(row.getSubtreeLocked()); - newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); - newInode.setMetaEnabled(row.getMetaEnabled()); - newInode.setSize(row.getSize()); - newInode.setIsDeleted(SnapShotConstants.isNotDeleted); - newInode.setStatus(row.getStatus()); - - newResults.add(newInode); - tobeDeleted.add(row); + private static class InodesPhase1Callable implements Callable { + int startId, endId; - } + InodesPhase1Callable(int startId, int endId) { + 
this.startId = startId; + this.endId = endId; } - if(!newResults.isEmpty()) - session.makePersistentAll(newResults); - if(!tobeDeleted.isEmpty()) - session.deletePersistentAll(tobeDeleted); + @Override + public Boolean call() throws StorageException { - //System.out.println("SetIsDeleted from id=" + factor * BUFFER_SIZE + " ,to id=" + (factor + 1) * BUFFER_SIZE + ". Time= " + (end - start) / 1000 + "Seconds"); + HopsSession session = ClusterjConnector.getInstance().obtainSession(); + // Transaction tx = session.currentTransaction(); + //tx.begin(); - // tx.commit(); - session.flush(); - session.close(); - return true; + HopsQueryBuilder qb = session.getQueryBuilder(); + //Delete inodes with status=2 or status=3 + HopsQueryDomainType deleteInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred1 = deleteInodes.get(INodeTableDef.STATUS).equal(deleteInodes.param("statusParam2")); + HopsPredicate pred2 = deleteInodes.get(INodeTableDef.STATUS).equal(deleteInodes.param("statusParam3")); + HopsPredicate pred3 = deleteInodes.get(INodeTableDef.ID).greaterEqual(deleteInodes.param("startId")); + HopsPredicate pred4 = deleteInodes.get(INodeTableDef.ID).lessEqual(deleteInodes.param("endId")); + HopsPredicate pred5 = pred3.and(pred4); + HopsPredicate pred6 = pred1.or(pred2); + deleteInodes.where(pred5.and(pred6)); + HopsQuery query = session.createQuery(deleteInodes); + query.setParameter("statusParam2", SnapShotConstants.Modified); + query.setParameter("statusParam3", SnapShotConstants.New); + query.setParameter("startId", startId); + query.setParameter("endId", endId); + + + query.deletePersistentAll(); + + // tx.commit(); + session.flush(); + session.close(); + return true; + } } -} -class InodesPhase3Callable implements Callable { + private static class InodesPhase2Callable implements Callable { - private static final Log LOG = LogFactory.getLog(InodesPhase3Callable.class); - int startId, endId; - final Lock lock; + int startId, endId; - InodesPhase3Callable(int startId, int endId, Lock lock) { - System.out.println("The startId=" + startId + ".The endId=" + endId); - this.startId = startId; - this.endId = endId; - this.lock = lock; - } + InodesPhase2Callable(int startId, int endId) { + this.startId = startId; + this.endId = endId; + } - @Override - public Boolean call() throws StorageException, IOException { - try { - System.out.println("call method is called"); - HopsSession session = ClusterjConnector.getInstance().obtainSession(); -// Transaction tx = session.currentTransaction(); -// tx.begin(); - //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + @Override + public Boolean call() throws StorageException { + HopsSession session = ClusterjConnector.getInstance().obtainSession(); HopsQueryBuilder qb = session.getQueryBuilder(); HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); - HopsPredicate pred3 = updateInodes.get("id").lessEqual(updateInodes.param("startId")); - HopsPredicate pred4 = updateInodes.get("id").greaterEqual(updateInodes.param("endId")); - - updateInodes.where(pred3.and(pred4)); + HopsPredicate pred4 = updateInodes.get(INodeTableDef.ISDELETED).equal(updateInodes.param("isDeletedParam")); + HopsPredicate pred5 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred6 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("endId")); + updateInodes.where(pred6.and(pred5).and(pred4)); HopsQuery UpdateQuery = session.createQuery(updateInodes); + + UpdateQuery.setParameter("isDeletedParam", SnapShotConstants.isDeleted); UpdateQuery.setParameter("startId", startId); UpdateQuery.setParameter("endId", endId); - List results = UpdateQuery.getResultList(); - //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); List newResults = new ArrayList(results.size()); - System.out.println("Query is executed"); - InodeDTO newInode; + List tobeDeleted = new ArrayList(results.size()); + int id; + InodeDTO newInode; for (InodeDTO row : results) { - newInode = session.newInstance(InodeDTO.class); - - newInode.setId(-row.getId()); - newInode.setName(row.getName()); - newInode.setParentId(-row.getParentId()); - newInode.setQuotaEnabled(row.getQuotaEnabled()); - newInode.setModificationTime(row.getModificationTime()); - newInode.setATime(row.getATime()); - newInode.setPermission(row.getPermission()); - newInode.setUnderConstruction(row.getUnderConstruction()); - newInode.setClientName(row.getClientName()); - newInode.setClientMachine(row.getClientMachine()); - newInode.setClientNode(row.getClientNode()); - newInode.setGenerationStamp(row.getGenerationStamp()); - newInode.setHeader(row.getHeader()); - newInode.setSymlink(row.getSymlink()); - newInode.setSubtreeLocked(row.getSubtreeLocked()); - newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); - newInode.setMetaEnabled(row.getMetaEnabled()); - newInode.setSize(row.getSize()); - newInode.setIsDeleted(row.getIsDeleted()); - newInode.setStatus(row.getStatus()); - - newResults.add(newInode); + if ((id = row.getParentId()) < 0) { + newInode = session.newInstance(InodeDTO.class); + newInode.setId(-row.getId()); + //newInode.setName(row.getName().split("\\$DEL\\$")[0]); + newInode.setName(row.getName().split("\\$DEL:[0-9]+\\$", 2)[0]); + newInode.setParentId(-id); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setMetaEnabled(row.getMetaEnabled()); + newInode.setSize(row.getSize()); + 
newInode.setIsDeleted(SnapShotConstants.isNotDeleted); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + tobeDeleted.add(row); + } } - System.gc(); - synchronized (lock) { - // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); + + if (!newResults.isEmpty()) session.makePersistentAll(newResults); - } + if (!tobeDeleted.isEmpty()) + session.deletePersistentAll(tobeDeleted); - HopsQueryBuilder qbd = session.getQueryBuilder(); - HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(InodeDTO.class); - HopsPredicate pred3d = deleteInodesWithNegativeId.get("id").lessEqual(deleteInodesWithNegativeId.param("startId")); - HopsPredicate pred4d = deleteInodesWithNegativeId.get("id").greaterEqual(deleteInodesWithNegativeId.param("endId")); - - deleteInodesWithNegativeId.where(pred3d.and(pred4d)); - HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); - queryToDeleteNegative.setParameter("startId", startId); - queryToDeleteNegative.setParameter("endId", endId); - System.out.println("Deleting all the negative rows"); - synchronized (lock) { - queryToDeleteNegative.deletePersistentAll(); - } - System.out.println("Deleted all the negative rows"); + //System.out.println("SetIsDeleted from id=" + factor * BUFFER_SIZE + " ,to id=" + (factor + 1) * BUFFER_SIZE + ". Time= " + (end - start) / 1000 + "Seconds"); + + // tx.commit(); session.flush(); session.close(); + return true; + } + } + + private static class InodesPhase3Callable implements Callable { + + private final Log LOG = LogFactory.getLog(InodesPhase3Callable.class); + int startId, endId; + final Lock lock; - } catch (Throwable e) { - throw new IOException(e); + InodesPhase3Callable(int startId, int endId, Lock lock) { + System.out.println("The startId=" + startId + ".The endId=" + endId); + this.startId = startId; + this.endId = endId; + this.lock = lock; } - return true; + @Override + public Boolean call() throws StorageException, IOException { + try { + System.out.println("call method is called"); + HopsSession session = ClusterjConnector.getInstance().obtainSession(); +// Transaction tx = session.currentTransaction(); +// tx.begin(); + //System.err.println(Thread.currentThread().getId() + ": Started. 
StartId=" + startId + ", endId=" + endId); + + HopsQueryBuilder qb = session.getQueryBuilder(); + HopsQueryDomainType updateInodes = qb.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3 = updateInodes.get(INodeTableDef.ID).lessEqual(updateInodes.param("startId")); + HopsPredicate pred4 = updateInodes.get(INodeTableDef.ID).greaterEqual(updateInodes.param("endId")); + + updateInodes.where(pred3.and(pred4)); + + + HopsQuery UpdateQuery = session.createQuery(updateInodes); + UpdateQuery.setParameter("startId", startId); + UpdateQuery.setParameter("endId", endId); + + + List results = UpdateQuery.getResultList(); + //System.out.println(Thread.currentThread().getId() + ": Result Size= " + results.size()); + List newResults = new ArrayList(results.size()); + System.out.println("Query is executed"); + InodeDTO newInode; + + for (InodeDTO row : results) { + newInode = session.newInstance(InodeDTO.class); + + newInode.setId(-row.getId()); + newInode.setName(row.getName()); + newInode.setParentId(-row.getParentId()); + newInode.setQuotaEnabled(row.getQuotaEnabled()); + newInode.setModificationTime(row.getModificationTime()); + newInode.setATime(row.getATime()); + newInode.setPermission(row.getPermission()); + newInode.setUnderConstruction(row.getUnderConstruction()); + newInode.setClientName(row.getClientName()); + newInode.setClientMachine(row.getClientMachine()); + newInode.setClientNode(row.getClientNode()); + newInode.setGenerationStamp(row.getGenerationStamp()); + newInode.setHeader(row.getHeader()); + newInode.setSymlink(row.getSymlink()); + newInode.setSubtreeLocked(row.getSubtreeLocked()); + newInode.setSubtreeLockOwner(row.getSubtreeLockOwner()); + newInode.setMetaEnabled(row.getMetaEnabled()); + newInode.setSize(row.getSize()); + newInode.setIsDeleted(row.getIsDeleted()); + newInode.setStatus(row.getStatus()); + + newResults.add(newInode); + } + System.gc(); + synchronized (lock) { + // System.out.println(Thread.currentThread().getId() + ": Acquired Lock. rows to write "+newResults.size()); + session.makePersistentAll(newResults); + } + + HopsQueryBuilder qbd = session.getQueryBuilder(); + HopsQueryDomainType deleteInodesWithNegativeId = qbd.createQueryDefinition(InodeDTO.class); + HopsPredicate pred3d = deleteInodesWithNegativeId.get(INodeTableDef.ID).lessEqual(deleteInodesWithNegativeId.param("startId")); + HopsPredicate pred4d = deleteInodesWithNegativeId.get(INodeTableDef.ID).greaterEqual(deleteInodesWithNegativeId.param("endId")); + + deleteInodesWithNegativeId.where(pred3d.and(pred4d)); + HopsQuery queryToDeleteNegative = session.createQuery(deleteInodesWithNegativeId); + queryToDeleteNegative.setParameter("startId", startId); + queryToDeleteNegative.setParameter("endId", endId); + System.out.println("Deleting all the negative rows"); + synchronized (lock) { + queryToDeleteNegative.deletePersistentAll(); + } + System.out.println("Deleted all the negative rows"); + session.flush(); + session.close(); + + } catch (Throwable e) { + throw new IOException(e); + } + return true; + + } } -} -class Lock { -} + private static class Lock { + } +} \ No newline at end of file
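
Note on the rollback patches above (an illustrative sketch, not part of the series): every process*Phase* method follows the same interval-batching pattern - read the min/max key of the affected range with execMySqlQuery, split that range into BUFFER_SIZE chunks, submit one Callable per chunk to a cached thread pool, drain the pool, and finally re-count the affected rows to confirm the phase completed. The self-contained Java sketch below isolates that pattern; the names RangeBatchSketch, RangeTask and processRange are placeholders and do not appear in the patch, and the task body is a stub where the real callables open a HopsSession and run their range query or deletePersistentAll.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RangeBatchSketch {
    // Same batch size as RollBackImpl/RemoveSnapshotImpl.
    private static final int BUFFER_SIZE = 50000;

    // Placeholder task: a real task would obtain a HopsSession and run a range
    // query bounded by [startId, endId], as the Phase callables in the patch do.
    static class RangeTask implements Callable<Boolean> {
        final int startId;
        final int endId;

        RangeTask(int startId, int endId) {
            this.startId = startId;
            this.endId = endId;
        }

        @Override
        public Boolean call() {
            return true;
        }
    }

    // Split [minId, maxId) into BUFFER_SIZE-wide intervals, hand each to the pool,
    // then wait by polling until the pool terminates, mirroring shutDownPool().
    public static void processRange(int minId, int maxId) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        int intervalStart = minId;
        while (intervalStart < maxId) {
            pool.submit(new RangeTask(intervalStart, intervalStart + BUFFER_SIZE));
            intervalStart += BUFFER_SIZE;
        }
        pool.shutdown();
        while (!pool.isTerminated()) {
            Thread.sleep(500);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        processRange(0, 200000); // e.g. four 50k-wide batches
    }
}

Working in fixed-size id intervals rather than one statement over the whole table keeps each ClusterJ operation bounded, which appears to be why the phase methods page through the id space instead of issuing a single query across the full range.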