Skip to content

Commit

Permalink
Port csr.py to cryptography; drop pyopenssl dep
Browse files Browse the repository at this point in the history
  • Loading branch information
akx committed Feb 3, 2025
1 parent 0706aaa commit dd35fda
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 110 deletions.
60 changes: 1 addition & 59 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ aiohttp = ">=3.10.2,<4.0"
aiodns = "^3.0"
brotli = "^1.0"
cchardet = { version="^2.1", python="<=3.10"}
# FIXME: Once everything using this is migrared to cryptography drop the dep
pyopenssl = ">=23.2"


[tool.poetry.group.dev.dependencies]
Expand All @@ -83,8 +81,6 @@ pytest-asyncio = ">=0.21,<1.0" # caret behaviour on 0.x is to lock to 0.x.*
bump2version = "^1.0"
detect-secrets = "^1.2"
httpx = ">=0.23,<1.0" # caret behaviour on 0.x is to lock to 0.x.*
# FIXME: Once everything using pyopenssl is migrared to cryptography drop the dep
types-pyopenssl = ">=23.2"

[build-system]
requires = ["poetry-core>=1.2.0"]
Expand Down
134 changes: 87 additions & 47 deletions src/libpvarki/mtlshelp/csr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@
import logging
import stat
import asyncio
from ipaddress import ip_address

from OpenSSL import crypto # FIXME: use cryptography instead of pyOpenSSL

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from cryptography.x509.name import _NAME_TO_NAMEOID

LOGGER = logging.getLogger(__name__)
KTYPE_NAMES = {getattr(crypto, name): name.replace("TYPE_", "") for name in dir(crypto) if name.startswith("TYPE_")}
KPTYPE = crypto.PKey
KPTYPE = rsa.RSAPrivateKey # TODO: should this be more than a type alias?
PUBDIR_MODE = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH | stat.S_IXGRP | stat.S_IXOTH
PRIVDIR_MODE = stat.S_IRWXU

HASHER_MAP = {
"sha256": hashes.SHA256,
}


