From c9639f9182f0b9d4f504b48b48e4ffc2dcb92daf Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 10 Jul 2022 22:00:02 +0200 Subject: [PATCH 1/2] Save Y updates using YStore --- jupyter_server_ydoc/ydoc.py | 68 +++++++++++++++++++++++++++++++------ 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/jupyter_server_ydoc/ydoc.py b/jupyter_server_ydoc/ydoc.py index 26458ed7..91d91ea7 100644 --- a/jupyter_server_ydoc/ydoc.py +++ b/jupyter_server_ydoc/ydoc.py @@ -2,6 +2,7 @@ # Distributed under the terms of the Modified BSD License. import asyncio +from pathlib import Path from typing import Any, Dict, Optional, Tuple from jupyter_server.base.handlers import JupyterHandler @@ -10,18 +11,31 @@ from tornado import web from tornado.websocket import WebSocketHandler from ypy_websocket.websocket_server import WebsocketServer, YRoom # type: ignore +from ypy_websocket.ystore import ( # type: ignore + BaseYStore, + SQLiteYStore, + TempFileYStore, + YDocNotFound, +) YFILE = YDOCS["file"] - RENAME_SESSION = 127 +class JupyterTempFileYStore(TempFileYStore): + prefix_dir = "jupyter_ystore_" + + +class JupyterSQLiteYStore(SQLiteYStore): + db_path = ".jupyter_ystore.db" + + class JupyterRoom(YRoom): - def __init__(self, type): - super().__init__(ready=False) + def __init__(self, type: str, ystore: BaseYStore): + super().__init__(ready=False, ystore=ystore) self.type = type - self.cleaner = None - self.watcher = None + self.cleaner: Optional[asyncio.Task[Any]] = None + self.watcher: Optional[asyncio.Task[Any]] = None self.document = YDOCS.get(type, YFILE)(self.ydoc) @@ -29,17 +43,24 @@ class JupyterWebsocketServer(WebsocketServer): rooms: Dict[str, JupyterRoom] + def __init__(self, *args, **kwargs): + self.ystore_class = kwargs.pop("ystore_class") + super().__init__(*args, **kwargs) + def get_room(self, path: str) -> JupyterRoom: file_format, file_type, file_path = path.split(":", 2) if path not in self.rooms.keys(): - self.rooms[path] = JupyterRoom(file_type) + p = Path(file_path) + updates_file_path = str(p.parent / f".{file_type}:{p.name}.y") + ystore = self.ystore_class(path=updates_file_path) + self.rooms[path] = JupyterRoom(file_type, ystore) return self.rooms[path] class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): saving_document: Optional[asyncio.Task[Any]] - websocket_server = JupyterWebsocketServer(rooms_ready=False, auto_clean_rooms=False) + websocket_server: Optional[JupyterWebsocketServer] = None _message_queue: asyncio.Queue[Any] # Override max_message size to 1GB @@ -59,6 +80,7 @@ async def __anext__(self): return message def get_file_info(self) -> Tuple[str, str, str]: + assert self.websocket_server is not None room_name = self.websocket_server.get_room_name(self.room) file_format: str file_type: str @@ -67,6 +89,7 @@ def get_file_info(self) -> Tuple[str, str, str]: return file_format, file_type, file_path def set_file_info(self, value: str) -> None: + assert self.websocket_server is not None self.websocket_server.rename_room(value, from_room=self.room) self.path = value # needed to be compatible with WebsocketServer (websocket.path) @@ -77,7 +100,13 @@ async def get(self, *args, **kwargs): return await super().get(*args, **kwargs) async def open(self, path): + self.ystore_class = self.settings["collaborative_ystore_class"] + if self.websocket_server is None: + YDocWebSocketHandler.websocket_server = JupyterWebsocketServer( + rooms_ready=False, auto_clean_rooms=False, ystore_class=self.ystore_class + ) self._message_queue = asyncio.Queue() + assert self.websocket_server is not None self.room = self.websocket_server.get_room(path) self.set_file_info(path) self.saving_document = None @@ -95,7 +124,20 @@ async def open(self, path): self.last_modified = model["last_modified"] # check again if ready, because loading the file can be async if not self.room.ready: - self.room.document.source = model["content"] + # try to apply Y updates from the YStore for this document + try: + await self.room.ystore.apply_updates(self.room.ydoc) + read_from_source = False + except YDocNotFound: + # YDoc not found in the YStore, create the document from the source file (no change history) + read_from_source = True + if not read_from_source: + # if YStore updates and source file are out-of-sync, resync updates with source + if self.room.document.source != model["content"]: + read_from_source = True + if read_from_source: + self.room.document.source = model["content"] + await self.room.ystore.encode_state_as_update(self.room.ydoc) self.room.document.dirty = False self.room.ready = True self.room.watcher = asyncio.create_task(self.watch_file()) @@ -138,6 +180,7 @@ def on_message(self, message): # The client moved the document to a different location. After receiving this message, we make the current document available under a different url. # The other clients are automatically notified of this change because the path is shared through the Yjs document as well. self.set_file_info(message[1:].decode("utf-8")) + assert self.websocket_server is not None self.websocket_server.rename_room(self.path, from_room=self.room) # send rename acknowledge self.write_message(bytes([RENAME_SESSION, 1]), binary=True) @@ -155,9 +198,10 @@ async def clean_room(self) -> None: if seconds is None: return await asyncio.sleep(seconds) - if self.room.watcher: + if self.room.watcher is not None: self.room.watcher.cancel() self.room.document.unobserve() + assert self.websocket_server is not None self.websocket_server.delete_room(room=self.room) def on_document_change(self, event): @@ -181,7 +225,11 @@ def on_document_change(self, event): async def maybe_save_document(self): # save after 1 second of inactivity to prevent too frequent saving await asyncio.sleep(1) - file_format, file_type, file_path = self.get_file_info() + # if the room cannot be found, don't save + try: + file_format, file_type, file_path = self.get_file_info() + except Exception: + return model = await ensure_async( self.contents_manager.get(file_path, type=file_type, format=file_format) ) From 39fdf5c61e35b6846e7ac446124a3f8782a8df68 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 13 Jul 2022 12:13:15 +0200 Subject: [PATCH 2/2] Store timestamp in metadata --- jupyter_server_ydoc/ydoc.py | 10 +++++++++- pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/jupyter_server_ydoc/ydoc.py b/jupyter_server_ydoc/ydoc.py index 91d91ea7..c357c01e 100644 --- a/jupyter_server_ydoc/ydoc.py +++ b/jupyter_server_ydoc/ydoc.py @@ -2,6 +2,7 @@ # Distributed under the terms of the Modified BSD License. import asyncio +from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional, Tuple @@ -39,6 +40,13 @@ def __init__(self, type: str, ystore: BaseYStore): self.document = YDOCS.get(type, YFILE)(self.ydoc) +async def metadata_callback() -> bytes: + # the current datetime will be stored in metadata as bytes + # it can be retrieved as: + # datetime.fromisoformat(metadata.decode()) + return datetime.utcnow().isoformat().encode() + + class JupyterWebsocketServer(WebsocketServer): rooms: Dict[str, JupyterRoom] @@ -52,7 +60,7 @@ def get_room(self, path: str) -> JupyterRoom: if path not in self.rooms.keys(): p = Path(file_path) updates_file_path = str(p.parent / f".{file_type}:{p.name}.y") - ystore = self.ystore_class(path=updates_file_path) + ystore = self.ystore_class(path=updates_file_path, metadata_callback=metadata_callback) self.rooms[path] = JupyterRoom(file_type, ystore) return self.rooms[path] diff --git a/pyproject.toml b/pyproject.toml index 7c8b06fe..7353a4b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ requires-python = ">=3.7" dependencies = [ "jupyter_ydoc==0.1.13", - "ypy-websocket>=0.1.13" + "ypy-websocket>=0.2.0" ] [[project.authors]]