Skip to content

Commit da2d9ae

Browse files
authored
HOPSFS-162: Fixed Retry Cache Cleaner (hopshadoop#64)
* HOPSFS-162: Fixed Retry Cache Cleaner * HOPSFS-162: create an index on hdfs_retry_cache_entry.epoch
1 parent 90ddbd8 commit da2d9ae

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- create an index on hdfs_retry_cache_entry.epoch
2+
-- check if the create index fix is not already applied
3+
4+
SELECT COUNT(*) INTO @index_exists FROM information_schema.statistics WHERE table_schema = 'hops' AND table_name = 'hdfs_retry_cache_entry' AND index_name = 'epoch_idx';
5+
6+
SET @create_index_stmt = IF(@index_exists = 0, 'ALTER TABLE hdfs_retry_cache_entry ADD INDEX epoch_idx (epoch);', 'SELECT "Index already exists";');
7+
PREPARE stmt FROM @create_index_stmt;
8+
EXECUTE stmt;
9+
DEALLOCATE PREPARE stmt;

src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/RetryCacheEntryClusterj.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,14 @@ public void prepare(Collection<RetryCacheEntry> removed,
116116
session.release(changes);
117117
}
118118

119-
public int removeOlds(long epoch) throws StorageException{
119+
public int removeOlds(long epoch, int batchSize) throws StorageException{
120120
HopsSession session = connector.obtainSession();
121121
HopsQueryBuilder qb = session.getQueryBuilder();
122122
HopsQueryDomainType<RetryCacheEntryDTO> qdt = qb.createQueryDefinition(RetryCacheEntryDTO.class);
123123
qdt.where(qdt.get("epoch").equal(qdt.param("param")));
124124
HopsQuery<RetryCacheEntryDTO> query = session.createQuery(qdt);
125125
query.setParameter("param", epoch);
126+
query.setLimits(0, batchSize);
126127
return query.deletePersistentAll();
127128
}
128129

0 commit comments

Comments
 (0)