Skip to content

Commit

Permalink
ENH: Handle attachments in /Kids and provide object-oriented API (#3108)
Browse files Browse the repository at this point in the history
Closes #2087.
Closes #3103.
  • Loading branch information
stefan6419846 authored Feb 9, 2025
1 parent 912b50c commit dd106dc
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 70 deletions.
12 changes: 12 additions & 0 deletions docs/user/extract-attachments.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@ for name, content_list in reader.attachments.items():
with open(f"{name}-{i}", "wb") as fp:
fp.write(content)
```

Alternatively, you can retrieve them in an object-oriented fashion if you need
further details for these files:

```python
from pypdf import PdfReader

reader = PdfReader("example.pdf")

for attachment in reader.attachment_list:
print(attachment.name, attachment.alternative_name, attachment.content)
```
102 changes: 32 additions & 70 deletions pypdf/_doc_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from typing import (
Any,
Dict,
Generator,
Iterable,
Iterator,
List,
Expand Down Expand Up @@ -87,6 +88,7 @@
create_string_object,
is_null_or_none,
)
from .generic._files import EmbeddedFile
from .types import OutlineType, PagemodeType
from .xmp import XmpInformation

Expand Down Expand Up @@ -1332,13 +1334,19 @@ def xfa(self) -> Optional[Dict[str, Any]]:

@property
def attachments(self) -> Mapping[str, List[bytes]]:
    """
    Map each attachment filename to its content.

    Every value is handed to ``LazyDict`` as a ``(loader, filename)`` pair
    built from ``self._get_attachment_list``.
    """
    lazy_entries = {}
    for filename in self._list_attachments():
        lazy_entries[filename] = (self._get_attachment_list, filename)
    return LazyDict(lazy_entries)

@property
def attachment_list(self) -> Generator[EmbeddedFile, None, None]:
    """
    Iterate over the attachments as ``EmbeddedFile`` objects.

    The entries are produced by ``EmbeddedFile._load`` for the root object
    (document catalog) of this document.
    """
    catalog = self.root_object
    yield from EmbeddedFile._load(catalog)

def _list_attachments(self) -> List[str]:
"""
Retrieves the list of filenames of file attachments.
Expand All @@ -1347,36 +1355,12 @@ def _list_attachments(self) -> List[str]:
list of filenames
"""
catalog = self.root_object
# From the catalog get the embedded file names
try:
# This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...]
names = cast(
ArrayObject,
cast(
DictionaryObject,
cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
)["/Names"],
)
except KeyError:
return []
attachment_names: List[str] = []
for i, name in enumerate(names):
if isinstance(name, str):
attachment_names.append(name)
else:
name = name.get_object()
for key in ["/UF", "/F"]:
# PDF 2.0 reference, table 43:
# > A PDF reader shall use the value of the UF key, when present, instead of the F key.
if key in name:
name = name[key].get_object()
if name == names[i - 1]:
# Avoid duplicates for the same entry.
continue
attachment_names.append(name)
break
return attachment_names
names = []
for entry in self.attachment_list:
names.append(entry.name)
if (name := entry.alternative_name) != entry.name and name:
names.append(name)
return names

def _get_attachment_list(self, name: str) -> List[bytes]:
out = self._get_attachments(name)[name]
Expand All @@ -1402,50 +1386,28 @@ def _get_attachments(
If the filename exists multiple times a list of the different versions will be provided.
"""
catalog = self.root_object
# From the catalog get the embedded file names
try:
# This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...]
names = cast(
ArrayObject,
cast(
DictionaryObject,
cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
)["/Names"],
)
except KeyError:
return {}
attachments: Dict[str, Union[bytes, List[bytes]]] = {}

# Loop through attachments
for i, name in enumerate(names):
if isinstance(name, str):
# Retrieve the corresponding reference.
file_dictionary = names[i + 1].get_object()
else:
# We have the reference, but need to determine the name.
file_dictionary = name.get_object()
for key in ["/UF", "/F"]:
# PDF 2.0 reference, table 43:
# > A PDF reader shall use the value of the UF key, when present, instead of the F key.
if key in file_dictionary:
name = file_dictionary[key].get_object()
break
for entry in self.attachment_list:
names = set()
alternative_name = entry.alternative_name
if filename is not None:
if filename in {entry.name, alternative_name}:
name = entry.name if filename == entry.name else alternative_name
names.add(name)
else:
continue
if name == names[i - 1]:
# Avoid extracting the same file twice.
continue

if filename is not None and name != filename:
continue
file_data = file_dictionary["/EF"]["/F"].get_data()
if name in attachments:
if not isinstance(attachments[name], list):
attachments[name] = [attachments[name]] # type:ignore
attachments[name].append(file_data) # type:ignore
else:
attachments[name] = file_data
names = {entry.name, alternative_name}

for name in names:
if name is None:
continue
if name in attachments:
if not isinstance(attachments[name], list):
attachments[name] = [attachments[name]] # type:ignore
attachments[name].append(entry.content) # type:ignore
else:
attachments[name] = entry.content
return attachments

@abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions pypdf/generic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
TreeObject,
read_object,
)
from ._files import EmbeddedFile
from ._fit import Fit
from ._outline import OutlineItem
from ._rectangle import RectangleObject
Expand Down Expand Up @@ -207,6 +208,7 @@ def link(
"DecodedStreamObject",
"Destination",
"DictionaryObject",
"EmbeddedFile",
"EncodedStreamObject",
"Field",
"Fit",
Expand Down
148 changes: 148 additions & 0 deletions pypdf/generic/_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Generator, Optional, cast

from pypdf._utils import parse_iso8824_date
from pypdf.constants import FileSpecificationDictionaryEntries
from pypdf.errors import PdfReadError
from pypdf.generic import ArrayObject, DictionaryObject, StreamObject

if TYPE_CHECKING:
import datetime


class EmbeddedFile:
    """
    Container holding the information on an embedded file.

    Attributes are evaluated lazily if possible.

    Further information on embedded files can be found in section 7.11 of the PDF 2.0 specification.
    """

    def __init__(self, name: str, pdf_object: DictionaryObject) -> None:
        """
        Args:
            name: The (primary) name as provided in the name tree.
            pdf_object: The corresponding PDF object to allow retrieving further data.
        """
        self.name = name
        self.pdf_object = pdf_object

    @property
    def alternative_name(self) -> Optional[str]:
        """Retrieve the alternative name (file specification), or None if there is none."""
        for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]:
            # PDF 2.0 reference, table 43:
            #   > A PDF reader shall use the value of the UF key, when present, instead of the F key.
            if key in self.pdf_object:
                return cast(str, self.pdf_object[key].get_object())
        return None

    @property
    def description(self) -> Optional[str]:
        """Retrieve the description, or None if there is none."""
        return self.pdf_object.get(FileSpecificationDictionaryEntries.DESC)

    @property
    def associated_file_relationship(self) -> str:
        """
        Retrieve the relationship of the referring document to this embedded file.

        Falls back to ``/Unspecified`` if the file specification has no ``/AFRelationship`` entry.
        """
        return self.pdf_object.get("/AFRelationship", "/Unspecified")

    @property
    def _embedded_file(self) -> StreamObject:
        """
        Retrieve the actual embedded file stream.

        Raises:
            PdfReadError: If the ``/EF`` entry or both of its ``/UF`` and ``/F`` keys are missing.
        """
        if "/EF" not in self.pdf_object:
            raise PdfReadError(f"/EF entry not found: {self.pdf_object}")
        ef = cast("DictionaryObject", self.pdf_object["/EF"])
        # Same precedence as for the names: prefer /UF over /F.
        for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]:
            if key in ef:
                return cast("StreamObject", ef[key].get_object())
        raise PdfReadError(f"No /(U)F key found in file dictionary: {ef}")

    @property
    def _params(self) -> DictionaryObject:
        """Retrieve the file-specific parameters (empty dictionary if ``/Params`` is missing)."""
        return self._embedded_file.get("/Params", DictionaryObject()).get_object()

    @property
    def subtype(self) -> Optional[str]:
        """Retrieve the subtype. This is a MIME media type, prefixed by a slash."""
        return self._embedded_file.get("/Subtype")

    @property
    def content(self) -> bytes:
        """Retrieve the actual file content."""
        return self._embedded_file.get_data()

    @property
    def size(self) -> Optional[int]:
        """Retrieve the size of the uncompressed file in bytes."""
        return self._params.get("/Size")

    @property
    def creation_date(self) -> Optional[datetime.datetime]:
        """Retrieve the file creation datetime."""
        return parse_iso8824_date(self._params.get("/CreationDate"))

    @property
    def modification_date(self) -> Optional[datetime.datetime]:
        """Retrieve the datetime of the last file modification."""
        return parse_iso8824_date(self._params.get("/ModDate"))

    @property
    def checksum(self) -> Optional[bytes]:
        """Retrieve the MD5 checksum of the (uncompressed) file."""
        return self._params.get("/CheckSum")

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} name={self.name!r}>"

    @classmethod
    def _load_from_names(cls, names: ArrayObject) -> Generator[EmbeddedFile, None, None]:
        """
        Convert the given name tree leaf array into class instances.

        Args:
            names: The ``/Names`` array to load the data from.

        Returns:
            Iterable of class instances for the files found.
        """
        # The array has the format [name_1, reference_1, name_2, reference_2, ...]
        for index, item in enumerate(names):
            if isinstance(item, str):
                # Plain strings are the keys; they are picked up by index
                # once the corresponding value is reached.
                continue
            if index == 0:
                # A value without a preceding key. `names[index - 1]` would
                # wrap around to the last array element and yield a bogus name.
                continue
            direct_name = names[index - 1].get_object()
            yield cls(name=direct_name, pdf_object=item.get_object())

    @classmethod
    def _load_from_tree(
        cls, node: DictionaryObject, visited: set[int]
    ) -> Generator[EmbeddedFile, None, None]:
        """
        Walk one node of the embedded files name tree.

        Intermediate nodes reference further nodes through ``/Kids``, leaf
        nodes carry the actual entries in ``/Names``. Kids are visited first
        (depth-first, in array order), then the names of the node itself.

        Args:
            node: The name tree node to evaluate.
            visited: Identities of the nodes already processed. Used to stop
                on cyclic ``/Kids`` references in broken files.

        Returns:
            Iterable of class instances for the files found below this node.
        """
        node_id = id(node)
        if node_id in visited:
            return
        visited.add(node_id)

        if "/Kids" in node:
            for kid in cast("ArrayObject", node["/Kids"].get_object()):
                yield from cls._load_from_tree(kid.get_object(), visited)
        if "/Names" in node:
            yield from cls._load_from_names(cast("ArrayObject", node["/Names"]))

    @classmethod
    def _load(cls, catalog: DictionaryObject) -> Generator[EmbeddedFile, None, None]:
        """
        Load the embedded files for the given document catalog.

        This method and its signature are considered internal API and thus not exposed publicly for now.

        Args:
            catalog: The document catalog to load from.

        Returns:
            Iterable of class instances for the files found.
        """
        try:
            container = cast(
                "DictionaryObject",
                cast("DictionaryObject", catalog["/Names"])["/EmbeddedFiles"],
            )
        except KeyError:
            # No embedded files are declared for this document.
            return

        yield from cls._load_from_tree(container, set())
Empty file added tests/generic/__init__.py
Empty file.
Loading

0 comments on commit dd106dc

Please sign in to comment.