-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9d661b6
commit 9398fcb
Showing
8 changed files
with
714 additions
and
590 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Helper functions for CLI tools.""" | ||
|
||
from dfvfs.lib import definitions as dfvfs_definitions | ||
|
||
from dfimagetools import decorators | ||
|
||
|
||
# This function is deprecated use the one in backend.py instead. | ||
@decorators.deprecated | ||
def SetDFVFSBackEnd(back_end): | ||
"""Sets the dfVFS back-end. | ||
Args: | ||
back_end (str): dfVFS back-end. | ||
""" | ||
if back_end == 'APM': | ||
dfvfs_definitions.PREFERRED_APM_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_APM) | ||
|
||
elif back_end == 'EXT': | ||
dfvfs_definitions.PREFERRED_EXT_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_EXT) | ||
|
||
elif back_end == 'FAT': | ||
dfvfs_definitions.PREFERRED_FAT_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_FAT) | ||
|
||
elif back_end == 'GPT': | ||
dfvfs_definitions.PREFERRED_GPT_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_GPT) | ||
|
||
elif back_end == 'HFS': | ||
dfvfs_definitions.PREFERRED_HFS_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_HFS) | ||
|
||
elif back_end == 'NTFS': | ||
dfvfs_definitions.PREFERRED_NTFS_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_NTFS) | ||
|
||
elif back_end == 'TSK': | ||
dfvfs_definitions.PREFERRED_APM_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK) | ||
dfvfs_definitions.PREFERRED_APM_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK) | ||
dfvfs_definitions.PREFERRED_FAT_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK) | ||
dfvfs_definitions.PREFERRED_GPT_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION) | ||
dfvfs_definitions.PREFERRED_HFS_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK) | ||
dfvfs_definitions.PREFERRED_NTFS_BACK_END = ( | ||
dfvfs_definitions.TYPE_INDICATOR_TSK) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
"""Console script to extract data streams.""" | ||
|
||
import argparse | ||
import logging | ||
import os | ||
import sys | ||
|
||
from artifacts import reader as artifacts_reader | ||
from artifacts import registry as artifacts_registry | ||
|
||
from dfvfs.lib import errors as dfvfs_errors | ||
|
||
from dfimagetools import artifact_filters | ||
from dfimagetools import data_stream_writer | ||
from dfimagetools import file_entry_lister | ||
from dfimagetools import windows_registry | ||
from dfimagetools.helpers import command_line | ||
|
||
|
||
def Main(): | ||
"""Entry point of console script to extract data streams. | ||
Returns: | ||
int: exit code that is provided to sys.exit(). | ||
""" | ||
argument_parser = argparse.ArgumentParser(description=( | ||
'Extracts data streams from a storage media image.')) | ||
|
||
# TODO: add filter group | ||
argument_parser.add_argument( | ||
'--artifact_definitions', '--artifact-definitions', | ||
dest='artifact_definitions', type=str, metavar='PATH', action='store', | ||
help=('Path to a directory or file containing the artifact definition ' | ||
'.yaml files.')) | ||
|
||
argument_parser.add_argument( | ||
'--artifact_filters', '--artifact-filters', dest='artifact_filters', | ||
type=str, default=None, metavar='NAMES', action='store', help=( | ||
'Comma separated list of names of artifact definitions to extract.')) | ||
|
||
argument_parser.add_argument( | ||
'--custom_artifact_definitions', '--custom-artifact-definitions', | ||
dest='custom_artifact_definitions', type=str, metavar='PATH', | ||
action='store', help=( | ||
'Path to a directory or file containing custom artifact definition ' | ||
'.yaml files. ')) | ||
|
||
# TODO: add output group | ||
argument_parser.add_argument( | ||
'-t', '--target', dest='target', action='store', metavar='PATH', | ||
default=None, help=( | ||
'target (or destination) path of a directory where the extracted ' | ||
'data streams should be stored.')) | ||
|
||
# TODO: add source group | ||
command_line.AddStorageMediaImageCLIArguments(argument_parser) | ||
|
||
argument_parser.add_argument( | ||
'source', nargs='?', action='store', metavar='image.raw', | ||
default=None, help='path of the storage media image.') | ||
|
||
options = argument_parser.parse_args() | ||
|
||
if not options.source: | ||
print('Source value is missing.') | ||
print('') | ||
argument_parser.print_help() | ||
print('') | ||
return 1 | ||
|
||
if options.artifact_filters: | ||
if (not options.artifact_definitions and | ||
not options.custom_artifact_definitions): | ||
print('[ERROR] artifact filters were specified but no paths to ' | ||
'artifact definitions were provided.') | ||
print('') | ||
return 1 | ||
|
||
# TODO: improve this, for now this script needs at least 1 filter. | ||
if not options.artifact_filters: | ||
print('[ERROR] no artifact filters were specified.') | ||
print('') | ||
return 1 | ||
|
||
target_path = options.target | ||
if not target_path: | ||
source_name = os.path.basename(options.source) | ||
target_path = os.path.join(os.getcwd(), f'{source_name:s}.extracted') | ||
|
||
if not os.path.exists(target_path): | ||
os.makedirs(target_path) | ||
|
||
elif not os.path.isdir(target_path): | ||
print('[ERROR] target path is not a directory.') | ||
print('') | ||
return 1 | ||
|
||
logging.basicConfig( | ||
level=logging.INFO, format='[%(levelname)s] %(message)s') | ||
|
||
mediator, volume_scanner_options = ( | ||
command_line.ParseStorageMediaImageCLIArguments(options)) | ||
|
||
if options.artifact_filters: | ||
registry = artifacts_registry.ArtifactDefinitionsRegistry() | ||
reader = artifacts_reader.YamlArtifactsReader() | ||
|
||
if options.artifact_definitions: | ||
if os.path.isdir(options.artifact_definitions): | ||
registry.ReadFromDirectory(reader, options.artifact_definitions) | ||
elif os.path.isfile(options.artifact_definitions): | ||
registry.ReadFromFile(reader, options.artifact_definitions) | ||
|
||
if options.custom_artifact_definitions: | ||
if os.path.isdir(options.custom_artifact_definitions): | ||
registry.ReadFromDirectory( | ||
reader, options.custom_artifact_definitions) | ||
elif os.path.isfile(options.custom_artifact_definitions): | ||
registry.ReadFromFile(reader, options.custom_artifact_definitions) | ||
|
||
entry_lister = file_entry_lister.FileEntryLister(mediator=mediator) | ||
find_specs_generated = False | ||
|
||
try: | ||
base_path_specs = entry_lister.GetBasePathSpecs( | ||
options.source, options=volume_scanner_options) | ||
if not base_path_specs: | ||
print('No supported file system found in source.') | ||
print('') | ||
return 1 | ||
|
||
for base_path_spec in base_path_specs: | ||
if not options.artifact_filters: | ||
find_specs = [] | ||
else: | ||
windows_directory = entry_lister.GetWindowsDirectory(base_path_spec) | ||
if not windows_directory: | ||
environment_variables = [] | ||
else: | ||
winregistry_collector = windows_registry.WindowsRegistryCollector( | ||
base_path_spec, windows_directory) | ||
|
||
environment_variables = ( | ||
winregistry_collector.CollectSystemEnvironmentVariables()) | ||
|
||
filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( | ||
registry, environment_variables, []) | ||
|
||
names = options.artifact_filters.split(',') | ||
find_specs = list(filter_generator.GetFindSpecs(names)) | ||
if not find_specs: | ||
continue | ||
|
||
find_specs_generated = True | ||
|
||
file_entries_generator = entry_lister.ListFileEntriesWithFindSpecs( | ||
[base_path_spec], find_specs) | ||
|
||
stream_writer = data_stream_writer.DataStreamWriter() | ||
for file_entry, path_segments in file_entries_generator: | ||
for data_stream in file_entry.data_streams: | ||
display_path = stream_writer.GetDisplayPath( | ||
path_segments, data_stream.name) | ||
destination_path = stream_writer.GetSanitizedPath( | ||
path_segments, data_stream.name, target_path) | ||
logging.info(f'Extracting: {display_path:s} to: {destination_path:s}') | ||
|
||
destination_directory = os.path.dirname(destination_path) | ||
os.makedirs(destination_directory, exist_ok=True) | ||
|
||
stream_writer.WriteDataStream( | ||
file_entry, data_stream.name, destination_path) | ||
|
||
except dfvfs_errors.ScannerError as exception: | ||
print(f'[ERROR] {exception!s}', file=sys.stderr) | ||
print('') | ||
return 1 | ||
|
||
except KeyboardInterrupt: | ||
print('Aborted by user.', file=sys.stderr) | ||
print('') | ||
return 1 | ||
|
||
if options.artifact_filters and not find_specs_generated: | ||
print('[ERROR] an artifact filter was specified but no corresponding ' | ||
'file system find specifications were generated.') | ||
print('') | ||
return 1 | ||
|
||
return 0 | ||
|
||
|
||
if __name__ == '__main__': | ||
sys.exit(Main()) |
Oops, something went wrong.