Skip to content

switch to jupyter_events #862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import prometheus_client
from jinja2 import TemplateNotFound
from jupyter_core.paths import is_hidden
from jupyter_events import EventLogger
from tornado import web
from tornado.log import app_log
from traitlets.config import Application
Expand Down Expand Up @@ -335,8 +336,8 @@ def config_manager(self):
return self.settings["config_manager"]

@property
def event_bus(self):
return self.settings["event_bus"]
def event_logger(self) -> EventLogger:
return self.settings["event_logger"]

# ---------------------------------------------------------------
# CORS
Expand Down
28 changes: 13 additions & 15 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from jupyter_client.session import Session
from jupyter_core.application import JupyterApp, base_aliases, base_flags
from jupyter_core.paths import jupyter_runtime_dir
from jupyter_events.logger import EventLogger
from nbformat.sign import NotebookNotary
from traitlets import (
Any,
Expand Down Expand Up @@ -123,7 +124,6 @@
AsyncContentsManager,
ContentsManager,
)
from jupyter_server.services.events.bus import EventBus
from jupyter_server.services.kernels.kernelmanager import (
AsyncMappingKernelManager,
MappingKernelManager,
Expand Down Expand Up @@ -212,7 +212,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -248,7 +248,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand All @@ -270,7 +270,7 @@ def init_settings(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
event_logger,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -359,7 +359,7 @@ def init_settings(
config_manager=config_manager,
authorizer=authorizer,
identity_provider=identity_provider,
event_bus=event_bus,
event_logger=event_logger,
# handlers
extra_services=extra_services,
# Jupyter stuff
Expand Down Expand Up @@ -770,7 +770,7 @@ class ServerApp(JupyterApp):
GatewaySessionManager,
GatewayClient,
Authorizer,
EventBus,
EventLogger,
]

subcommands = dict(
Expand Down Expand Up @@ -1552,10 +1552,10 @@ def _default_kernel_spec_manager_class(self):
),
)

event_bus = Instance(
EventBus,
event_logger = Instance(
EventLogger,
allow_none=True,
help="An EventBus for emitting structured event data from Jupyter Server and extensions.",
help="An EventLogger for emitting structured event data from Jupyter Server and extensions.",
)

info_file = Unicode()
Expand Down Expand Up @@ -1948,9 +1948,9 @@ def init_logging(self):
logger.parent = self.log
logger.setLevel(self.log.level)

def init_eventbus(self):
def init_event_logger(self):
"""Initialize the Event Bus."""
self.event_bus = EventBus.instance(parent=self)
self.event_logger = EventLogger(parent=self)

def init_webapp(self):
"""initialize tornado webapp"""
Expand Down Expand Up @@ -2012,7 +2012,7 @@ def init_webapp(self):
self.session_manager,
self.kernel_spec_manager,
self.config_manager,
self.event_bus,
self.event_logger,
self.extra_services,
self.log,
self.base_url,
Expand Down Expand Up @@ -2487,7 +2487,7 @@ def initialize(
if find_extensions:
self.find_server_extensions()
self.init_logging()
self.init_eventbus()
self.init_event_logger()
self.init_server_extensions()

# Special case the starter extension and load
Expand Down Expand Up @@ -2814,8 +2814,6 @@ async def _cleanup(self):
await self.cleanup_kernels()
if getattr(self, "session_manager", None):
self.session_manager.close()
if getattr(self, "event_bus", None):
self.event_bus.clear_instance()

def start_ioloop(self):
"""Start the IO Loop."""
Expand Down
15 changes: 0 additions & 15 deletions jupyter_server/services/events/bus.py

This file was deleted.

41 changes: 12 additions & 29 deletions jupyter_server/services/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

.. versionadded:: 2.0
"""
import logging
import json
from datetime import datetime
from typing import Any, Dict, Optional

from jupyter_telemetry.eventlog import _skip_message
from pythonjsonlogger import jsonlogger
from jupyter_events import EventLogger
from tornado import web, websocket

from jupyter_server.auth import authorized
Expand All @@ -18,18 +17,6 @@
AUTH_RESOURCE = "events"


class WebSocketLoggingHandler(logging.Handler):
"""Python logging handler that routes records to a Tornado websocket."""

def __init__(self, websocket, *args, **kwargs):
super().__init__(*args, **kwargs)
self.websocket = websocket

def emit(self, record):
"""Emit the message across the websocket"""
self.websocket.write_message(record.msg)


class SubscribeWebsocket(
JupyterHandler,
websocket.WebSocketHandler,
Expand Down Expand Up @@ -58,26 +45,23 @@ async def get(self, *args, **kwargs):
res = super().get(*args, **kwargs)
await res

async def event_listener(self, logger: EventLogger, schema_id: str, data: dict) -> None:
capsule = dict(schema_id=schema_id, **data)
self.write_message(json.dumps(capsule))

def open(self):
"""Routes events that are emitted by Jupyter Server's
EventBus to a WebSocket client in the browser.
"""
self.logging_handler = WebSocketLoggingHandler(self)
# Add a JSON formatter to the handler.
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
self.logging_handler.setFormatter(formatter)
# To do: add an eventlog.add_handler method to jupyter_telemetry.
self.event_bus.log.addHandler(self.logging_handler)
self.event_bus.handlers.append(self.logging_handler)
self.event_logger.add_listener(listener=self.event_listener)

def on_close(self):
self.event_bus.log.removeHandler(self.logging_handler)
self.event_bus.handlers.remove(self.logging_handler)
self.event_logger.remove_listener(listener=self.event_listener)


def validate_model(data: Dict[str, Any]) -> None:
"""Validates for required fields in the JSON request body"""
required_keys = {"schema_name", "version", "event"}
required_keys = {"schema_id", "version", "data"}
for key in required_keys:
if key not in data:
raise web.HTTPError(400, f"Missing `{key}` in the JSON request body.")
Expand Down Expand Up @@ -115,10 +99,9 @@ async def post(self):

try:
validate_model(payload)
self.event_bus.record_event(
schema_name=payload.get("schema_name"),
version=payload.get("version"),
event=payload.get("event"),
self.event_logger.emit(
schema_id=payload.get("schema_id"),
data=payload.get("data"),
timestamp_override=get_timestamp(payload),
)
self.set_status(204)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies = [
"tornado>=6.1.0",
"traitlets>=5.1",
"websocket-client",
"jupyter_telemetry"
"jupyter_events>=0.4.0"
]

[project.urls]
Expand Down
28 changes: 26 additions & 2 deletions tests/extension/mockextensions/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from jupyter_events import EventLogger
from jupyter_events.schema_registry import SchemaRegistryException
from traitlets import List, Unicode

from jupyter_server.base.handlers import JupyterHandler
Expand All @@ -11,16 +13,28 @@

STATIC_PATH = os.path.join(os.path.dirname(__file__), "static")

# Function that makes these extensions discoverable
# by the test functions.
EVENT_SCHEMA = """\
$id: https://events.jupyter.org/mockapp/v1/test
version: 1
properties:
msg:
type: string
required:
- msg
"""


# Function that makes these extensions discoverable
# by the test functions.
def _jupyter_server_extension_points():
return [{"module": __name__, "app": MockExtensionApp}]


class MockExtensionHandler(ExtensionHandlerMixin, JupyterHandler):
def get(self):
self.event_logger.emit(
schema_id="https://events.jupyter.org/mockapp/v1/test", data={"msg": "Hello, world!"}
)
self.finish(self.config.mock_trait)


Expand All @@ -45,6 +59,16 @@ class MockExtensionApp(ExtensionAppJinjaMixin, ExtensionApp):
def get_extension_package():
return "tests.extension.mockextensions"

def initialize_settings(self):
# Only add this event if it hasn't already been added.
# Log the error if it fails, but don't crash the app.
try:
elogger: EventLogger = self.serverapp.event_logger
elogger.register_event_schema(EVENT_SCHEMA)
except SchemaRegistryException as err:
self.log.error(err)
pass

def initialize_handlers(self):
self.handlers.append(("/mock", MockExtensionHandler))
self.handlers.append(("/mock_template", MockExtensionTemplateHandler))
Expand Down
18 changes: 18 additions & 0 deletions tests/extension/test_app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import json
from io import StringIO
from logging import StreamHandler
from typing import Any

import pytest
Expand Down Expand Up @@ -171,3 +174,18 @@ async def _stop(*args):

# check the shutdown method was called twice
assert calls == 2


async def test_events(jp_serverapp, jp_fetch):
stream = StringIO()
handler = StreamHandler(stream)
jp_serverapp.event_logger.register_handler(handler)

await jp_fetch("mock")

handler.flush()
output = json.loads(stream.getvalue())
# Clear the sink.
stream.truncate(0)
stream.seek(0)
assert output["msg"] == "Hello, world!"
2 changes: 2 additions & 0 deletions tests/services/events/mock_event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type: object
properties:
event_message:
title: Event Messages
categories:
- unrestricted
description: |
Mock event message to read.
required:
Expand Down
9 changes: 4 additions & 5 deletions tests/services/events/mockextension/mock_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@
class MockEventHandler(JupyterHandler):
def get(self):
# Emit an event.
self.event_bus.record_event(
schema_name="event.mockextension.jupyter.org/message",
version=1,
event={"event_message": "Hello world, from mock extension!"},
self.event_logger.emit(
schema_id="event.mockextension.jupyter.org/message",
data={"event_message": "Hello world, from mock extension!"},
)


def _load_jupyter_server_extension(serverapp):
# Register a schema with the EventBus
schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml"
serverapp.event_bus.register_schema_file(schema_file)
serverapp.event_logger.register_event_schema(schema_file)
serverapp.web_app.add_handlers(
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)]
)
2 changes: 2 additions & 0 deletions tests/services/events/mockextension/mock_extension_event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type: object
properties:
event_message:
title: Event Message
categories:
- unrestricted
description: |
Mock event message to read.
required:
Expand Down
Loading