Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions dev/e2e/faker/actor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
from apsig import KeyUtil

from apmodel import Person, Multikey
from apmodel.security.cryptographickey import CryptographicKey

def fake(publicKeys: dict):
keyutl = KeyUtil(private_key=publicKeys["ed25519-key"])
return {
"""
CryptographicKey(
id="https://apsig.amase.cc/actor#main-key",
controller="https://apsig.amase.cc/actor",
owner="https://apsig.amase.cc/actor",
publicKeyPem=publicKeys["publicKeyPem"].decode("utf-8"),
)
"""
p = Person(
id="https://apsig.amase.cc/actor",
inbox="https://apsig.amase.cc/actor/inbox",
outbox="https://apsig.amase.cc/actor/outbox",
publicKey={
"id": "https://apsig.amase.cc/actor#main-key",
"controller": "https://apsig.amase.cc/actor",
"owner": "https://apsig.amase.cc/actor",
"publicKeyPem": publicKeys["publicKeyPem"].decode("utf-8"),
"type": "CryptographicKey"
},
assertionMethod=[
Multikey(
id="https://apsig.amase.cc/actor#ed25519-key",
controller="https://apsig.amase.cc/actor",
publicKeyMultibase=keyutl.encode_multibase()
)
],
preferredUsername="apsig_dev",
name="APSig Test Actor",
summary="testing purposes only, don't use on production environment!",
url="https://apsig.amase.cc/actor"
)
return p.to_dict()
{
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security"
"https://w3id.org/security/v1",
"https://w3id.org/security/data-integrity/v1",
"https://www.w3.org/ns/did/v1",
Expand Down
2 changes: 1 addition & 1 deletion dev/e2e/server_proof.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def note():
@app.get("/send")
async def send(request: Request):
user_pin = request.query.get("pin")
if user_pin != pin:
if int(user_pin) != pin:
return {"error": "Missing Permission"}
url = request.query.get("url")
if url is None:
Expand Down
109 changes: 109 additions & 0 deletions dev/e2e/server_rfc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
import datetime
from pprint import pprint
import random
import json

import aiohttp
import uvicorn
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, rsa
from faker.actor import fake
from yarl import URL
from notturno import Notturno
from notturno.models.request import Request

from apsig.rfc9421 import RFC9421Signer

app = Notturno()
ed_privatekey = ed25519.Ed25519PrivateKey.generate()
rsa_privatekey = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
actor_obj = fake(
{
"ed25519-key": ed_privatekey,
"publicKeyPem": rsa_privatekey.public_key().public_bytes(
serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo
),
}
)
now = datetime.datetime.now().isoformat(sep="T", timespec="seconds") + "Z"


@app.get("/actor")
async def actor():
return actor_obj


@app.post("/inbox")
async def inbox(request: Request):
print(request.body)


@app.get("/note")
async def note():
return {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Note",
"id": "https://apsig.amase.cc/note",
"attributedTo": "https://apsig.amase.cc/actor",
"content": "Hello world",
"published": now,
"to": [
"https://www.w3.org/ns/activitystreams#Public",
],
}


@app.get("/send")
async def send(request: Request):
user_pin = int(request.query.get("pin", b'0'))
if user_pin != pin:
return {"error": "Missing Permission"}
url = request.query.get("url", b'None').decode()

if url == "None":
return {"error": "url is required"}
#await asyncio.sleep(3)
#return {"resp": "Failed to verify the request signature.", "status": 401}
async with aiohttp.ClientSession() as session:
body = {
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/data-integrity/v1",
],
"id": "https://apsig.amase.cc/note",
"actor": "https://apsig.amase.cc/actor",
"type": "Create",
"object": {
"id": "https://apsig.amase.cc/note",
"type": "Note",
"attributedTo": "https://apsig.amase.cc/actor",
"content": "Hello world",
},
}
target = URL(url)
signer = RFC9421Signer(private_key=ed_privatekey, key_id="https://apsig.amase.cc/actor#ed25519-key")
signed = signer.sign(body=body, method="POST", path=target.path, host=target.host + ":" + str(target.port) if target.port is not None else "", headers={"Content-Type": "application/activity+json"})
with open("./test.json", "w") as f:
json.dump(signed, f)

pprint(signed)
async with session.post(
url,
json=body,
headers=signed,
) as resp:
text = await resp.text()
status = resp.status
print(text)
print(status)
return {"resp": text, "status": status}


pin = random.randint(1000, 9999)
#pin = 1751
print("Server Pin is: " + str(pin))
uvicorn.run(app, host="0.0.0.0")
Loading
Loading