# Source code for evdev.eventio_async

# encoding: utf-8

import asyncio
import select

from evdev import eventio
# needed for compatibility
from evdev.eventio import EvdevError

class EventIO(eventio.EventIO):
    '''
    Extension of :class:`eventio.EventIO` that adds asyncio-based reads.

    The ``async_*`` methods return :class:`asyncio.Future` objects that are
    resolved once the device's file descriptor becomes readable.
    '''

    def _do_when_readable(self, callback):
        '''
        Invoke ``callback()`` once, the next time the device is readable.

        A reader is registered for ``self.fileno()`` on the loop returned by
        :func:`asyncio.get_event_loop`; it removes itself before calling
        ``callback`` so it fires at most once per registration.
        '''
        loop = asyncio.get_event_loop()

        def ready():
            loop.remove_reader(self.fileno())
            callback()

        loop.add_reader(self.fileno(), ready)

    def _set_result(self, future, cb):
        '''
        Resolve ``future`` with the return value of ``cb()``, or with the
        exception that ``cb()`` raises.

        If ``future`` is already done (for example, cancelled by a timeout
        while the reader was still registered), nothing is done: ``cb`` is
        not called, so no pending events are consumed and discarded, and no
        ``InvalidStateError`` is raised inside the event loop callback.
        '''
        if future.done():
            return

        try:
            result = cb()
        except Exception as error:
            future.set_exception(error)
        else:
            future.set_result(result)

    def async_read_one(self):
        '''
        Return an :class:`asyncio.Future` that resolves to the result of
        ``self.read_one()`` - a single input event as an instance of
        :class:`InputEvent <evdev.events.InputEvent>` - once the device
        becomes readable. The returned future can be awaited.
        '''
        future = asyncio.Future()
        self._do_when_readable(lambda: self._set_result(future, self.read_one))
        return future

    def async_read(self):
        '''
        Return an :class:`asyncio.Future` that resolves to the result of
        ``self.read()`` - a generator object that yields
        :class:`InputEvent <evdev.events.InputEvent>` instances - once the
        device becomes readable. The returned future can be awaited.
        '''
        future = asyncio.Future()
        self._do_when_readable(lambda: self._set_result(future, self.read))
        return future

    def async_read_loop(self):
        '''
        Return an iterator that yields input events. This iterator is
        compatible with the ``async for`` syntax.
        '''
        return ReadIterator(self)

    def close(self):
        '''
        Remove this device's reader from the current event loop, if any.

        Only the event-loop registration is undone here.
        '''
        try:
            loop = asyncio.get_event_loop()
            loop.remove_reader(self.fileno())
        except RuntimeError:
            # no event loop present, so there is nothing to
            # remove the reader from. Ignore
            pass
class ReadIterator(object):
    '''
    Iterator over the input events of ``device``.

    Supports both the regular (blocking) iterator protocol and the
    asynchronous one (``async for``). Events are fetched from the device in
    batches; a new batch is requested only when the current one is exhausted.
    '''

    def __init__(self, device):
        # Start with an exhausted batch so the first request hits the device.
        self.current_batch = iter(())
        self.device = device

    # Standard iterator protocol.
    def __iter__(self):
        return self

    # Python 2.x compatibility.
    def next(self):
        return self.__next__()

    def __next__(self):
        '''
        Return the next event, blocking until the device is readable when
        the current batch has been used up.
        '''
        try:
            # Read from the previous batch of events.
            return next(self.current_batch)
        except StopIteration:
            # Block until the device's file descriptor has data to read.
            select.select([self.device.fd], [], [])
            self.current_batch = self.device.read()
            return next(self.current_batch)

    def __aiter__(self):
        return self

    def __anext__(self):
        '''
        Return a :class:`asyncio.Future` that resolves to the next event.

        The future is returned directly: it is already awaitable, which is
        all ``async for`` requires. The ``asyncio.coroutine`` decorator is
        deliberately not used, as it no longer exists on Python 3.11+.
        '''
        future = asyncio.Future()
        try:
            # Read from the previous batch of events.
            future.set_result(next(self.current_batch))
        except StopIteration:

            def next_batch_ready(batch):
                try:
                    self.current_batch = batch.result()
                    future.set_result(next(self.current_batch))
                except Exception as e:
                    future.set_exception(e)

            # Batch exhausted: resolve the future once the device delivers
            # the next batch of events.
            self.device.async_read().add_done_callback(next_batch_ready)
        return future