Skip to content

Save Y updates using YStore #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions jupyter_server_ydoc/ydoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Distributed under the terms of the Modified BSD License.

import asyncio
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from jupyter_server.base.handlers import JupyterHandler
Expand All @@ -10,36 +12,63 @@
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket.websocket_server import WebsocketServer, YRoom # type: ignore
from ypy_websocket.ystore import ( # type: ignore
BaseYStore,
SQLiteYStore,
TempFileYStore,
YDocNotFound,
)

YFILE = YDOCS["file"]

RENAME_SESSION = 127


class JupyterTempFileYStore(TempFileYStore):
prefix_dir = "jupyter_ystore_"


class JupyterSQLiteYStore(SQLiteYStore):
db_path = ".jupyter_ystore.db"


class JupyterRoom(YRoom):
def __init__(self, type):
super().__init__(ready=False)
def __init__(self, type: str, ystore: BaseYStore):
super().__init__(ready=False, ystore=ystore)
self.type = type
self.cleaner = None
self.watcher = None
self.cleaner: Optional[asyncio.Task[Any]] = None
self.watcher: Optional[asyncio.Task[Any]] = None
self.document = YDOCS.get(type, YFILE)(self.ydoc)


async def metadata_callback() -> bytes:
# the current datetime will be stored in metadata as bytes
# it can be retrieved as:
# datetime.fromisoformat(metadata.decode())
return datetime.utcnow().isoformat().encode()


class JupyterWebsocketServer(WebsocketServer):

rooms: Dict[str, JupyterRoom]

def __init__(self, *args, **kwargs):
self.ystore_class = kwargs.pop("ystore_class")
super().__init__(*args, **kwargs)

def get_room(self, path: str) -> JupyterRoom:
file_format, file_type, file_path = path.split(":", 2)
if path not in self.rooms.keys():
self.rooms[path] = JupyterRoom(file_type)
p = Path(file_path)
updates_file_path = str(p.parent / f".{file_type}:{p.name}.y")
ystore = self.ystore_class(path=updates_file_path, metadata_callback=metadata_callback)
self.rooms[path] = JupyterRoom(file_type, ystore)
return self.rooms[path]


class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):

saving_document: Optional[asyncio.Task[Any]]
websocket_server = JupyterWebsocketServer(rooms_ready=False, auto_clean_rooms=False)
websocket_server: Optional[JupyterWebsocketServer] = None
_message_queue: asyncio.Queue[Any]

# Override max_message size to 1GB
Expand All @@ -59,6 +88,7 @@ async def __anext__(self):
return message

def get_file_info(self) -> Tuple[str, str, str]:
assert self.websocket_server is not None
room_name = self.websocket_server.get_room_name(self.room)
file_format: str
file_type: str
Expand All @@ -67,6 +97,7 @@ def get_file_info(self) -> Tuple[str, str, str]:
return file_format, file_type, file_path

def set_file_info(self, value: str) -> None:
assert self.websocket_server is not None
self.websocket_server.rename_room(value, from_room=self.room)
self.path = value # needed to be compatible with WebsocketServer (websocket.path)

Expand All @@ -77,7 +108,13 @@ async def get(self, *args, **kwargs):
return await super().get(*args, **kwargs)

async def open(self, path):
self.ystore_class = self.settings["collaborative_ystore_class"]
if self.websocket_server is None:
YDocWebSocketHandler.websocket_server = JupyterWebsocketServer(
rooms_ready=False, auto_clean_rooms=False, ystore_class=self.ystore_class
)
self._message_queue = asyncio.Queue()
assert self.websocket_server is not None
self.room = self.websocket_server.get_room(path)
self.set_file_info(path)
self.saving_document = None
Expand All @@ -95,7 +132,20 @@ async def open(self, path):
self.last_modified = model["last_modified"]
# check again if ready, because loading the file can be async
if not self.room.ready:
self.room.document.source = model["content"]
# try to apply Y updates from the YStore for this document
try:
await self.room.ystore.apply_updates(self.room.ydoc)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
read_from_source = True
if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self.room.document.source != model["content"]:
read_from_source = True
if read_from_source:
self.room.document.source = model["content"]
await self.room.ystore.encode_state_as_update(self.room.ydoc)
self.room.document.dirty = False
self.room.ready = True
self.room.watcher = asyncio.create_task(self.watch_file())
Expand Down Expand Up @@ -138,6 +188,7 @@ def on_message(self, message):
# The client moved the document to a different location. After receiving this message, we make the current document available under a different url.
# The other clients are automatically notified of this change because the path is shared through the Yjs document as well.
self.set_file_info(message[1:].decode("utf-8"))
assert self.websocket_server is not None
self.websocket_server.rename_room(self.path, from_room=self.room)
# send rename acknowledge
self.write_message(bytes([RENAME_SESSION, 1]), binary=True)
Expand All @@ -155,9 +206,10 @@ async def clean_room(self) -> None:
if seconds is None:
return
await asyncio.sleep(seconds)
if self.room.watcher:
if self.room.watcher is not None:
self.room.watcher.cancel()
self.room.document.unobserve()
assert self.websocket_server is not None
self.websocket_server.delete_room(room=self.room)

def on_document_change(self, event):
Expand All @@ -181,7 +233,11 @@ def on_document_change(self, event):
async def maybe_save_document(self):
# save after 1 second of inactivity to prevent too frequent saving
await asyncio.sleep(1)
file_format, file_type, file_path = self.get_file_info()
# if the room cannot be found, don't save
try:
file_format, file_type, file_path = self.get_file_info()
except Exception:
return
model = await ensure_async(
self.contents_manager.get(file_path, type=file_type, format=file_format)
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
requires-python = ">=3.7"
dependencies = [
"jupyter_ydoc==0.1.13",
"ypy-websocket>=0.1.13"
"ypy-websocket>=0.2.0"
]

[[project.authors]]
Expand Down