Skip to content

Python SDK: Allow implicit merge-fragments/signals events and decorator for iterator responses #809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions examples/python/fastapi/app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
from datetime import datetime

import uvicorn
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

from datastar_py.fastapi import DatastarStreamingResponse, ServerSentEventGenerator
from datastar_py.starlette import sse_generator

app = FastAPI()

Expand All @@ -15,7 +17,7 @@
<head>
<title>DATASTAR on FastAPI</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar/bundles/datastar.js"></script>
<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar@v1.0.0-beta.11/bundles/datastar.js"></script>
<style>
html, body { height: 100%; width: 100%; }
body { background-image: linear-gradient(to right bottom, oklch(0.424958 0.052808 253.972015), oklch(0.189627 0.038744 264.832977)); }
Expand All @@ -29,7 +31,7 @@
<div class="container">
<div
class="time"
data-on-load="@get('/updates')"
data-on-load="@get('/updates3')"
>
Current time from fragment: <span id="currentTime">CURRENT_TIME</span>
</div>
Expand Down Expand Up @@ -64,3 +66,35 @@ async def time_updates():
@app.get("/updates")
async def updates():
return DatastarStreamingResponse(time_updates())


# This is identical to the /updates endpoint, but uses a convenience decorator
@app.get("/updates2")
@sse_generator
async def updates2():
while True:
yield ServerSentEventGenerator.merge_fragments(
f"""<span id="currentTime">{datetime.now().isoformat()}"""
)
await asyncio.sleep(1)
yield ServerSentEventGenerator.merge_signals({"currentTime": f"{datetime.now().isoformat()}"})
await asyncio.sleep(1)


# This is also identical, but yielding a string of fragments automatically calls merge_fragments
# and dicts automatically calls merge_signals
@app.get("/updates3")
# Wraps the resulting async generator in a DatastarStreamingResponse
@sse_generator
async def updates3():
while True:
# Implicit merge_fragments
yield f"""<span id="currentTime">{datetime.now().isoformat()}"""
await asyncio.sleep(1)
# Implicit merge_signals
yield {"currentTime": f"{datetime.now().isoformat()}"}
await asyncio.sleep(1)


if __name__ == '__main__':
uvicorn.run(app)
21 changes: 8 additions & 13 deletions examples/python/quart/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from quart import Quart

from datastar_py.quart import make_datastar_response, ServerSentEventGenerator
from datastar_py.quart import make_datastar_response, ServerSentEventGenerator, sse_generator

app = Quart(__name__)

Expand All @@ -13,7 +13,7 @@
<head>
<title>DATASTAR on Quart</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar/bundles/datastar.js"></script>
<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar@v1.0.0-beta.11/bundles/datastar.js"></script>
<style>
html, body { height: 100%; width: 100%; }
body { background-image: linear-gradient(to right bottom, oklch(0.424958 0.052808 253.972015), oklch(0.189627 0.038744 264.832977)); }
Expand Down Expand Up @@ -43,18 +43,13 @@


@app.route("/updates")
@sse_generator
async def updates():
async def time_updates():
while True:
yield ServerSentEventGenerator.merge_fragments(
f"""<span id="currentTime">{datetime.now().isoformat()}"""
)
await asyncio.sleep(1)
yield ServerSentEventGenerator.merge_signals({"currentTime": f"{datetime.now().isoformat()}"})
await asyncio.sleep(1)

response = await make_datastar_response(time_updates())
return response
while True:
yield f"""<span id="currentTime">{datetime.now().isoformat()}"""
await asyncio.sleep(1)
yield {"currentTime": f"{datetime.now().isoformat()}"}
await asyncio.sleep(1)


@app.route("/")
Expand Down
16 changes: 13 additions & 3 deletions sdk/python/src/datastar_py/django.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import typing
from functools import wraps

from django.http import StreamingHttpResponse as _StreamingHttpResponse

from .sse import SSE_HEADERS, ServerSentEventGenerator
from .sse import SSE_HEADERS, ServerSentEventGenerator, _sse_iterable_wrapper


class DatastarStreamingHttpResponse(_StreamingHttpResponse):
@wraps(_StreamingHttpResponse.__init__)
def __init__(self, *args, **kwargs):
def __init__(self, streaming_content=(), *args, **kwargs):
kwargs["headers"] = {**SSE_HEADERS, **kwargs.get("headers", {})}
super().__init__(*args, **kwargs)
streaming_content = _sse_iterable_wrapper(streaming_content)
super().__init__(streaming_content, *args, **kwargs)


