summaryrefslogtreecommitdiffstats
path: root/core/signatures.py
diff options
context:
space:
mode:
authorAndrew Godwin2022-11-12 15:10:15 -0700
committerAndrew Godwin2022-11-12 15:10:15 -0700
commitdd4328ae523bb375dd871e85d1bacd9311e87a89 (patch)
tree6a4ec8bc83be3bdd18421b3f0c221b7a6091cf9e /core/signatures.py
parent8fd5a9292c7d3aac352d3c0e96288bff8a79cb47 (diff)
downloadtakahe-dd4328ae523bb375dd871e85d1bacd9311e87a89.tar.gz
takahe-dd4328ae523bb375dd871e85d1bacd9311e87a89.tar.bz2
takahe-dd4328ae523bb375dd871e85d1bacd9311e87a89.zip
Add JSON-LD signatures and tests for sig stuff
Diffstat (limited to 'core/signatures.py')
-rw-r--r--core/signatures.py190
1 files changed, 178 insertions, 12 deletions
diff --git a/core/signatures.py b/core/signatures.py
index 27e7f7d..74b5324 100644
--- a/core/signatures.py
+++ b/core/signatures.py
@@ -1,16 +1,33 @@
import base64
import json
-from typing import TYPE_CHECKING, Dict, List, Literal, TypedDict
+from typing import Dict, List, Literal, TypedDict
from urllib.parse import urlparse
import httpx
from cryptography.hazmat.primitives import hashes
from django.http import HttpRequest
-from django.utils.http import http_date
+from django.utils import timezone
+from django.utils.http import http_date, parse_http_date
+from OpenSSL import crypto
+from pyld import jsonld
-# Prevent a circular import
-if TYPE_CHECKING:
- from users.models import Identity
+from core.ld import format_date
+
+
+class VerificationError(BaseException):
+ """
+ There was an error with verifying the signature
+ """
+
+ pass
+
+
+class VerificationFormatError(VerificationError):
+ """
+ There was an error with the format of the signature (not if it is valid)
+ """
+
+ pass
class HttpSignature:
@@ -47,13 +64,13 @@ class HttpSignature:
return "\n".join(f"{name.lower()}: {value}" for name, value in headers.items())
@classmethod
- def parse_signature(cls, signature: str) -> "SignatureDetails":
+ def parse_signature(cls, signature: str) -> "HttpSignatureDetails":
bits = {}
for item in signature.split(","):
name, value = item.split("=", 1)
value = value.strip('"')
bits[name.lower()] = value
- signature_details: SignatureDetails = {
+ signature_details: HttpSignatureDetails = {
"headers": bits["headers"].split(),
"signature": base64.b64decode(bits["signature"]),
"algorithm": bits["algorithm"],
@@ -62,7 +79,7 @@ class HttpSignature:
return signature_details
@classmethod
- def compile_signature(cls, details: "SignatureDetails") -> str:
+ def compile_signature(cls, details: "HttpSignatureDetails") -> str:
value = f'keyId="{details["keyid"]}",headers="'
value += " ".join(h.lower() for h in details["headers"])
value += '",signature="'
@@ -71,11 +88,65 @@ class HttpSignature:
return value
@classmethod
+ def verify_signature(
+ cls,
+ signature: bytes,
+ cleartext: str,
+ public_key: str,
+ ):
+ x509 = crypto.X509()
+ x509.set_pubkey(
+ crypto.load_publickey(
+ crypto.FILETYPE_PEM,
+ public_key.encode("ascii"),
+ )
+ )
+ try:
+ crypto.verify(x509, signature, cleartext.encode("ascii"), "sha256")
+ except crypto.Error:
+ raise VerificationError("Signature mismatch")
+
+ @classmethod
+ def verify_request(cls, request, public_key, skip_date=False):
+ """
+ Verifies that the request has a valid signature for its body
+ """
+ # Verify body digest
+ if "HTTP_DIGEST" in request.META:
+ expected_digest = HttpSignature.calculate_digest(request.body)
+ if request.META["HTTP_DIGEST"] != expected_digest:
+ print("Wrong digest")
+ raise VerificationFormatError("Digest is incorrect")
+ # Verify date header
+ if "HTTP_DATE" in request.META and not skip_date:
+ header_date = parse_http_date(request.META["HTTP_DATE"])
+ if abs(timezone.now().timestamp() - header_date) > 60:
+ print(
+ f"Date mismatch - they sent {header_date}, now is {timezone.now().timestamp()}"
+ )
+ raise VerificationFormatError("Date is too far away")
+ # Get the signature details
+ if "HTTP_SIGNATURE" not in request.META:
+ raise VerificationFormatError("No signature header present")
+ signature_details = cls.parse_signature(request.META["HTTP_SIGNATURE"])
+ # Reject unknown algorithms
+ if signature_details["algorithm"] != "rsa-sha256":
+ raise VerificationFormatError("Unknown signature algorithm")
+ # Create the signature payload
+ headers_string = cls.headers_from_request(request, signature_details["headers"])
+ cls.verify_signature(
+ signature_details["signature"],
+ headers_string,
+ public_key,
+ )
+
+ @classmethod
async def signed_request(
self,
uri: str,
body: Dict,
- identity: "Identity",
+ private_key: str,
+ key_id: str,
content_type: str = "application/json",
method: Literal["post"] = "post",
):
@@ -96,11 +167,20 @@ class HttpSignature:
signed_string = "\n".join(
f"{name.lower()}: {value}" for name, value in headers.items()
)
+ pkey = crypto.load_privatekey(
+ crypto.FILETYPE_PEM,
+ private_key.encode("ascii"),
+ )
+ signature = crypto.sign(
+ pkey,
+ signed_string.encode("ascii"),
+ "sha256",
+ )
headers["Signature"] = self.compile_signature(
{
- "keyid": identity.key_id,
+ "keyid": key_id,
"headers": list(headers.keys()),
- "signature": identity.sign(signed_string),
+ "signature": signature,
"algorithm": "rsa-sha256",
}
)
@@ -120,8 +200,94 @@ class HttpSignature:
return response
-class SignatureDetails(TypedDict):
+class HttpSignatureDetails(TypedDict):
algorithm: str
headers: List[str]
signature: bytes
keyid: str
+
+
+class LDSignature:
+ """
+ Creates and verifies signatures of JSON-LD documents
+ """
+
+ @classmethod
+ def verify_signature(cls, document: Dict, public_key: str) -> None:
+ """
+ Verifies a document
+ """
+ try:
+ # Strip out the signature from the incoming document
+ signature = document.pop("signature")
+ # Create the options document
+ options = {
+ "@context": "https://w3id.org/identity/v1",
+ "creator": signature["creator"],
+ "created": signature["created"],
+ }
+ except KeyError:
+ raise VerificationFormatError("Invalid signature section")
+ if signature["type"].lower() != "rsasignature2017":
+ raise VerificationFormatError("Unknown signature type")
+ # Get the normalised hash of each document
+ final_hash = cls.normalized_hash(options) + cls.normalized_hash(document)
+ # Verify the signature
+ x509 = crypto.X509()
+ x509.set_pubkey(
+ crypto.load_publickey(
+ crypto.FILETYPE_PEM,
+ public_key.encode("ascii"),
+ )
+ )
+ try:
+ crypto.verify(
+ x509,
+ base64.b64decode(signature["signatureValue"]),
+ final_hash,
+ "sha256",
+ )
+ except crypto.Error:
+ raise VerificationError("Signature mismatch")
+
+ @classmethod
+ def create_signature(
+ cls, document: Dict, private_key: str, key_id: str
+ ) -> Dict[str, str]:
+ """
+ Creates the signature for a document
+ """
+ # Create the options document
+ options: Dict[str, str] = {
+ "@context": "https://w3id.org/identity/v1",
+ "creator": key_id,
+ "created": format_date(timezone.now()),
+ }
+ # Get the normalised hash of each document
+ final_hash = cls.normalized_hash(options) + cls.normalized_hash(document)
+ # Create the signature
+ pkey = crypto.load_privatekey(
+ crypto.FILETYPE_PEM,
+ private_key.encode("ascii"),
+ )
+ signature = base64.b64encode(crypto.sign(pkey, final_hash, "sha256"))
+ # Add it to the options document along with other bits
+ options["signatureValue"] = signature.decode("ascii")
+ options["type"] = "RsaSignature2017"
+ return options
+
+ @classmethod
+ def normalized_hash(cls, document) -> bytes:
+ """
+ Takes a JSON-LD document and create a hash of its URDNA2015 form,
+ in the same way that Mastodon does internally.
+
+ Reference: https://socialhub.activitypub.rocks/t/making-sense-of-rsasignature2017/347
+ """
+ norm_form = jsonld.normalize(
+ document,
+ {"algorithm": "URDNA2015", "format": "application/n-quads"},
+ )
+ digest = hashes.Hash(hashes.SHA256())
+ digest.update(norm_form.encode("utf8"))
+ return digest.finalize().hex().encode("ascii")