Skip to content


#719 Improve the error message about file size not divisible by recor…
Browse files Browse the repository at this point in the history
…d size.
  • Loading branch information
yruslan committed Oct 18, 2024
1 parent 070938d commit c90c326
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,19 @@ private[source] object CobolScanners extends Logging {
val recordSize = reader.getRecordSize

sourceDirs.foreach(sourceDir => {
if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
if (!debugIgnoreFileSize) {
val nonDivisibleFiles = getNonDivisibleFiles(sourceDir, conf, recordSize)

if (nonDivisibleFiles.nonEmpty) {
nonDivisibleFiles.head match {
case (name, size) =>
if (nonDivisibleFiles.length > 1) {
throw new IllegalArgumentException(s"Multiple file sizes are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Example file: $name size ($size bytes).")
} else {
throw new IllegalArgumentException(s"File $name size ($size bytes) is NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record).")

Expand Down Expand Up @@ -164,15 +175,14 @@ private[source] object CobolScanners extends Logging {
recordParser(reader, records)

private def areThereNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Boolean = {

private def getNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Seq[(String, Long)] = {
val fileSystem = new Path(sourceDir).getFileSystem(hadoopConfiguration)

if (FileUtils.getNumberOfFilesInDir(sourceDir, fileSystem) < FileUtils.THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK) {
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem) > 0
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem)
else {
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem)
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem).toSeq
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object FileUtils extends Logging {
* Finds the first file that is non-divisible by a given divisor and logs its name.
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Boolean = {
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Option[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -197,13 +197,13 @@ object FileUtils extends Logging {
logger.error(s"File ${firstNonDivisibleFile.get.getPath} size (${firstNonDivisibleFile.get.getLen}) IS NOT divisible by $divisor.")

firstNonDivisibleFile.isDefined => (status.getPath.toString, status.getLen))

* Finds all the files the are not divisible by a given divisor and logs their names.
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Long = {
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Seq[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -213,7 +213,7 @@ object FileUtils extends Logging {
allNonDivisibleFiles.foreach(file => logger.error(s"File ${file.getPath} size (${file.getLen}) IS NOT divisible by $divisor."))

allNonDivisibleFiles.length => (status.getPath.toString, status.getLen))

private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
val ex = intercept[IllegalArgumentException] {
getDataFrame(tmpFileName, Map("record_length" -> "7")).collect()
assert(ex.getMessage.contains("are NOT DIVISIBLE by the RECORD SIZE"))
assert(ex.getMessage.contains("NOT DIVISIBLE by the RECORD SIZE"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,70 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp

test("Test findAndLogFirstNonDivisibleFile() finds a file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a-file.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._2 == 1)

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, ".a")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)


test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file in a nested dir") {
withTempDirectory("testHidden3") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, "dir1"))
createFileSize1(Files.createFile(Paths.get(tmpDir, "dir1", ".b2")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)


test("Test findAndLogFirstNonDivisibleFile() ignores a hidden dir") {
withTempDirectory("testHidden4") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, ".dir2"))
createFileSize1(Files.createFile(Paths.get(tmpDir, ".dir2", "c1")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)


test("Test findAndLogFirstNonDivisibleFile() works with globbing") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(s"$tmpDir/*", 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._2 == 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft

it should "return the number of files inside a directory" in {

val length = 10
val numFiles = 5

Expand All @@ -137,52 +136,54 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft

it should "return 0 if there are no files inside a directory" in {

assertResult(0)(FileUtils.getNumberOfFilesInDir(controlledLengthFilesDir.getAbsolutePath, fileSystem))

it should "return 1 if there source is actually a file" in {

val aFile = getRandomFileToBeWritten
produceFileOfLength(aFile, 10)

assertResult(1)(FileUtils.getNumberOfFilesInDir(aFile.getAbsolutePath, fileSystem))

it should "return the file itself if non-divisible and if asked for first file" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.get._2 == 11)

it should "return the file itself if non-divisible and if asked for multiple files" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(1)(FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 1)
assert(notDivisibleFiles.head._2 == 11)

it should "return true if found first non-divisible file" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor + 1) // non-divisible

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)


it should "return number of non-divisible files" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -191,24 +192,25 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4 + 1) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5 + 1) // non-divisible

assertResult(2)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 2)

it should "return false if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor * 4)

assertResult(false)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)


it should "return 0 if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -217,7 +219,9 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5) // non-divisible

assertResult(0)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)


private def getRandomFileToBeWritten: File = new File(controlledLengthFilesDir, UUID.randomUUID().toString)
Expand Down

0 comments on commit c90c326

Please sign in to comment.