def sse_generator(generator_func):
@wraps(generator_func)
def _wrapper(*args, **kwargs):
content = generator_func(*args, **kwargs)
return DatastarStreamingHttpResponse(content)
return _wrapper
2 changes: 1 addition & 1 deletion sdk/python/src/datastar_py/fastapi.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .starlette import DatastarStreamingResponse, ServerSentEventGenerator
from .starlette import DatastarStreamingResponse, ServerSentEventGenerator, sse_generator
2 changes: 1 addition & 1 deletion sdk/python/src/datastar_py/fasthtml.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .sse import ServerSentEventGenerator
from .starlette import DatastarStreamingResponse
from .starlette import DatastarStreamingResponse, sse_generator
13 changes: 11 additions & 2 deletions sdk/python/src/datastar_py/quart.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
from functools import wraps

from quart import make_response as _make_response

from .sse import ServerSentEventGenerator, SSE_HEADERS
from .sse import ServerSentEventGenerator, SSE_HEADERS, _sse_iterable_wrapper


async def make_datastar_response(async_generator):
response = await _make_response(async_generator, SSE_HEADERS)
response = await _make_response(_sse_iterable_wrapper(async_generator), SSE_HEADERS)
response.timeout = None
return response


def sse_generator(generator_func):
@wraps(generator_func)
async def _wrapper(*args, **kwargs):
return await make_datastar_response(generator_func(*args, **kwargs))
return _wrapper
27 changes: 22 additions & 5 deletions sdk/python/src/datastar_py/sanic.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, AnyStr

from .sse import SSE_HEADERS, ServerSentEventGenerator
from sanic.response import HTTPResponse as _HTTPResponse

from .sse import SSE_HEADERS, ServerSentEventGenerator, _wrap_event

if TYPE_CHECKING:
from sanic import Request, HTTPResponse
from sanic import Request


class DatastarResponse(_HTTPResponse):
def __init__(self, *args, **kwargs):
kwargs["headers"] = {**SSE_HEADERS, **kwargs.get("headers", {})}
super().__init__(*args, **kwargs)

async def send(
self,
data: Optional[AnyStr] = None,
end_stream: Optional[bool] = None,
) -> None:
data = _wrap_event(data)
await super().send(data, end_stream)


async def datastar_respond(request: "Request") -> "HTTPResponse":
response = await request.respond(headers=SSE_HEADERS)
# TODO: Deprecate this in favor of `request.respond(DatastarResponse())`?
async def datastar_respond(request: "Request", *args, **kwargs) -> "_HTTPResponse":
response = await request.respond(DatastarResponse(*args, **kwargs))
return response
41 changes: 40 additions & 1 deletion sdk/python/src/datastar_py/sse.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
from __future__ import annotations

import json
from itertools import chain
from typing import Optional, Protocol, Union, runtime_checkable
from typing import (
Optional,
Protocol,
Union,
runtime_checkable,
Callable,
AsyncIterable,
TypeVar,
Iterable,
)

import datastar_py.consts as consts

Expand Down Expand Up @@ -176,3 +187,31 @@ def execute_script(
@classmethod
def redirect(cls, location: str):
return cls.execute_script(f"setTimeout(() => window.location = '{location}')")


def _wrap_event(event: str | _HtmlProvider | dict) -> str:
if isinstance(event, _HtmlProvider) or (
isinstance(event, str) and event.startswith("<")
):
return ServerSentEventGenerator.merge_fragments(event)
elif isinstance(event, dict):
return ServerSentEventGenerator.merge_signals(event)
else:
return event


async def _async_map(func: Callable, async_iter: AsyncIterable) -> AsyncIterable:
async for item in async_iter:
yield func(item)


SyncOrAsyncIterable = TypeVar("SyncOrAsyncIterable", AsyncIterable, Iterable)


def _sse_iterable_wrapper(iterable: SyncOrAsyncIterable) -> SyncOrAsyncIterable:
"""Wraps an iterable to allow implicitly turning fragments and dictionaries
into merge-fragments and merge-signals events."""
if isinstance(iterable, AsyncIterable):
return _async_map(_wrap_event, iterable)
else:
return map(_wrap_event, iterable)
19 changes: 16 additions & 3 deletions sdk/python/src/datastar_py/starlette.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import typing
from functools import wraps

from starlette.responses import StreamingResponse as _StreamingResponse

from .sse import SSE_HEADERS, ServerSentEventGenerator
from .sse import SSE_HEADERS, ServerSentEventGenerator, _wrap_event, _async_map, _sse_iterable_wrapper

if typing.TYPE_CHECKING:
from starlette.responses import ContentStream


class DatastarStreamingResponse(_StreamingResponse):
@wraps(_StreamingResponse.__init__)
def __init__(self, *args, **kwargs):
def __init__(self, content: ContentStream, *args, **kwargs):
kwargs["headers"] = {**SSE_HEADERS, **kwargs.get("headers", {})}
super().__init__(*args, **kwargs)
content = _sse_iterable_wrapper(content)
super().__init__(content, *args, **kwargs)


def sse_generator(generator_func):
@wraps(generator_func)
def _wrapper(*args, **kwargs):
content = generator_func(*args, **kwargs)
return DatastarStreamingResponse(content)
return _wrapper