From dd106dc193b7fba4e8ee8345ed0f24ca68ad40f7 Mon Sep 17 00:00:00 2001 From: Stefan <96178532+stefan6419846@users.noreply.github.com> Date: Sun, 9 Feb 2025 10:11:17 +0100 Subject: [PATCH] ENH: Handle attachments in /Kids and provide object-oriented API (#3108) Closes #2087. Closes #3103. --- docs/user/extract-attachments.md | 12 +++ pypdf/_doc_common.py | 102 +++++++-------------- pypdf/generic/__init__.py | 2 + pypdf/generic/_files.py | 148 +++++++++++++++++++++++++++++++ tests/generic/__init__.py | 0 tests/generic/test_files.py | 111 +++++++++++++++++++++++ tests/test_doc_common.py | 34 +++++++ 7 files changed, 339 insertions(+), 70 deletions(-) create mode 100644 pypdf/generic/_files.py create mode 100644 tests/generic/__init__.py create mode 100644 tests/generic/test_files.py diff --git a/docs/user/extract-attachments.md b/docs/user/extract-attachments.md index c73b17ebd..0ec6a0a7f 100644 --- a/docs/user/extract-attachments.md +++ b/docs/user/extract-attachments.md @@ -16,3 +16,15 @@ for name, content_list in reader.attachments.items(): with open(f"{name}-{i}", "wb") as fp: fp.write(content) ``` + +Alternatively, you can retrieve them in an object-oriented fashion if you need +further details for these files: + +```python +from pypdf import PdfReader + +reader = PdfReader("example.pdf") + +for attachment in reader.attachment_list: + print(attachment.name, attachment.alternative_name, attachment.content) +``` \ No newline at end of file diff --git a/pypdf/_doc_common.py b/pypdf/_doc_common.py index 1f989842b..b71e00e5a 100644 --- a/pypdf/_doc_common.py +++ b/pypdf/_doc_common.py @@ -35,6 +35,7 @@ from typing import ( Any, Dict, + Generator, Iterable, Iterator, List, @@ -87,6 +88,7 @@ create_string_object, is_null_or_none, ) +from .generic._files import EmbeddedFile from .types import OutlineType, PagemodeType from .xmp import XmpInformation @@ -1332,6 +1334,7 @@ def xfa(self) -> Optional[Dict[str, Any]]: @property def attachments(self) -> Mapping[str, 
List[bytes]]: + """Mapping of attachment filenames to their content.""" return LazyDict( { name: (self._get_attachment_list, name) @@ -1339,6 +1342,11 @@ def attachments(self) -> Mapping[str, List[bytes]]: } ) + @property + def attachment_list(self) -> Generator[EmbeddedFile, None, None]: + """Iterable of attachment objects.""" + yield from EmbeddedFile._load(self.root_object) + def _list_attachments(self) -> List[str]: """ Retrieves the list of filenames of file attachments. @@ -1347,36 +1355,12 @@ def _list_attachments(self) -> List[str]: list of filenames """ - catalog = self.root_object - # From the catalog get the embedded file names - try: - # This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...] - names = cast( - ArrayObject, - cast( - DictionaryObject, - cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"], - )["/Names"], - ) - except KeyError: - return [] - attachment_names: List[str] = [] - for i, name in enumerate(names): - if isinstance(name, str): - attachment_names.append(name) - else: - name = name.get_object() - for key in ["/UF", "/F"]: - # PDF 2.0 reference, table 43: - # > A PDF reader shall use the value of the UF key, when present, instead of the F key. - if key in name: - name = name[key].get_object() - if name == names[i - 1]: - # Avoid duplicates for the same entry. - continue - attachment_names.append(name) - break - return attachment_names + names = [] + for entry in self.attachment_list: + names.append(entry.name) + if (name := entry.alternative_name) != entry.name and name: + names.append(name) + return names def _get_attachment_list(self, name: str) -> List[bytes]: out = self._get_attachments(name)[name] @@ -1402,50 +1386,28 @@ def _get_attachments( If the filename exists multiple times a list of the different versions will be provided. 
""" - catalog = self.root_object - # From the catalog get the embedded file names - try: - # This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...] - names = cast( - ArrayObject, - cast( - DictionaryObject, - cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"], - )["/Names"], - ) - except KeyError: - return {} attachments: Dict[str, Union[bytes, List[bytes]]] = {} - - # Loop through attachments - for i, name in enumerate(names): - if isinstance(name, str): - # Retrieve the corresponding reference. - file_dictionary = names[i + 1].get_object() - else: - # We have the reference, but need to determine the name. - file_dictionary = name.get_object() - for key in ["/UF", "/F"]: - # PDF 2.0 reference, table 43: - # > A PDF reader shall use the value of the UF key, when present, instead of the F key. - if key in file_dictionary: - name = file_dictionary[key].get_object() - break + for entry in self.attachment_list: + names = set() + alternative_name = entry.alternative_name + if filename is not None: + if filename in {entry.name, alternative_name}: + name = entry.name if filename == entry.name else alternative_name + names.add(name) else: continue - if name == names[i - 1]: - # Avoid extracting the same file twice. 
- continue - - if filename is not None and name != filename: - continue - file_data = file_dictionary["/EF"]["/F"].get_data() - if name in attachments: - if not isinstance(attachments[name], list): - attachments[name] = [attachments[name]] # type:ignore - attachments[name].append(file_data) # type:ignore else: - attachments[name] = file_data + names = {entry.name, alternative_name} + + for name in names: + if name is None: + continue + if name in attachments: + if not isinstance(attachments[name], list): + attachments[name] = [attachments[name]] # type:ignore + attachments[name].append(entry.content) # type:ignore + else: + attachments[name] = entry.content return attachments @abstractmethod diff --git a/pypdf/generic/__init__.py b/pypdf/generic/__init__.py index 4f4c27a8f..dc4545993 100644 --- a/pypdf/generic/__init__.py +++ b/pypdf/generic/__init__.py @@ -60,6 +60,7 @@ TreeObject, read_object, ) +from ._files import EmbeddedFile from ._fit import Fit from ._outline import OutlineItem from ._rectangle import RectangleObject @@ -207,6 +208,7 @@ def link( "DecodedStreamObject", "Destination", "DictionaryObject", + "EmbeddedFile", "EncodedStreamObject", "Field", "Fit", diff --git a/pypdf/generic/_files.py b/pypdf/generic/_files.py new file mode 100644 index 000000000..9f27a2ec9 --- /dev/null +++ b/pypdf/generic/_files.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Generator, Optional, cast + +from pypdf._utils import parse_iso8824_date +from pypdf.constants import FileSpecificationDictionaryEntries +from pypdf.errors import PdfReadError +from pypdf.generic import ArrayObject, DictionaryObject, StreamObject + +if TYPE_CHECKING: + import datetime + + +class EmbeddedFile: + """ + Container holding the information on an embedded file. + + Attributes are evaluated lazily if possible. + + Further information on embedded files can be found in section 7.11 of the PDF 2.0 specification. 
+ """ + def __init__(self, name: str, pdf_object: DictionaryObject) -> None: + """ + Args: + name: The (primary) name as provided in the name tree. + pdf_object: The corresponding PDF object to allow retrieving further data. + """ + self.name = name + self.pdf_object = pdf_object + + @property + def alternative_name(self) -> Optional[str]: + """Retrieve the alternative name (file specification).""" + for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]: + # PDF 2.0 reference, table 43: + # > A PDF reader shall use the value of the UF key, when present, instead of the F key. + if key in self.pdf_object: + return cast(str, self.pdf_object[key].get_object()) + return None + + @property + def description(self) -> Optional[str]: + """Retrieve the description.""" + return self.pdf_object.get(FileSpecificationDictionaryEntries.DESC) + + @property + def associated_file_relationship(self) -> str: + """Retrieve the relationship of the referring document to this embedded file.""" + return self.pdf_object.get("/AFRelationship", "/Unspecified") + + @property + def _embedded_file(self) -> StreamObject: + """Retrieve the actual embedded file stream.""" + if "/EF" not in self.pdf_object: + raise PdfReadError(f"/EF entry not found: {self.pdf_object}") + ef = cast(DictionaryObject, self.pdf_object["/EF"]) + for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]: + if key in ef: + return cast(StreamObject, ef[key].get_object()) + raise PdfReadError(f"No /(U)F key found in file dictionary: {ef}") + + @property + def _params(self) -> DictionaryObject: + """Retrieve the file-specific parameters.""" + return self._embedded_file.get("/Params", DictionaryObject()).get_object() + + @property + def subtype(self) -> Optional[str]: + """Retrieve the subtype. 
This is a MIME media type, prefixed by a slash.""" + return self._embedded_file.get("/Subtype") + + @property + def content(self) -> bytes: + """Retrieve the actual file content.""" + return self._embedded_file.get_data() + + @property + def size(self) -> Optional[int]: + """Retrieve the size of the uncompressed file in bytes.""" + return self._params.get("/Size") + + @property + def creation_date(self) -> Optional[datetime.datetime]: + """Retrieve the file creation datetime.""" + return parse_iso8824_date(self._params.get("/CreationDate")) + + @property + def modification_date(self) -> Optional[datetime.datetime]: + """Retrieve the datetime of the last file modification.""" + return parse_iso8824_date(self._params.get("/ModDate")) + + @property + def checksum(self) -> Optional[bytes]: + """Retrieve the MD5 checksum of the (uncompressed) file.""" + return self._params.get("/CheckSum") + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} name={self.name!r}>" + + @classmethod + def _load_from_names(cls, names: ArrayObject) -> Generator[EmbeddedFile, None, None]: + """ + Convert the given name tree into class instances. + + Args: + names: The name tree to load the data from. + + Returns: + Iterable of class instances for the files found. + """ + # This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...] + for i, name in enumerate(names): + if not isinstance(name, str): + # Skip plain strings and retrieve them as `direct_name` by index. + file_dictionary = name.get_object() + direct_name = names[i - 1].get_object() + yield EmbeddedFile(name=direct_name, pdf_object=file_dictionary) + + @classmethod + def _load(cls, catalog: DictionaryObject) -> Generator[EmbeddedFile, None, None]: + """ + Load the embedded files for the given document catalog. + + This method and its signature are considered internal API and thus not exposed publicly for now. + + Args: + catalog: The document catalog to load from. 
+ + Returns: + Iterable of class instances for the files found. + """ + try: + container = cast( + DictionaryObject, + cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"], + ) + except KeyError: + return + + if "/Kids" in container: + for kid in cast(ArrayObject, container["/Kids"].get_object()): + # There might be further (nested) kids here. + # Wait for an example before evaluating an implementation. + kid = kid.get_object() + if "/Names" in kid: + yield from cls._load_from_names(cast(ArrayObject, kid["/Names"])) + if "/Names" in container: + yield from cls._load_from_names(cast(ArrayObject, container["/Names"])) diff --git a/tests/generic/__init__.py b/tests/generic/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/generic/test_files.py b/tests/generic/test_files.py new file mode 100644 index 000000000..cc7fb0719 --- /dev/null +++ b/tests/generic/test_files.py @@ -0,0 +1,111 @@ +"""Test the pypdf.generic._files module.""" +import datetime +import shutil +import subprocess +from io import BytesIO +from pathlib import Path + +import pytest + +from pypdf import PdfReader +from pypdf.errors import PdfReadError +from pypdf.generic import ByteStringObject, DictionaryObject, EmbeddedFile, NameObject +from tests import get_data_from_url + +TESTS_ROOT = Path(__file__).parent.parent.resolve() +PROJECT_ROOT = TESTS_ROOT.parent +SAMPLE_ROOT = PROJECT_ROOT / "sample-files" + +PDFATTACH_BINARY = shutil.which("pdfattach") + + +@pytest.mark.skipif(PDFATTACH_BINARY is None, reason="Requires poppler-utils") +def test_embedded_file__basic(tmpdir): + clean_path = SAMPLE_ROOT / "002-trivial-libre-office-writer" / "002-trivial-libre-office-writer.pdf" + attached_path = tmpdir / "attached.pdf" + file_path = tmpdir / "test.txt" + file_path.write_binary(b"Hello World\n") + subprocess.run([PDFATTACH_BINARY, clean_path, file_path, attached_path]) # noqa: S603 + with PdfReader(str(attached_path)) as reader: + attachment = 
next(iter(EmbeddedFile._load(reader.root_object))) + + assert attachment.name == "test.txt" + assert attachment.alternative_name == "test.txt" + assert attachment.description is None + assert attachment.associated_file_relationship == "/Unspecified" + assert attachment.subtype is None + assert attachment.content == b"Hello World\n" + assert attachment.size == 12 + assert attachment.creation_date is None + assert attachment.modification_date is None + assert attachment.checksum is None + assert repr(attachment) == "<EmbeddedFile name='test.txt'>" + + +def test_embedded_file__artificial(): + # No alternative name. + pdf_object = DictionaryObject(answer=42) + attachment = EmbeddedFile(name="dummy", pdf_object=pdf_object) + assert attachment.alternative_name is None + + # No /EF. + with pytest.raises(PdfReadError, match=f"/EF entry not found: {pdf_object}"): + _ = attachment._embedded_file + + # Empty /EF dictionary. + pdf_object = DictionaryObject() + pdf_object[NameObject("/EF")] = DictionaryObject() + attachment = EmbeddedFile(name="dummy", pdf_object=pdf_object) + with pytest.raises(PdfReadError, match=r"No /\(U\)F key found in file dictionary: {}"): + _ = attachment._embedded_file + + # Missing /Params key. + pdf_object[NameObject("/EF")] = DictionaryObject() + pdf_object[NameObject("/EF")][NameObject("/F")] = DictionaryObject(answer=42) + assert attachment._params == DictionaryObject() + + # An actual checksum is set. 
+ # Generated using `hashlib.md5(b"Hello World!\n").digest()` + params = DictionaryObject() + params[NameObject("/CheckSum")] = ByteStringObject(b"\x8d\xdd\x8b\xe4\xb1y\xa5)\xaf\xa5\xf2\xff\xaeK\x98X") + pdf_object[NameObject("/EF")][NameObject("/F")][NameObject("/Params")] = params + assert attachment.checksum == b"\x8d\xdd\x8b\xe4\xb1y\xa5)\xaf\xa5\xf2\xff\xaeK\x98X" + + +@pytest.mark.enable_socket +def test_embedded_file__kids(): + # Generated using the instructions available from + # https://medium.com/@pymupdf/zugferd-and-ghostscript-how-to-create-industry-standard-and-compliant-pdf-e-invoices-83c9fde31ee5 + # Notes: + # * Yes, we need the full paths. Otherwise, the output file will only have an empty page. + # * The XML file has been a custom basic text file. + # * The input PDF file has been the `002-trivial-libre-office-writer.pdf` file. + url = "https://github.com/user-attachments/files/18691309/embedded_files_kids.pdf" + name = "embedded_files_kids.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + attachments = list(EmbeddedFile._load(reader.root_object)) + assert len(attachments) == 1 + attachment = attachments[0] + + assert attachment.name == "factur-x.xml" + assert attachment.alternative_name == "factur-x.xml" + assert attachment.description == "ZUGFeRD electronic invoice" + assert attachment.associated_file_relationship == "/Alternative" + assert attachment.subtype == "/text/xml" + assert attachment.content.startswith(b"Hello World!\n\nLorem ipsum dolor sit amet, ") + assert attachment.content.endswith(b"\ntakimata sanctus est Lorem ipsum dolor sit amet.\n") + assert attachment.size == 606 + assert attachment.creation_date is None + assert attachment.modification_date == datetime.datetime( + 2013, 1, 21, 8, 14, 33, tzinfo=datetime.timezone(datetime.timedelta(hours=1)) + ) + assert attachment.checksum is None + assert repr(attachment) == "<EmbeddedFile name='factur-x.xml'>" + + # No /Names in /Kids. 
+ del ( + reader.root_object[NameObject("/Names")][NameObject("/EmbeddedFiles")][NameObject("/Kids")][0] + .get_object()[NameObject("/Names")] + ) + attachments = list(EmbeddedFile._load(reader.root_object)) + assert attachments == [] diff --git a/tests/test_doc_common.py b/tests/test_doc_common.py index cc653f879..8b05cad3e 100644 --- a/tests/test_doc_common.py +++ b/tests/test_doc_common.py @@ -3,10 +3,12 @@ import shutil import subprocess from pathlib import Path +from unittest import mock import pytest from pypdf import PdfReader, PdfWriter +from pypdf.generic import EmbeddedFile TESTS_ROOT = Path(__file__).parent.resolve() PROJECT_ROOT = TESTS_ROOT.parent @@ -21,6 +23,7 @@ def test_attachments(tmpdir): clean_path = SAMPLE_ROOT / "002-trivial-libre-office-writer" / "002-trivial-libre-office-writer.pdf" with PdfReader(clean_path) as pdf: assert pdf._list_attachments() == [] + assert list(pdf.attachment_list) == [] # UF = name. attached_path = tmpdir / "attached.pdf" @@ -30,6 +33,8 @@ def test_attachments(tmpdir): with PdfReader(str(attached_path)) as pdf: assert pdf._list_attachments() == ["test.txt"] assert pdf._get_attachments("test.txt") == {"test.txt": b"Hello World\n"} + assert [(x.name, x.content) for x in pdf.attachment_list] == [("test.txt", b"Hello World\n")] + assert next(pdf.attachment_list).alternative_name == "test.txt" # UF != name. different_path = tmpdir / "different.pdf" @@ -38,6 +43,8 @@ def test_attachments(tmpdir): assert pdf._list_attachments() == ["test.txt", "my-file.txt"] assert pdf._get_attachments("test.txt") == {"test.txt": b"Hello World\n"} assert pdf._get_attachments("my-file.txt") == {"my-file.txt": b"Hello World\n"} + assert [(x.name, x.content) for x in pdf.attachment_list] == [("test.txt", b"Hello World\n")] + assert next(pdf.attachment_list).alternative_name == "my-file.txt" # Only name. 
no_f_path = tmpdir / "no-f.pdf" @@ -45,6 +52,8 @@ def test_attachments(tmpdir): with PdfReader(str(no_f_path)) as pdf: assert pdf._list_attachments() == ["test.txt"] assert pdf._get_attachments("test.txt") == {"test.txt": b"Hello World\n"} + assert [(x.name, x.content) for x in pdf.attachment_list] == [("test.txt", b"Hello World\n")] + assert next(pdf.attachment_list).alternative_name is None # UF and F. uf_f_path = tmpdir / "uf-f.pdf" @@ -52,6 +61,8 @@ def test_attachments(tmpdir): with PdfReader(str(uf_f_path)) as pdf: assert pdf._list_attachments() == ["test.txt"] assert pdf._get_attachments("test.txt") == {"test.txt": b"Hello World\n"} + assert [(x.name, x.content) for x in pdf.attachment_list] == [("test.txt", b"Hello World\n")] + assert next(pdf.attachment_list).alternative_name == "test.txt" # Only F. only_f_path = tmpdir / "f.pdf" @@ -59,6 +70,8 @@ def test_attachments(tmpdir): with PdfReader(str(only_f_path)) as pdf: assert pdf._list_attachments() == ["test.txt"] assert pdf._get_attachments("test.txt") == {"test.txt": b"Hello World\n"} + assert [(x.name, x.content) for x in pdf.attachment_list] == [("test.txt", b"Hello World\n")] + assert next(pdf.attachment_list).alternative_name == "test.txt" def test_get_attachments__same_attachment_more_than_twice(): @@ -69,3 +82,24 @@ def test_get_attachments__same_attachment_more_than_twice(): assert writer._get_attachments("test.txt") == { "test.txt": [b"content0", b"content1", b"content2", b"content3", b"content4"] } + assert [(x.name, x.content) for x in writer.attachment_list] == [ + ("test.txt", b"content0"), + ("test.txt", b"content1"), + ("test.txt", b"content2"), + ("test.txt", b"content3"), + ("test.txt", b"content4"), + ] + + +def test_get_attachments__alternative_name_is_none(): + writer = PdfWriter() + attachment = EmbeddedFile(name="test.txt", pdf_object=writer.root_object) + assert attachment.alternative_name is None + with mock.patch( + "pypdf._writer.PdfWriter.attachment_list", + 
new_callable=mock.PropertyMock(return_value=[attachment]) + ), mock.patch( + "pypdf.generic._files.EmbeddedFile.content", + new_callable=mock.PropertyMock(return_value=b"content") + ): + assert writer._get_attachments() == {"test.txt": b"content"}