-
Notifications
You must be signed in to change notification settings - Fork 60
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e66f202
commit 9501704
Showing
92 changed files
with
12,742 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
import os | ||
import time | ||
import pytest | ||
import threading | ||
import tempfile | ||
from unittest.mock import patch | ||
from twinTrim.dataStructures.allFileMetadata import AllFileMetadata | ||
from twinTrim.dataStructures.allFileMetadata import add_or_update_file, store, store_lock | ||
from twinTrim.utils import get_file_hash | ||
|
||
# Fixtures | ||
@pytest.fixture | ||
def temp_file(tmp_path): | ||
"""Fixture to create a temporary file for testing.""" | ||
temp_file_path = tmp_path / "test_file.txt" | ||
with open(temp_file_path, "w") as f: | ||
f.write("Sample content") | ||
return str(temp_file_path) | ||
|
||
@pytest.fixture(autouse=True) | ||
def reset_store(): | ||
"""Fixture to reset the store before each test.""" | ||
store.clear() | ||
|
||
# Mocking the get_file_hash function | ||
def mock_get_file_hash(file_path): | ||
return f"hash_{file_path}" | ||
|
||
@pytest.fixture(autouse=True) | ||
def mock_file_hash_function(monkeypatch): | ||
"""Mock get_file_hash to provide predictable outputs for tests.""" | ||
monkeypatch.setattr("twinTrim.dataStructures.allFileMetadata.get_file_hash", mock_get_file_hash) | ||
|
||
# Test cases | ||
def test_add_new_file_metadata(temp_file): | ||
store.clear() | ||
add_or_update_file(temp_file) | ||
file_hash = mock_get_file_hash(temp_file) | ||
|
||
assert file_hash in store, "File hash should be in the store after adding" | ||
assert store[file_hash].filepath == temp_file, "Stored file path should match the added file" | ||
|
||
|
||
def test_add_or_update_file_concurrently(): | ||
file_paths = ["file_1", "file_2", "file_3"] | ||
|
||
def worker(file_path): | ||
add_or_update_file(file_path) | ||
|
||
threads = [threading.Thread(target=worker, args=(file_path,)) for file_path in file_paths] | ||
|
||
for thread in threads: | ||
thread.start() | ||
for thread in threads: | ||
thread.join() | ||
|
||
assert len(store) == 3 | ||
assert all(f"hash_{file_path}" in store for file_path in file_paths) | ||
for file_path in file_paths: | ||
assert store[f"hash_{file_path}"].filepath == file_path | ||
|
||
def test_add_or_update_file_with_duplicates_concurrently(): | ||
file_paths = ["file_1", "file_1", "file_1"] | ||
|
||
def worker(file_path): | ||
add_or_update_file(file_path) | ||
|
||
threads = [threading.Thread(target=worker, args=(file_path,)) for file_path in file_paths] | ||
|
||
for thread in threads: | ||
thread.start() | ||
for thread in threads: | ||
thread.join() | ||
|
||
assert len(store) == 1 | ||
assert "hash_file_1" in store | ||
assert store["hash_file_1"].filepath == "file_1" | ||
|
||
def test_add_or_update_file_mixed_concurrently(): | ||
file_paths = ["file_1", "file_1", "file_2", "file_3", "file_3"] | ||
|
||
def worker(file_path): | ||
add_or_update_file(file_path) | ||
|
||
threads = [threading.Thread(target=worker, args=(file_path,)) for file_path in file_paths] | ||
|
||
for thread in threads: | ||
thread.start() | ||
for thread in threads: | ||
thread.join() | ||
|
||
assert len(store) == 3 | ||
assert "hash_file_1" in store | ||
assert "hash_file_2" in store | ||
assert "hash_file_3" in store | ||
assert store["hash_file_1"].filepath == "file_1" | ||
assert store["hash_file_2"].filepath == "file_2" | ||
assert store["hash_file_3"].filepath == "file_3" | ||
|
||
def test_add_or_update_file_when_file_doesnt_exist(mocker): | ||
mockfile = "mockfile.txt" | ||
mocker.patch("twinTrim.dataStructures.allFileMetadata.os.path.exists", return_value=False) | ||
mocker.patch("twinTrim.dataStructures.allFileMetadata.get_file_hash", side_effect=FileNotFoundError) | ||
|
||
with pytest.raises(FileNotFoundError): | ||
add_or_update_file(mockfile) | ||
|
||
@pytest.fixture | ||
def setup_files(): | ||
temp_file1 = tempfile.NamedTemporaryFile(delete=False) | ||
temp_file2 = tempfile.NamedTemporaryFile(delete=False) | ||
|
||
temp_file1.write(b'Initial content for file 1.') | ||
temp_file1.close() | ||
|
||
temp_file2.write(b'Initial content for file 2.') | ||
temp_file2.close() | ||
|
||
yield temp_file1.name, temp_file2.name | ||
|
||
if os.path.exists(temp_file1.name): | ||
os.remove(temp_file1.name) | ||
if os.path.exists(temp_file2.name): | ||
os.remove(temp_file2.name) | ||
|
||
def test_compare_and_replace_removes_correct_file(setup_files): | ||
file1, file2 = setup_files | ||
|
||
with patch('os.path.getmtime') as mock_getmtime, \ | ||
patch('twinTrim.dataStructures.allFileMetadata.handle_and_remove') as mock_remove: | ||
|
||
mock_getmtime.side_effect = lambda path: 1000 if path == file1 else 2000 | ||
|
||
metadata1 = AllFileMetadata(file1) | ||
metadata2 = AllFileMetadata(file2) | ||
|
||
metadata1.compare_and_replace(metadata2) | ||
|
||
mock_remove.assert_called_once_with(file1) | ||
|
||
def test_compare_and_replace_no_file_removed_when_file_missing(setup_files): | ||
file1, _ = setup_files | ||
metadata1 = AllFileMetadata(file1) | ||
nonexistent_file = 'nonexistent_file.txt' | ||
metadata2 = AllFileMetadata(nonexistent_file) | ||
|
||
with patch('twinTrim.utils.handle_and_remove') as mock_remove: | ||
metadata1.compare_and_replace(metadata2) | ||
mock_remove.assert_not_called() | ||
|
||
def test_compare_and_replace_no_file_removed_when_both_files_missing(setup_files): | ||
file1, _ = setup_files | ||
metadata1 = AllFileMetadata(file1) | ||
nonexistent_file1 = 'nonexistent_file1.txt' | ||
nonexistent_file2 = 'nonexistent_file2.txt' | ||
metadata2 = AllFileMetadata(nonexistent_file1) | ||
metadata3 = AllFileMetadata(nonexistent_file2) | ||
|
||
with patch('twinTrim.utils.handle_and_remove') as mock_remove: | ||
metadata1.compare_and_replace(metadata2) | ||
mock_remove.assert_not_called() | ||
|
||
with patch('twinTrim.utils.handle_and_remove') as mock_remove: | ||
metadata1.compare_and_replace(metadata3) | ||
mock_remove.assert_not_called() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import pytest | ||
from twinTrim.dataStructures.fileFilter import FileFilter | ||
|
||
|
||
# Test setting valid max file size values | ||
def test_set_max_file_size_valid(): | ||
"""Test setting valid max file size values.""" | ||
file_filter = FileFilter() | ||
|
||
# Test with a smaller size | ||
file_filter.setMaxFileSize("500mb") | ||
assert file_filter.maxFileSize == "500mb", "Failed to set max file size to 500mb" | ||
|
||
# Test with a larger size | ||
file_filter.setMaxFileSize("2gb") | ||
assert file_filter.maxFileSize == "2gb", "Failed to set max file size to 2gb" | ||
|
||
# Test with a minimum size (edge case) | ||
file_filter.setMaxFileSize("1kb") | ||
assert file_filter.maxFileSize == "1kb", "Failed to set max file size to 1kb" | ||
|
||
# Test setting the same max file size | ||
def test_set_max_file_size_same_value(): | ||
"""Test setting the same max file size value multiple times.""" | ||
file_filter = FileFilter() | ||
|
||
# Test setting the max file size to the default value | ||
file_filter.setMaxFileSize("1gb") | ||
assert file_filter.maxFileSize == "1gb", "Failed to set max file size to 1gb" | ||
|
||
# Test setting the max file size to the same value again | ||
file_filter.setMaxFileSize("1gb") | ||
assert file_filter.maxFileSize == "1gb", "Failed to set max file size to 1gb again" | ||
|
||
# Test boundary values for max file size | ||
def test_set_max_file_size_boundary(): | ||
"""Test boundary values for max file size.""" | ||
file_filter = FileFilter() | ||
|
||
# Test setting a value just under the default | ||
file_filter.setMaxFileSize("999mb") | ||
assert file_filter.maxFileSize == "999mb", "Failed to set max file size to 999mb" | ||
|
||
# Test setting a value just over the default | ||
file_filter.setMaxFileSize("1001mb") | ||
assert file_filter.maxFileSize == "1001mb", "Failed to set max file size to 1001mb" | ||
|
||
# Test setting an empty value to max file size | ||
def test_set_max_file_size_empty_value(): | ||
"""Test setting an empty value to max file size.""" | ||
file_filter = FileFilter() | ||
|
||
# Since there is no validation, an empty value would still set it to the empty string | ||
file_filter.setMaxFileSize("") | ||
assert file_filter.maxFileSize == "", "Failed to set max file size to empty value" | ||
|
||
# Test adding files to the exclude list | ||
def test_add_file_exclude_adds_file(): | ||
"""Test adding a file to the exclude list.""" | ||
file_filter = FileFilter() | ||
file_filter.addFileExclude("test_file.txt") | ||
assert "test_file.txt" in file_filter.fileExclude, "File was not added to the exclude list" | ||
|
||
|
||
# Test setting valid file types | ||
def test_set_file_type_valid(): | ||
"""Test setting valid file types.""" | ||
file_filter = FileFilter() | ||
|
||
# Test setting a common file type | ||
file_filter.setFileType("txt") | ||
assert file_filter.fileType == r"^.+\.txt$", "Failed to set file type regex for .txt files" | ||
|
||
# Test setting a file type with multiple extensions | ||
file_filter.setFileType("tar.gz") | ||
assert file_filter.fileType == r"^.+\.tar.gz$", "Failed to set file type regex for .tar.gz files" | ||
|
||
# Test setting a single character file type | ||
file_filter.setFileType("c") | ||
assert file_filter.fileType == r"^.+\.c$", "Failed to set file type regex for .c files" | ||
|
||
# Test setting an empty string as file type | ||
def test_set_file_type_empty_string(): | ||
"""Test setting an empty string as file type.""" | ||
file_filter = FileFilter() | ||
|
||
# The empty string should be accepted and form a regex matching any file without an extension | ||
file_filter.setFileType("") | ||
assert file_filter.fileType == r"^.+\.$", "Failed to set file type regex for empty file type" | ||
|
||
# Test setting file types with special characters | ||
def test_set_file_type_special_characters(): | ||
"""Test setting file types with special characters.""" | ||
file_filter = FileFilter() | ||
|
||
# Special characters should be part of the file extension in the regex | ||
file_filter.setFileType("mp4#") | ||
assert file_filter.fileType == r"^.+\.mp4#$", "Failed to set file type regex for special characters" | ||
|
||
# Test setting numeric file type extensions | ||
def test_set_file_type_numeric(): | ||
"""Test setting numeric file type extensions.""" | ||
file_filter = FileFilter() | ||
|
||
# File types with numbers should be accepted | ||
file_filter.setFileType("123") | ||
assert file_filter.fileType == r"^.+\.123$", "Failed to set file type regex for numeric file type" |
Oops, something went wrong.