Source code for ggblab.comm

import uuid
import json
# import ast
import queue

# import time
import asyncio
import threading

import tempfile
from websockets.asyncio.server import unix_serve, serve
import os

from IPython import get_ipython

[docs] class ggb_comm: """Dual-channel communication layer for kernel↔widget messaging. Implements a combination of IPython Comm (primary) and out-of-band socket (Unix domain socket on POSIX, TCP WebSocket on Windows) to enable message delivery during cell execution when IPython Comm is blocked. IPython Comm cannot receive messages while a notebook cell is executing, which breaks interactive workflows. The out-of-band socket solves this by providing a secondary channel for GeoGebra responses. Architecture: - IPython Comm: Command dispatch, event notifications, heartbeat - Out-of-band socket: Response delivery during cell execution Comm target is fixed at 'ggblab-comm' because multiplexing via multiple targets would not solve the IPython Comm receive limitation. Attributes: target_comm: IPython Comm object target_name (str): Comm target name ('ggblab-comm') server_handle: WebSocket server handle server_thread: Background thread running the socket server clients (set): Currently connected WebSocket clients socketPath (str): Unix domain socket path (POSIX) wsPort (int): TCP port number (Windows) recv_logs (dict): Response storage keyed by message ID recv_events (queue.Queue): Event queue for frontend notifications See: docs/architecture.md for detailed communication architecture. """ # [Frontent to kernel callback - JupyterLab - Jupyter Community Forum] # (https://discourse.jupyter.org/t/frontent-to-kernel-callback/1666) recv_msgs = {} recv_logs = {} recv_events = queue.Queue() logs = [] thread = None mid = None def __init__(self): self.target_comm = None self.target_name = 'ggblab-comm' self.server_handle = None self.server_thread = None self.clients = set() self.socketPath = None self.wsPort = 0 # oob websocket (unix_domain socket in posix)
[docs] def start(self): """Start the out-of-band socket server in a background thread. Creates a Unix domain socket (POSIX) or TCP WebSocket server (Windows) and runs it in a daemon thread. The server listens for GeoGebra responses. """ self.server_thread = threading.Thread(target=lambda: asyncio.run(self.server()), daemon=True) self.server_thread.start()
[docs] def stop(self): """Stop the out-of-band socket server.""" self.server_handle.close()
[docs] async def server(self): if os.name in [ 'posix' ]: _fd, self.socketPath = tempfile.mkstemp(prefix="/tmp/ggb_") os.close(_fd) os.remove(self.socketPath) async with unix_serve(self.client_handle, path=self.socketPath) as self.server_handle: await asyncio.Future() else: async with serve(self.client_handle, "localhost", 0) as self.server_handle: self.wsPort = self.server_handle.sockets[0].getsockname()[1] self.logs.append(f"WebSocket server started at ws://localhost:{self.wsPort}") await asyncio.Future()
[docs] async def client_handle(self, client_id): self.clients.add(client_id) self.logs.append(f"Client {client_id} registered.") try: async for msg in client_id: # _data = ast.literal_eval(msg) _data = json.loads(msg) _id = _data.get('id') # self.logs.append(f"Received message from client: {_id}") self.recv_logs[_id] = _data['payload'] except Exception as e: pass # self.logs.append(f"Connection closed: {e}") finally: self.clients.remove(client_id)
# self.logs.append(f"Client disconnected: {client_id}") # comm
[docs] def register_target(self): get_ipython().kernel.comm_manager.register_target( self.target_name, self.register_target_cb)
[docs] def register_target_cb(self, comm, msg): self.target_comm = comm @comm.on_msg def _recv(msg): self.handle_recv(msg) @comm.on_close def _close(): self.target_comm = None
[docs] def unregister_target_cb(self, comm, msg): self.target_comm.close() self.target_comm = None
[docs] def handle_recv(self, msg): if isinstance(msg['content']['data'], str): _data = json.loads(msg['content']['data']) else: _data = msg['content']['data'] # _id = _data.get('id') if 'id' not in _data: self.recv_events.put(_data)
# if self.mid and self.mid == _id: # self.recv_msgs[_id] = [_data] # self.mid = None # else: # if _id in self.recv_msgs: # # self.recv_msgs[_id].append(_data) # else: # self.recv_msgs[_id] = [_data]
[docs] def send(self, msg): return self.target_comm.send(msg)
[docs] async def send_recv(self, msg): """Send a message via IPython Comm and wait for response via out-of-band socket. This method: 1. Generates a unique message ID (UUID) 2. Sends the message via IPython Comm to the frontend 3. Waits for the response to arrive via the out-of-band socket 4. Returns the response payload The 3-second timeout is sufficient for interactive operations. For long-running operations, decompose into smaller steps. Args: msg (dict or str): Message to send (will be JSON-serialized). Returns: dict: Response payload from GeoGebra. Raises: TimeoutError: If no response arrives within 3 seconds. Example: >>> response = await comm.send_recv({ ... "type": "command", ... "payload": "A=(0,0)" ... }) """ try: async with asyncio.timeout(3.0): if isinstance(msg, str): _data = json.loads(msg) else: _data = msg _id = str(uuid.uuid4()) self.mid = _id msg['id'] = _id self.send(json.dumps(_data)) while not (_id in self.recv_logs): await asyncio.sleep(0.01) # self.recv_msgs.pop(_id, None) value = self.recv_logs.pop(_id, None) return value except TimeoutError: print(f"TimeoutError in send_recv {msg}") return { 'type': 'error', 'message': 'TimeoutError in send_recv' }