Skip to content

Commit 431e446

Browse files
smkniaziberthoug
authored andcommitted
[HOPSWORKS-CLOUD-1540] Fixing cleaning up subtree operations
1 parent d6b9753 commit 431e446

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed

src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/OnGoingSubTreeOpsClusterj.java

+86
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.hops.exception.StorageException;
2626
import io.hops.metadata.hdfs.TablesDef;
2727
import io.hops.metadata.hdfs.dal.OngoingSubTreeOpsDataAccess;
28+
import io.hops.metadata.hdfs.entity.Storage;
2829
import io.hops.metadata.hdfs.entity.SubTreeOperation;
2930
import io.hops.metadata.ndb.ClusterjConnector;
3031
import io.hops.metadata.ndb.wrapper.HopsPredicate;
@@ -35,6 +36,7 @@
3536
import io.hops.metadata.ndb.wrapper.HopsSession;
3637

3738
import java.util.ArrayList;
39+
import java.util.Arrays;
3840
import java.util.Collection;
3941
import java.util.List;
4042

@@ -149,6 +151,90 @@ public Collection<SubTreeOperation> allOpsToRecoverAsync()
149151
return convertAndRelease(dbSession, query.getResultList());
150152
}
151153

154+
@Override
155+
public Collection<SubTreeOperation> allDeadOperations(long[] aliveNNIDs, long time) throws StorageException {
156+
if(aliveNNIDs == null || aliveNNIDs.length == 0){
157+
throw new IllegalArgumentException("No alive namenode specified");
158+
}
159+
160+
HopsSession dbSession = connector.obtainSession();
161+
HopsQueryBuilder qb = dbSession.getQueryBuilder();
162+
HopsQueryDomainType dobj = qb.createQueryDefinition(OnGoingSubTreeOpsDTO.class);
163+
164+
HopsPredicate pred[] = new HopsPredicate[aliveNNIDs.length];
165+
for(int i = 0; i < aliveNNIDs.length; i++ ){
166+
pred[i] = dobj.not(dobj.get("namenodeId").equal(dobj.param("namenodeIdParam"+i)));
167+
}
168+
169+
HopsPredicate allCombined = pred[0];
170+
for(int i = 1; i < aliveNNIDs.length; i++ ){
171+
allCombined = allCombined.and(pred[i]);
172+
}
173+
allCombined.and(dobj.get("startTime").lessThan(dobj.param("startTimeParam")));
174+
175+
dobj.where(allCombined);
176+
HopsQuery query = dbSession.createQuery(dobj);
177+
178+
for(int i = 0; i < aliveNNIDs.length; i++ ){
179+
query.setParameter("namenodeIdParam"+i, aliveNNIDs[i]);
180+
}
181+
query.setParameter("startTimeParam", time);
182+
183+
return convertAndRelease(dbSession, query.getResultList());
184+
}
185+
186+
@Override
187+
public Collection<SubTreeOperation> allSlowActiveOperations(long[] aliveNNIDs, long time)
188+
throws StorageException {
189+
if(aliveNNIDs == null || aliveNNIDs.length == 0){
190+
throw new IllegalArgumentException("No alive namenode specified");
191+
}
192+
193+
HopsSession dbSession = connector.obtainSession();
194+
HopsQueryBuilder qb = dbSession.getQueryBuilder();
195+
HopsQueryDomainType dobj = qb.createQueryDefinition(OnGoingSubTreeOpsDTO.class);
196+
197+
HopsPredicate pred[] = new HopsPredicate[aliveNNIDs.length];
198+
for(int i = 0; i < aliveNNIDs.length; i++ ){
199+
pred[i] = dobj.get("namenodeId").equal(dobj.param("namenodeIdParam"+i));
200+
}
201+
202+
HopsPredicate allCombined = pred[0];
203+
for(int i = 1; i < aliveNNIDs.length; i++ ){
204+
allCombined = allCombined.or(pred[i]);
205+
}
206+
allCombined = dobj.get("startTime").lessThan(dobj.param("startTimeParam")).and(allCombined);
207+
208+
dobj.where(allCombined);
209+
HopsQuery query = dbSession.createQuery(dobj);
210+
211+
for(int i = 0; i < aliveNNIDs.length; i++ ){
212+
query.setParameter("namenodeIdParam"+i, aliveNNIDs[i]);
213+
}
214+
query.setParameter("startTimeParam", time);
215+
216+
return convertAndRelease(dbSession, query.getResultList());
217+
}
218+
219+
@Override
220+
public long getLockTime(long inodeID) throws StorageException {
221+
HopsSession dbSession = connector.obtainSession();
222+
HopsQueryBuilder qb = dbSession.getQueryBuilder();
223+
HopsQueryDomainType dobj = qb.createQueryDefinition(OnGoingSubTreeOpsDTO.class);
224+
HopsPredicate pred = dobj.get("inodeId").equal(dobj.param("inodeIdParam"));
225+
dobj.where(pred);
226+
HopsQuery query = dbSession.createQuery(dobj);
227+
query.setParameter("inodeIdParam", inodeID);
228+
List<SubTreeOperation> list = convertAndRelease(dbSession, query.getResultList());
229+
if (list.size() > 0) {
230+
if (list.size() > 1) {
231+
throw new StorageException("Multiple subtree locks found for same INode: " + inodeID);
232+
}
233+
return list.get(0).getStartTime();
234+
}
235+
return 0;
236+
}
237+
152238
@Override
153239
public Collection<SubTreeOperation> allOpsByNN(long nnID)
154240
throws StorageException {

0 commit comments

Comments
 (0)