# encoding: utf-8
import asyncio
import select
from evdev import eventio
# needed for compatibility
from evdev.eventio import EvdevError
[docs]class EventIO(eventio.EventIO):
def _do_when_readable(self, callback):
loop = asyncio.get_event_loop()
def ready():
loop.remove_reader(self.fileno())
callback()
loop.add_reader(self.fileno(), ready)
def _set_result(self, future, cb):
try:
future.set_result(cb())
except Exception as error:
future.set_exception(error)
[docs] def async_read_one(self):
'''
Asyncio coroutine to read and return a single input event as
an instance of :class:`InputEvent <evdev.events.InputEvent>`.
'''
future = asyncio.Future()
self._do_when_readable(lambda: self._set_result(future, self.read_one))
return future
[docs] def async_read(self):
'''
Asyncio coroutine to read multiple input events from device. Return
a generator object that yields :class:`InputEvent <evdev.events.InputEvent>`
instances.
'''
future = asyncio.Future()
self._do_when_readable(lambda: self._set_result(future, self.read))
return future
[docs] def async_read_loop(self):
'''
Return an iterator that yields input events. This iterator is
compatible with the ``async for`` syntax.
'''
return ReadIterator(self)
[docs] def close(self):
try:
loop = asyncio.get_event_loop()
loop.remove_reader(self.fileno())
except RuntimeError:
# no event loop present, so there is nothing to
# remove the reader from. Ignore
pass
class ReadIterator(object):
def __init__(self, device):
self.current_batch = iter(())
self.device = device
# Standard iterator protocol.
def __iter__(self):
return self
# Python 2.x compatibility.
def next(self):
return self.__next__()
def __next__(self):
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)
def __aiter__(self):
return self
@asyncio.coroutine
def __anext__(self):
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:
def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)
self.device.async_read().add_done_callback(next_batch_ready)
return future