diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5c845573ad6..0be1ea4faf7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -147,6 +147,11 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
+      this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
+      tmpBatchesList = new ArrayList<>();
+      if (numPartitions > 1) {
+        allocateNewCurrentBatchAndHV();
+      }
     } catch (ClassTransformationException e) {
       throw UserException.unsupportedError(e)
         .message("Code generation error - likely an error in the code.")
@@ -157,11 +162,11 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
         .build(logger);
     } catch (SchemaChangeException sce) {
       throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
-    }
-    this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
-    tmpBatchesList = new ArrayList<>();
-    if (numPartitions > 1) {
-      allocateNewCurrentBatchAndHV();
+    } catch (OutOfMemoryException oom) {
+      close();
+      throw UserException.memoryError(oom)
+        .message("Failed to allocate hash partition.")
+        .build(logger);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index 41217ca208d..42dbc1cbfc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -1312,7 +1312,9 @@ private void cleanup() {
     }
     // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
-      partn.close();
+      if (partn != null) {
+        partn.close();
+      }
     }
 
     // delete any spill file left in unread spilled partitions