From 0207d5708b9bc3bb1e657d99ad866552bf1cfa98 Mon Sep 17 00:00:00 2001 From: Eero af Heurlin Date: Sat, 10 Feb 2024 14:27:12 +0200 Subject: [PATCH] implement wrapper for signing certs via CLI since even *that* CFSSLs own REST API does not actually do the right way --- src/ocsprest/config.py | 5 ++++ src/ocsprest/routes.py | 57 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/ocsprest/config.py b/src/ocsprest/config.py index 7be0417..65e7b78 100644 --- a/src/ocsprest/config.py +++ b/src/ocsprest/config.py @@ -28,6 +28,11 @@ class RESTConfig(BaseSettings): rootcakey: Path = Field( alias="RUN_CA_KEY", description="root CA key to use in commands", default="/data/persistent/init_ca-key.pem" ) + conf: Path = Field( + alias="RUN_CA_CFSSL_CONF", + description="Path to the db config file", + default="/data/persistent/root_ca_cfssl.json", + ) dbconf: Path = Field( alias="RUN_DB_CONFIG", description="Path to the db config file", default="/data/persistent/db.json" ) diff --git a/src/ocsprest/routes.py b/src/ocsprest/routes.py index 9ed419d..300949a 100644 --- a/src/ocsprest/routes.py +++ b/src/ocsprest/routes.py @@ -35,8 +35,61 @@ async def refresh_all(request: Request) -> Dict[str, Any]: return {"success": True} -@ROUTER.post("/sign") -async def sign_one(request: Request) -> Dict[str, Any]: +@ROUTER.post("/csr/sign") +async def csr_sign(request: Request) -> Dict[str, Any]: + """Sign a CSR, we have to do it via CLI if we want CFSSL to add the Authority Information Access properties""" + cnf = RESTConfig().singleton() + data = await request.json() + csrtmp = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.csr" + certtmp = csrtmp.with_suffix(".crt") + try: + csrtmp.write_text(data["certificate_request"]) + args_sign: List[str] = [ + str(cnf.cfssl), + "sign", + f"-config {cnf.conf}", + f"-db-config {cnf.dbconf}", + f"-ca {cnf.cacrt}", + f"-ca-key {cnf.cakey}", + f"-profile {data['profile']}", + f"-csr {csrtmp}", + f"-loglevel {cfssl_loglevel()}", + ] + cmd_sign = " ".join(args_sign) + ret_sign, out_sign, _ = await call_cmd(cmd_sign) + if ret_sign != 0: + raise HTTPException( + status_code=500, + detail={"success": False, "error": f"CFSSL CLI call to sign failed, code {ret_sign}. See server logs"}, + ) + resp_sign = json.loads(out_sign) + if not data.get("bundle", True): + return {"result": {"certificate": resp_sign["cert"].replace("\n", "\\n")}} + # Create the bundle + certtmp.write_text(resp_sign["cert"]) + args_bundle: List[str] = [ + str(cnf.cfssl), + "bundle", + f"-ca-bundle {cnf.rootcacrt}", + f"-int-bundle {cnf.cacrt}", + "-flavor optimal", + f"-cert {certtmp}", + f"-loglevel {cfssl_loglevel()}", + ] + cmd_bundle = " ".join(args_bundle) + ret_bundle, out_bundle, _ = await call_cmd(cmd_bundle) + if ret_bundle != 0: + LOGGER.error("CFSSL CLI call to bundle failed, returning the signed cert anyway") + return {"result": {"certificate": resp_sign["cert"].replace("\n", "\\n")}} + resp_bundle = json.loads(out_bundle) + return {"result": {"certificate": resp_bundle["cert"].replace("\n", "\\n")}} + finally: + csrtmp.unlink() + certtmp.unlink() + + +@ROUTER.post("/ocsp/sign") +async def ocsp_sign_one(request: Request) -> Dict[str, Any]: """calls cfssl ocspsign""" data = await request.json() certtmp = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.pem"