
Apply black formatting
GMDSantana committed Nov 17, 2024
1 parent d169491 commit ae9f7da
Showing 12 changed files with 156 additions and 57 deletions.
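The diff is mechanical: every file is rewritten with black's default style (double quotes, one argument per line when a call exceeds the line-length limit, two blank lines around top-level definitions). A minimal sketch of reproducing the same formatting programmatically, assuming black is installed and using its format_str API; the sample source string is illustrative, not taken from the repository:

import black

sample = "def greet(name):\n    return {'hello': name}\n"
# format_str applies the same rules as the black CLI; Mode() selects the defaults.
formatted = black.format_str(sample, mode=black.Mode())
print(formatted)  # single quotes become double quotes, spacing is normalised

In practice the commit is equivalent to running black against the repository root.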
4 changes: 2 additions & 2 deletions crivo/file_processor.py
@@ -11,6 +11,7 @@
"""


def process_file(file_path):
"""
Reads the content of a text file and returns it as a string.
@@ -22,11 +23,10 @@ def process_file(file_path):
str: The content of the file.
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
return content
except FileNotFoundError:
raise FileNotFoundError(f"Error: The file '{file_path}' was not found.")
except IOError as e:
raise IOError(f"Error reading the file '{file_path}': {e}")

29 changes: 20 additions & 9 deletions crivo/filter_engine.py
@@ -14,6 +14,7 @@
import re
import os


def load_valid_tlds(file_path):
"""
Loads a list of valid TLDs from a file and constructs a regex pattern.
@@ -25,18 +26,27 @@ def load_valid_tlds(file_path):
str: A regex pattern matching valid TLDs.
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, "r", encoding="utf-8") as f:
tlds = [line.strip() for line in f if line.strip()]
if not tlds:
raise ValueError("The TLD list is empty.")
# Join TLDs into a single regex group
return '|'.join(tlds)
return "|".join(tlds)
except FileNotFoundError:
raise FileNotFoundError(f"Error: The file '{file_path}' was not found.")
except IOError as e:
raise IOError(f"Error reading the file '{file_path}': {e}")

def filter_content(content, scope_filters=None, filter_ip=False, filter_ipv4=False, filter_ipv6=False, filter_domain=False, filter_url=False):

def filter_content(
content,
scope_filters=None,
filter_ip=False,
filter_ipv4=False,
filter_ipv6=False,
filter_domain=False,
filter_url=False,
):
"""
Filters the provided content based on specified parameters.
@@ -53,12 +63,12 @@ def filter_content(content, scope_filters=None, filter_ip=False, filter_ipv4=Fal
list: A list of unique filtered strings based on the criteria.
"""
# Load the TLD regex dynamically
tld_regex = load_valid_tlds('valid_tlds.txt')
domain_pattern = rf'\b(?:[a-zA-Z0-9-]+\.)+(?:{tld_regex})\b'
tld_regex = load_valid_tlds("valid_tlds.txt")
domain_pattern = rf"\b(?:[a-zA-Z0-9-]+\.)+(?:{tld_regex})\b"

# Other regex patterns
ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
ipv6_pattern = r'\b(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}\b'
ipv4_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
ipv6_pattern = r"\b(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}\b"
url_pattern = r'\bhttps?://[^\s<>"\'#]+\b'
results = set()

@@ -82,7 +92,8 @@ def filter_content(content, scope_filters=None, filter_ip=False, filter_ipv4=Fal

# Apply scope filtering if provided
if scope_filters:
results = {item for item in results if any(scope in item for scope in scope_filters)}
results = {
item for item in results if any(scope in item for scope in scope_filters)
}

return sorted(results)
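
For context, the reformatted filter_content keeps its original signature and behaviour: it returns a sorted, de-duplicated list of matches. A minimal usage sketch, assuming the package is importable and a valid_tlds.txt file (one TLD per line) is present in the working directory, since load_valid_tlds("valid_tlds.txt") resolves that path relative to it; the sample text is illustrative:

from crivo.filter_engine import filter_content

text = "Reach 192.168.0.1 or https://example.com for details."
# Keep only IPv4 addresses and URLs from the sample text.
matches = filter_content(text, filter_ipv4=True, filter_url=True)
print(matches)  # expected: ['192.168.0.1', 'https://example.com']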

85 changes: 70 additions & 15 deletions crivo/main.py
@@ -18,6 +18,7 @@
from crivo.filter_engine import filter_content
from crivo.version import __version__


def parse_arguments():
"""
Sets up the command-line arguments for Crivo.
@@ -28,22 +29,50 @@ def parse_arguments():
parser = argparse.ArgumentParser(
description="Crivo - A tool for extracting and filtering URLs, IPs, domains, and subdomains from text or web pages, with built-in web scraping capabilities."
)

parser.add_argument("-f", "--file", help="Input file with text to be filtered", type=str)
parser.add_argument("-w", "--webpage", help="URL of a webpage to have its content filtered", type=str)
parser.add_argument("-W", "--webpage-list", help="File containing a list of webpage URLs to be filtered", type=str)

parser.add_argument(
"-f", "--file", help="Input file with text to be filtered", type=str
)
parser.add_argument(
"-w",
"--webpage",
help="URL of a webpage to have its content filtered",
type=str,
)
parser.add_argument(
"-W",
"--webpage-list",
help="File containing a list of webpage URLs to be filtered",
type=str,
)
parser.add_argument("-o", "--output", help="Write the output to a file", type=str)
parser.add_argument("-s", "--scope", help="Comma-separated sequence of scope filters (ips, urls, domains, subdomains)", type=str)
parser.add_argument(
"-s",
"--scope",
help="Comma-separated sequence of scope filters (ips, urls, domains, subdomains)",
type=str,
)
parser.add_argument("--ip", help="Filter only IPs", action="store_true")
parser.add_argument("--ipv4", help="Filter only IPv4", action="store_true")
parser.add_argument("--ipv6", help="Filter only IPv6", action="store_true")
parser.add_argument("--domain", help="Filter only domains and subdomains", action="store_true")
parser.add_argument(
"--domain", help="Filter only domains and subdomains", action="store_true"
)
parser.add_argument("--url", help="Filter only URLs", action="store_true")
parser.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true")
parser.add_argument("-V", "--version", help="Show the programme version", action="version", version=f"Crivo {__version__}")
parser.add_argument(
"-v", "--verbose", help="Enable verbose output", action="store_true"
)
parser.add_argument(
"-V",
"--version",
help="Show the programme version",
action="version",
version=f"Crivo {__version__}",
)

return parser.parse_args()


def main():
"""
Main logic for Crivo.
@@ -53,15 +82,27 @@ def main():
if args.file:
content = process_file(args.file)
filtered_output = filter_content(
content, args.scope.split(",") if args.scope else [], args.ip, args.ipv4, args.ipv6, args.domain, args.url
content,
args.scope.split(",") if args.scope else [],
args.ip,
args.ipv4,
args.ipv6,
args.domain,
args.url,
)
print("\n".join(filtered_output))

elif args.webpage:
print(args.webpage)
content = scrape_webpage(args.webpage)
filtered_output = filter_content(
content, args.scope.split(",") if args.scope else [], args.ip, args.ipv4, args.ipv6, args.domain, args.url
content,
args.scope.split(",") if args.scope else [],
args.ip,
args.ipv4,
args.ipv6,
args.domain,
args.url,
)
print("\n".join(filtered_output))

@@ -76,18 +117,32 @@ def main():
try:
content = scrape_webpage(url)
filtered_output = filter_content(
content, args.scope.split(",") if args.scope else [], args.ip, args.ipv4, args.ipv6, args.domain, args.url
content,
args.scope.split(",") if args.scope else [],
args.ip,
args.ipv4,
args.ipv6,
args.domain,
args.url,
)
print("\n".join(filtered_output)) # Print results directly after the URL
print(
"\n".join(filtered_output)
) # Print results directly after the URL
except Exception as e:
print(f"Error fetching content from {url}: {e}", file=sys.stderr)

except FileNotFoundError:
print(f"Error: The file '{args.webpage_list}' was not found.", file=sys.stderr)
print(
f"Error: The file '{args.webpage_list}' was not found.", file=sys.stderr
)
sys.exit(1)
else:
print("Error: An input file (-f), a webpage (-w), or a webpage list (-W) is required.", file=sys.stderr)
print(
"Error: An input file (-f), a webpage (-w), or a webpage list (-W) is required.",
file=sys.stderr,
)
sys.exit(1)



if __name__ == "__main__":
main()
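
The wrapped add_argument calls above change layout only; the CLI flags are unchanged. A hedged invocation sketch, kept in Python by going through subprocess; "targets.txt" is a placeholder input file, not part of the repository:

import subprocess
import sys

# Extract domains and URLs from a local text file via the CLI entry script.
subprocess.run(
    [sys.executable, "crivo_cli.py", "-f", "targets.txt", "--domain", "--url"],
    check=True,
)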
4 changes: 3 additions & 1 deletion crivo/utils.py
@@ -11,6 +11,7 @@
"""


def normalise_text(text):
"""
Normalises text by stripping whitespace and converting to lowercase.
@@ -23,6 +24,7 @@ def normalise_text(text):
"""
return text.strip().lower()


def validate_scope(scope):
"""
Validates the provided scope filters and ensures they are in the correct format.
@@ -37,6 +39,7 @@ def validate_scope(scope):
return []
return [s.strip().lower() for s in scope.split(",") if s.strip()]


def log_verbose(message, verbose):
"""
Prints a message if verbose mode is enabled.
@@ -47,4 +50,3 @@ def log_verbose(message, verbose):
"""
if verbose:
print(message)

2 changes: 1 addition & 1 deletion crivo/web_scraper.py
@@ -15,6 +15,7 @@
from urllib.parse import urljoin
import re


def scrape_webpage(url, resolve_links=False):
"""
Fetches the content of a webpage and optionally resolves relative links.
@@ -47,4 +48,3 @@ def scrape_webpage(url, resolve_links=False):
return content
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Error fetching the webpage '{url}': {e}")
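
Combined with the filter engine, the scrape_webpage signature shown above suggests a small pipeline. A sketch under the assumption that the target URL is reachable; "https://example.com" is a placeholder:

from crivo.filter_engine import filter_content
from crivo.web_scraper import scrape_webpage

try:
    page_text = scrape_webpage("https://example.com")
    # scrape_webpage returns the fetched page content as a string.
    print(filter_content(page_text, filter_domain=True))
except RuntimeError as exc:  # raised by scrape_webpage on request failures
    print(f"Scrape failed: {exc}")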

3 changes: 2 additions & 1 deletion crivo_cli.py
@@ -13,12 +13,13 @@

from crivo.main import main


def run_cli():
"""
Runs the Crivo command-line interface.
"""
main()


if __name__ == "__main__":
run_cli()

30 changes: 15 additions & 15 deletions setup.py
@@ -16,28 +16,28 @@

# Helper function to read requirements.txt
def parse_requirements(filename):
with open(filename, 'r', encoding='utf-8') as f:
return [line.strip() for line in f if line.strip() and not line.startswith('#')]
with open(filename, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip() and not line.startswith("#")]


setup(
name='crivo',
name="crivo",
version=__version__,
packages=find_packages(),
install_requires=parse_requirements('requirements.txt'),
install_requires=parse_requirements("requirements.txt"),
entry_points={
'console_scripts': [
'crivo=crivo.main:main',
"console_scripts": [
"crivo=crivo.main:main",
],
},
author='Guilherme Santana',
author_email='git@gmdsantana.com',
description='A tool for extracting and filtering URLs, IPs, domains, and subdomains from text or web pages, with built-in web scraping capabilities.',
license='MIT',
url='https://github.com/GMDSantana/crivo',
author="Guilherme Santana",
author_email="git@gmdsantana.com",
description="A tool for extracting and filtering URLs, IPs, domains, and subdomains from text or web pages, with built-in web scraping capabilities.",
license="MIT",
url="https://github.com/GMDSantana/crivo",
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
],
)

5 changes: 4 additions & 1 deletion tests/test_file_processor.py
@@ -14,6 +14,7 @@
import pytest
from crivo.file_processor import process_file


def test_process_file_valid():
# Create a temporary file for testing
with open("temp_test_file.txt", "w", encoding="utf-8") as f:
@@ -24,14 +25,16 @@ def test_process_file_valid():

# Clean up the temporary file
import os

os.remove("temp_test_file.txt")


def test_process_file_not_found():
with pytest.raises(FileNotFoundError):
process_file("non_existent_file.txt")


def test_process_file_io_error(mocker):
mocker.patch("builtins.open", side_effect=IOError("Mocked IO error"))
with pytest.raises(IOError):
process_file("mocked_file.txt")

7 changes: 6 additions & 1 deletion tests/test_filter_engine.py
@@ -14,38 +14,43 @@
import pytest
from crivo.filter_engine import filter_content


def test_filter_content_no_filters():
content = "192.168.0.1 example.com https://example.com"
result = filter_content(content)
expected = ["192.168.0.1", "example.com", "https://example.com"]
assert sorted(result) == sorted(expected)


def test_filter_content_ipv4():
content = "192.168.0.1 example.com https://example.com"
result = filter_content(content, filter_ipv4=True)
expected = ["192.168.0.1"]
assert sorted(result) == sorted(expected)


def test_filter_content_domain():
content = "192.168.0.1 example.com https://example.com"
result = filter_content(content, filter_domain=True)
expected = ["example.com"]
assert sorted(result) == sorted(expected)


def test_filter_content_url():
content = "Visit https://example.com and http://test.com for details."
result = filter_content(content, filter_url=True)
expected = ["https://example.com", "http://test.com"]
assert sorted(result) == sorted(expected)


def test_filter_content_with_scope():
content = "192.168.0.1 example.com https://example.com"
result = filter_content(content, scope_filters=["example"])
expected = ["example.com", "https://example.com"]
assert sorted(result) == sorted(expected)


def test_filter_content_combined_filters():
content = "192.168.0.1 example.com https://example.com"
result = filter_content(content, filter_ip=True, filter_url=True)
assert sorted(result) == ["192.168.0.1", "https://example.com"]
