Source code for evdev.uinput

import os
import platform
import re
import stat
import time
from collections import defaultdict

from evdev import _uinput
from evdev import ecodes, util, device
from evdev.events import InputEvent
import evdev.ff as ff
import ctypes

try:
    from evdev.eventio_async import EventIO
except ImportError:
    from evdev.eventio import EventIO


class UInputError(Exception):
    pass


[docs] class UInput(EventIO): """ A userland input device and that can inject input events into the linux input subsystem. """ __slots__ = ( "name", "vendor", "product", "version", "bustype", "events", "devnode", "fd", "device", )
[docs] @classmethod def from_device(cls, *devices, filtered_types=(ecodes.EV_SYN, ecodes.EV_FF), **kwargs): """ Create an UInput device with the capabilities of one or more input devices. Arguments --------- devices : InputDevice|str Varargs of InputDevice instances or paths to input devices. filtered_types : Tuple[event type codes] Event types to exclude from the capabilities of the uinput device. **kwargs Keyword arguments to UInput constructor (i.e. name, vendor etc.). """ device_instances = [] for dev in devices: if not isinstance(dev, device.InputDevice): dev = device.InputDevice(str(dev)) device_instances.append(dev) all_capabilities = defaultdict(set) if "max_effects" not in kwargs: kwargs["max_effects"] = min([dev.ff_effects_count for dev in device_instances]) # Merge the capabilities of all devices into one dictionary. for dev in device_instances: for ev_type, ev_codes in dev.capabilities().items(): all_capabilities[ev_type].update(ev_codes) for evtype in filtered_types: if evtype in all_capabilities: del all_capabilities[evtype] return cls(events=all_capabilities, **kwargs)
[docs] def __init__( self, events=None, name="py-evdev-uinput", vendor=0x1, product=0x1, version=0x1, bustype=0x3, devnode="/dev/uinput", phys="py-evdev-uinput", input_props=None, max_effects=ecodes.FF_MAX_EFFECTS, ): """ Arguments --------- events : dict Dictionary of event types mapping to lists of event codes. The event types and codes that the uinput device will be able to inject - defaults to all key codes. name The name of the input device. vendor Vendor identifier. product Product identifier. version Version identifier. bustype Bustype identifier. phys Physical path. input_props Input properties and quirks. max_effects Maximum simultaneous force-feedback effects. Note ---- If you do not specify any events, the uinput device will be able to inject only ``KEY_*`` and ``BTN_*`` event codes. """ self.name = name #: Uinput device name. self.vendor = vendor #: Device vendor identifier. self.product = product #: Device product identifier. self.version = version #: Device version identifier. self.bustype = bustype #: Device bustype - e.g. ``BUS_USB``. self.phys = phys #: Uinput device physical path. self.devnode = devnode #: Uinput device node - e.g. ``/dev/uinput/``. if not events: events = {ecodes.EV_KEY: ecodes.keys.keys()} self._verify() #: Write-only, non-blocking file descriptor to the uinput device node. self.fd = _uinput.open(devnode) # Prepare the list of events for passing to _uinput.enable and _uinput.setup. absinfo, prepared_events = self._prepare_events(events) # Set phys name _uinput.set_phys(self.fd, phys) # Set properties input_props = input_props or [] for prop in input_props: _uinput.set_prop(self.fd, prop) for etype, code in prepared_events: _uinput.enable(self.fd, etype, code) _uinput.setup(self.fd, name, vendor, product, version, bustype, absinfo, max_effects) # Create the uinput device. _uinput.create(self.fd) self.dll = ctypes.CDLL(_uinput.__file__) self.dll._uinput_begin_upload.restype = ctypes.c_int self.dll._uinput_end_upload.restype = ctypes.c_int #: An :class:`InputDevice <evdev.device.InputDevice>` instance #: for the fake input device. ``None`` if the device cannot be #: opened for reading and writing. self.device = self._find_device(self.fd)
def _prepare_events(self, events): """Prepare events for passing to _uinput.enable and _uinput.setup""" absinfo, prepared_events = [], [] for etype, codes in events.items(): for code in codes: # Handle max, min, fuzz, flat. if isinstance(code, (tuple, list, device.AbsInfo)): # Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0). f = [code[0]] f.extend(code[1]) # Ensure the tuple is always 6 ints long, since uinput.c:uinput_create # does little in the way of checking the length. f.extend([0] * (6 - len(code[1]))) absinfo.append(f) code = code[0] prepared_events.append((etype, code)) return absinfo, prepared_events def __enter__(self): return self def __exit__(self, type, value, tb): if hasattr(self, "fd"): self.close() def __repr__(self): # TODO: v = (repr(getattr(self, i)) for i in ("name", "bustype", "vendor", "product", "version", "phys")) return "{}({})".format(self.__class__.__name__, ", ".join(v)) def __str__(self): msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\n' "event types: {}" evtypes = [i[0] for i in self.capabilities(True).keys()] msg = msg.format( self.name, ecodes.BUS[self.bustype], self.vendor, self.product, self.version, self.phys, " ".join(evtypes) ) return msg def close(self): # Close the associated InputDevice, if it was previously opened. if self.device is not None: self.device.close() # Destroy the uinput device. if self.fd > -1: _uinput.close(self.fd) self.fd = -1
[docs] def syn(self): """ Inject a ``SYN_REPORT`` event into the input subsystem. Events queued by :func:`write()` will be fired. If possible, events will be merged into an 'atomic' event. """ _uinput.write(self.fd, ecodes.EV_SYN, ecodes.SYN_REPORT, 0)
[docs] def capabilities(self, verbose=False, absinfo=True): """See :func:`capabilities <evdev.device.InputDevice.capabilities>`.""" if self.device is None: raise UInputError("input device not opened - cannot read capabilities") return self.device.capabilities(verbose, absinfo)
def begin_upload(self, effect_id): upload = ff.UInputUpload() upload.effect_id = effect_id ret = self.dll._uinput_begin_upload(self.fd, ctypes.byref(upload)) if ret: raise UInputError("Failed to begin uinput upload: " + os.strerror(ret)) return upload def end_upload(self, upload): ret = self.dll._uinput_end_upload(self.fd, ctypes.byref(upload)) if ret: raise UInputError("Failed to end uinput upload: " + os.strerror(ret)) def begin_erase(self, effect_id): erase = ff.UInputErase() erase.effect_id = effect_id ret = self.dll._uinput_begin_erase(self.fd, ctypes.byref(erase)) if ret: raise UInputError("Failed to begin uinput erase: " + os.strerror(ret)) return erase def end_erase(self, erase): ret = self.dll._uinput_end_erase(self.fd, ctypes.byref(erase)) if ret: raise UInputError("Failed to end uinput erase: " + os.strerror(ret)) def _verify(self): """ Verify that an uinput device exists and is readable and writable by the current process. """ try: m = os.stat(self.devnode)[stat.ST_MODE] if not stat.S_ISCHR(m): raise except (IndexError, OSError): msg = '"{}" does not exist or is not a character device file ' "- verify that the uinput module is loaded" raise UInputError(msg.format(self.devnode)) if not os.access(self.devnode, os.W_OK): msg = '"{}" cannot be opened for writing' raise UInputError(msg.format(self.devnode)) if len(self.name) > _uinput.maxnamelen: msg = "uinput device name must not be longer than {} characters" raise UInputError(msg.format(_uinput.maxnamelen)) def _find_device(self, fd): """ Tries to find the device node. Will delegate this task to one of several platform-specific functions. """ if platform.system() == "Linux": try: sysname = _uinput.get_sysname(fd) return self._find_device_linux(sysname) except OSError: # UI_GET_SYSNAME returned an error code. We're likely dealing with # an old kernel. Guess the device based on the filesystem. pass # If we're not running or Linux or the above method fails for any reason, # use the generic fallback method. return self._find_device_fallback() def _find_device_linux(self, sysname): """ Tries to find the device node when running on Linux. """ syspath = f"/sys/devices/virtual/input/{sysname}" # The sysfs entry for event devices should contain exactly one folder # whose name matches the format "event[0-9]+". It is then assumed that # the device node in /dev/input uses the same name. regex = re.compile("event[0-9]+") for entry in os.listdir(syspath): if regex.fullmatch(entry): device_path = f"/dev/input/{entry}" break else: # no break raise FileNotFoundError() # It is possible that there is some delay before /dev/input/event* shows # up on old systems that do not use devtmpfs, so if the device cannot be # found, wait for a short amount and then try again once. # # Furthermore, even if devtmpfs is in use, it is possible that the device # does show up immediately, but without the correct permissions that # still need to be set by udev. Wait for up to two seconds for either the # device to show up or the permissions to be set. for attempt in range(19): try: return device.InputDevice(device_path) except (FileNotFoundError, PermissionError): time.sleep(0.1) # Last attempt. If this fails, whatever exception the last attempt raises # shall be the exception that this function raises. return device.InputDevice(device_path) def _find_device_fallback(self): """ Tries to find the device node when UI_GET_SYSNAME is not available or we're running on a system sufficiently exotic that we do not know how to interpret its return value. """ #:bug: the device node might not be immediately available time.sleep(0.1) # There could also be another device with the same name already present, # make sure to select the newest one. # Strictly speaking, we cannot be certain that everything returned by list_devices() # ends at event[0-9]+: it might return something like "/dev/input/events_all". Find # the devices that have the expected structure and extract their device number. path_number_pairs = [] regex = re.compile("/dev/input/event([0-9]+)") for path in util.list_devices("/dev/input/"): regex_match = regex.fullmatch(path) if not regex_match: continue device_number = int(regex_match[1]) path_number_pairs.append((path, device_number)) # The modification date of the devnode is not reliable unfortunately, so we # are sorting by the number in the name path_number_pairs.sort(key=lambda pair: pair[1], reverse=True) for path, _ in path_number_pairs: d = device.InputDevice(path) if d.name == self.name: return d