Source code for keri.app.apping

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.apping module

"""

from hio.base import doing
from hio.core.serial import serialing
from hio.help import ogler

from ..db import Baser

logger = ogler.getLogger()


[docs] class Consoler(doing.Doer): """Manages a command-line serial console Reads lines from a serial console, parses simple movement commands, and echoes feedback. Inherits lifecycle management (enter/recur/exit) from :class:`hio.base.doing.Doer`. Attributes: db (Baser): Database instance used by this doer. console (serialing.Console): Serial console for input/output. """
[docs] def __init__(self, db=None, console=None, **kwa): """Initializes Consoler with optional database and console. Args: db (Baser, optional): Database instance. Defaults to a new :class:`~keri.db.basing.Baser` instance if None. console (serialing.Console, optional): Serial console instance. Defaults to a new :class:`~hio.core.serial.serialing.Console` instance if None. **kwa: Additional keyword arguments passed to :class:`~hio.base.doing.Doer`. """ super(Consoler, self).__init__(**kwa) self.db = db if db is not None else Baser() self.console = console if console is not None else serialing.Console()
[docs] def enter(self, *, temp=None): """Opens the serial console resource. Called by the Doer framework when entering the task context. Args: temp (bool, optional): Unused. Reserved for interface compatibility. Raises: IOError: If the console cannot be opened. """ if not self.console.reopen(): raise IOError("Unable to open serial console.")
[docs] def recur(self, tyme): """Reads one line from the console and dispatches a movement command. Recognized commands (matched on the first character, case-insensitive): - ``r`` / ``right``: turn right - ``l`` / ``left``: turn left - ``w`` / ``walk``: walk 1 step - ``s`` / ``stop``: stop Args: tyme (float): Current loop time provided by the Doist scheduler. Returns: bool: Always ``False``; signals the Doer to continue recurring. """ line = self.console.get() if not line: return False chunks = line.lower().split() if not chunks: self.console.put("Try one of: l[eft] r[ight] w[alk] s[top]\n") return False verb = chunks[0] if verb.startswith(b'r'): command = ('turn', 'right') elif verb.startswith(b'l'): command = ('turn', 'left') elif verb.startswith(b'w'): command = ('walk', 1) elif verb.startswith(b's'): command = ('stop', '') else: self.console.put("Invalid command: {0}\n".format(verb)) self.console.put("Try one of: t[urn] s[top] w[alk]\n") return False self.console.put("Did: {} {}\n".format(command[0], command[1]).encode("utf-8")) return False
[docs] def exit(self): """Closes the serial console resource. Called by the Doer framework when leaving the task context. """ self.console.close()