def resolve_filepaths(basedir: Path, nameprefix: str) -> Tuple[Path, Path, Path]:
"""Returns paths for privkey, pubkey, and csr files (but the files are not created
Expand All @@ -36,7 +43,7 @@ def resolve_filepaths(basedir: Path, nameprefix: str) -> Tuple[Path, Path, Path]
return privkeypath, pubkeypath, csrpath


def create_keypair(privkeypath: Path, pubkeypath: Path, ktype: int = crypto.TYPE_RSA, ksize: int = 4096) -> crypto.PKey:
def create_keypair(privkeypath: Path, pubkeypath: Path, ktype: str = "RSA", ksize: int = 4096) -> rsa.RSAPrivateKey:
"""Generate a keypair, saves files to given paths (directory must exist) and returns the
keypair object"""
for check_path in (privkeypath, pubkeypath):
Expand All @@ -48,94 +55,127 @@ def create_keypair(privkeypath: Path, pubkeypath: Path, ktype: int = crypto.TYPE
raise ValueError("Invalid path {}".format(check_path))
if check_path.exists():
LOGGER.warning("{} already exists, it will be overwritten".format(check_path))
ckp = crypto.PKey()
LOGGER.info(
"Generating {} keypair of size {}, this will take a moment".format(KTYPE_NAMES.get(ktype, "unknown"), ksize)
"Generating {} keypair of size {}, this will take a moment".format(ktype, ksize)
)
ckp.generate_key(ktype, ksize)
if ktype == "RSA":
ckp = rsa.generate_private_key(public_exponent=65537, key_size=ksize)
else:
raise NotImplementedError(f"Key type {ktype} not supported")
LOGGER.info("Keygen done")
privkeypath.write_bytes(crypto.dump_privatekey(crypto.FILETYPE_PEM, ckp))
privkeypath.write_bytes(ckp.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
))
privkeypath.chmod(stat.S_IRUSR) # read-only to the owner, others get nothing
pubkeypath.write_bytes(crypto.dump_publickey(crypto.FILETYPE_PEM, ckp))
pubkeypath.write_bytes(ckp.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
))
pubkeypath.chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) # everyone can read
LOGGER.info("Wrote {} and {}".format(privkeypath, pubkeypath))
return ckp


async def async_create_keypair(
privkeypath: Path, pubkeypath: Path, ktype: int = crypto.TYPE_RSA, ksize: int = 4096
) -> crypto.PKey:
privkeypath: Path, pubkeypath: Path, ktype: str = "RSA", ksize: int = 4096
) -> rsa.RSAPrivateKey:
"""Async wrapper for create_keypair see it for details"""
return await asyncio.get_event_loop().run_in_executor(None, create_keypair, privkeypath, pubkeypath, ktype, ksize)


def sign_and_write_csrfile(req: crypto.X509Req, keypair: crypto.PKey, csrpath: Path, digest: str = "sha265") -> str:
def sign_and_write_csrfile(builder: x509.CertificateSigningRequestBuilder, keypair: rsa.RSAPrivateKey, csrpath: Path, digest: str) -> str:
"""internal helper to be more DRY, returns the PEM"""
req.set_pubkey(keypair)
req.sign(keypair, digest)
csrpath.write_bytes(crypto.dump_certificate_request(crypto.FILETYPE_PEM, req))
csr = builder.sign(keypair, HASHER_MAP[digest.lower()]())
csr_pem = csr.public_bytes(serialization.Encoding.PEM)
csrpath.write_bytes(csr_pem)
csrpath.chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) # everyone can read
LOGGER.info("Wrote {}".format(csrpath))
return csrpath.read_text(encoding="utf-8")
return csr_pem.decode("utf-8")


def _build_san_names(names: Sequence[str]):
san_names = []
for name in names:
if name.startswith("IP:"):
san_names.append(x509.IPAddress(ip_address(name[3:])))
else:
dns_name = name[4:] if name.startswith("DNS:") else name
san_names.append(x509.DNSName(dns_name))
return san_names


def _build_csr_builder(subject_name: x509.Name, extended_usages) -> x509.CertificateSigningRequestBuilder:
return (
x509.CertificateSigningRequestBuilder()
.subject_name(subject_name)
.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=True,
key_encipherment=True,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage(extended_usages),
critical=True,
)
)


def create_client_csr(keypair: crypto.PKey, csrpath: Path, reqdn: Mapping[str, str], digest: str = "sha256") -> str:
def create_client_csr(keypair: rsa.RSAPrivateKey, csrpath: Path, reqdn: Mapping[str, str], digest: str = "sha256") -> str:
"""Generate CSR file with clientAuth extended usage, returns the PEM encoded contents
reqdn must contain only names that are valid for X509Name, for example::
{ "CN": "me.pvarki.fi" }
"""

req = crypto.X509Req()
sub = req.get_subject()
for name, value in reqdn.items():
setattr(sub, name, value)
req.add_extensions(
[
crypto.X509Extension(b"keyUsage", True, b"digitalSignature,nonRepudiation,keyEncipherment"),
crypto.X509Extension(b"extendedKeyUsage", True, b"clientAuth"),
]
name = x509.Name([x509.NameAttribute(_NAME_TO_NAMEOID[key], value) for key, value in reqdn.items()])
req = _build_csr_builder(
subject_name=name,
extended_usages=[x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH],
)
return sign_and_write_csrfile(req, keypair, csrpath, digest)


async def async_create_client_csr(
keypair: crypto.PKey, csrpath: Path, reqdn: Mapping[str, str], digest: str = "sha256"
keypair: rsa.RSAPrivateKey, csrpath: Path, reqdn: Mapping[str, str], digest: str = "sha256"
) -> str:
"""Async wrapper for create_client_csr see it for details"""
return await asyncio.get_event_loop().run_in_executor(None, create_client_csr, keypair, csrpath, reqdn, digest)


def create_server_csr(keypair: crypto.PKey, csrpath: Path, names: Sequence[str], digest: str = "sha256") -> str:
def create_server_csr(keypair: rsa.RSAPrivateKey, csrpath: Path, names: Sequence[str], digest: str = "sha256") -> str:
"""Generate CSR file with serverAuth extended usage, returns the PEM encoded contents
First name will go to CN, all names will go to subjectAltNames
If the value in names does not start with DNS: or IP: DNS: will be prepended, the first name will be used
for CN and MUST NOT contain a prefix (thus it SHOULD be a dns name)
"""

req = crypto.X509Req()
req.get_subject().CN = names[0]
sannames = []
for name in names:
if name.startswith("IP:") or name.startswith("DNS:"):
sannames.append(name)
else:
sannames.append(f"DNS:{name}")
sanbytes = ", ".join(sannames).encode("utf-8")
req.add_extensions(
[
crypto.X509Extension(b"keyUsage", True, b"digitalSignature,nonRepudiation,keyEncipherment"),
crypto.X509Extension(b"extendedKeyUsage", True, b"serverAuth"),
crypto.X509Extension(b"subjectAltName", False, sanbytes),
]
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0])])
req = _build_csr_builder(
subject_name=name,
extended_usages=[x509.oid.ExtendedKeyUsageOID.SERVER_AUTH],
)
req = req.add_extension(
x509.SubjectAlternativeName(_build_san_names(names)),
critical=False,
)

return sign_and_write_csrfile(req, keypair, csrpath, digest)



async def async_create_server_csr(
keypair: crypto.PKey, csrpath: Path, names: Sequence[str], digest: str = "sha256"
keypair: rsa.RSAPrivateKey, csrpath: Path, names: Sequence[str], digest: str = "sha256"
) -> str:
"""Async wrapper for create_server_csr see it for details"""
return await asyncio.get_event_loop().run_in_executor(None, create_server_csr, keypair, csrpath, names, digest)

0 comments on commit dd35fda

Please sign in to comment.