/usr/lib/python3/dist-packages/Onboard/AtspiStateTracker.py is in onboard 1.0.0-0ubuntu4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 | # -*- coding: utf-8 -*-
""" Tracking of the focused and active accessible via AT-SPI events """
from __future__ import division, print_function, unicode_literals
import logging
_logger = logging.getLogger(__name__)
try:
from gi.repository import Atspi
except ImportError as e:
_logger.warning("Atspi typelib missing, auto-show unavailable")
from Onboard.utils import Rect, EventSource, Process, unicode_str
class AsyncEvent:
    """
    Simple attribute container for the payload of one event.

    Decouples AT-SPI events from D-Bus calls to reduce the risk for
    deadlocks: the interesting values of an event are copied into an
    instance of this class, one attribute per keyword argument.
    """

    def __init__(self, **kwargs):
        # Expose every keyword as an instance attribute ...
        vars(self).update(kwargs)
        # ... and remember the mapping itself, __repr__ lists exactly these.
        self._kwargs = kwargs

    def __repr__(self):
        """ Return e.g. 'AsyncEvent(caret=3)', keywords in passing order. """
        fields = ["{}={!r}".format(name, value)
                  for name, value in self._kwargs.items()]
        return "{}({})".format(type(self).__name__, ", ".join(fields))
class AtspiStateTracker(EventSource):
    """
    Keeps track of the currently active accessible by listening
    to AT-SPI focus events.

    The class is a singleton: every instantiation returns the same
    object, which is initialized once by construct().

    Events offered to listeners are "text-entry-activated",
    "text-changed", "text-caret-moved" and "key-pressed". The AT-SPI
    listeners behind each group are only registered while
    has_listeners() reports a listener for that group, see
    _update_listeners().
    """

    # Public events, grouped by the AT-SPI listeners they depend on.
    _focus_event_names = ("text-entry-activated",)
    _text_event_names = ("text-changed", "text-caret-moved")
    _key_stroke_event_names = ("key-pressed",)

    # Private events; each one is handled by the method named
    # "_on_" + event name with dashes replaced by underscores.
    _async_event_names = ("async-focus-changed",
                          "async-text-changed",
                          "async-text-caret-moved")

    _event_names = _async_event_names + \
                   _focus_event_names + \
                   _text_event_names + \
                   _key_stroke_event_names

    # Current registration state of the three AT-SPI listener groups.
    _focus_listeners_registered = False
    _keystroke_listeners_registered = False
    _text_listeners_registered = False

    _keystroke_listener = None

    # asynchronously accessible members
    _focused_accessible = None      # any currently focused accessible
    _active_accessible = None       # editable focused accessible
    _last_active_accessible = None
    _state = None                   # cache of various accessible properties

    def __new__(cls, *args, **kwargs):
        """
        Singleton magic.

        Creates the single instance on first use and runs construct()
        on it; every later call returns that same instance.

        The arguments are accepted but deliberately not forwarded:
        object.__new__() raises TypeError for extra arguments when
        __new__ is overridden.
        """
        if not hasattr(cls, "self"):
            cls.self = object.__new__(cls)
            cls.self.construct()
        return cls.self

    def __init__(self):
        """
        Called multiple times, don't use this.
        """
        pass

    def construct(self):
        """
        Singleton constructor, runs only once.
        """
        EventSource.__init__(self, self._event_names)

        self._state = {}
        self._frozen = False

    def cleanup(self):
        """ Clean up the event source and deregister all AT-SPI listeners. """
        EventSource.cleanup(self)
        self._register_atspi_listeners(False)

    def connect(self, event_name, callback):
        """ Connect callback to event_name, then re-sync AT-SPI listeners. """
        EventSource.connect(self, event_name, callback)
        self._update_listeners()

    def disconnect(self, event_name, callback):
        """ Disconnect callback from event_name, then re-sync AT-SPI listeners. """
        EventSource.disconnect(self, event_name, callback)
        self._update_listeners()

    def _update_listeners(self):
        """
        Register or deregister each AT-SPI listener group depending on
        whether has_listeners() reports listeners for its events.
        """
        register = self.has_listeners(self._focus_event_names)
        self._register_atspi_focus_listeners(register)

        register = self.has_listeners(self._text_event_names)
        self._register_atspi_text_listeners(register)

        register = self.has_listeners(self._key_stroke_event_names)
        self._register_atspi_keystroke_listeners(register)

    def _register_atspi_listeners(self, register):
        """ Register (True) or deregister (False) all listener groups. """
        self._register_atspi_focus_listeners(register)
        self._register_atspi_text_listeners(register)
        self._register_atspi_keystroke_listeners(register)

    def _register_atspi_focus_listeners(self, register):
        """
        Register or deregister the AT-SPI focus listeners together with
        the handlers of the private asynchronous events.
        Does nothing if Atspi could not be imported or if the requested
        state is already in effect.
        """
        if "Atspi" not in globals():
            return

        if self._focus_listeners_registered != register:

            if register:
                self.atspi_connect("_listener_focus",
                                   "focus",
                                   self._on_atspi_global_focus)
                self.atspi_connect("_listener_object_focus",
                                   "object:state-changed:focused",
                                   self._on_atspi_object_focus)

                # private asynchronous events
                for name in self._async_event_names:
                    handler = "_on_" + name.replace("-", "_")
                    EventSource.connect(self, name, getattr(self, handler))
            else:
                self.atspi_disconnect("_listener_focus",
                                      "focus")
                self.atspi_disconnect("_listener_object_focus",
                                      "object:state-changed:focused")

                for name in self._async_event_names:
                    handler = "_on_" + name.replace("-", "_")
                    EventSource.disconnect(self, name, getattr(self, handler))

            self._focus_listeners_registered = register

    def _register_atspi_text_listeners(self, register):
        """
        Register or deregister the AT-SPI text-changed and
        text-caret-moved listeners.
        Does nothing if Atspi could not be imported or if the requested
        state is already in effect.
        """
        if "Atspi" not in globals():
            return

        if self._text_listeners_registered != register:

            if register:
                self.atspi_connect("_listener_text_changed",
                                   "object:text-changed",
                                   self._on_atspi_text_changed)
                self.atspi_connect("_listener_text_caret_moved",
                                   "object:text-caret-moved",
                                   self._on_atspi_text_caret_moved)
            else:
                self.atspi_disconnect("_listener_text_changed",
                                      "object:text-changed")
                self.atspi_disconnect("_listener_text_caret_moved",
                                      "object:text-caret-moved")

            self._text_listeners_registered = register

    def _register_atspi_keystroke_listeners(self, register):
        """
        Register or deregister the AT-SPI key-press listener.
        Does nothing if Atspi could not be imported or if the requested
        state is already in effect.
        """
        if "Atspi" not in globals():
            return

        if self._keystroke_listeners_registered != register:

            if register:
                # The device listener is created once and then reused.
                if not self._keystroke_listener:
                    self._keystroke_listener = \
                        Atspi.DeviceListener.new(self._on_atspi_keystroke,
                                                 None)

                # One registration per modifier mask 0..15.
                for modifier_mask in range(16):
                    Atspi.register_keystroke_listener(
                        self._keystroke_listener,
                        None,          # key set, None=all
                        modifier_mask,
                        Atspi.KeyEventType.PRESSED,
                        Atspi.KeyListenerSyncType.SYNCHRONOUS)
            else:
                # Apparently any single deregister call will turn off
                # all the other registered modifier_masks too. Since
                # deregistering takes extremely long (~2.5s for 16 calls)
                # seize the opportunity and just pick a single arbitrary
                # mask (Quantal).
                for modifier_mask in [2]:
                    Atspi.deregister_keystroke_listener(
                        self._keystroke_listener,
                        None,          # key set, None=all
                        modifier_mask,
                        Atspi.KeyEventType.PRESSED)

            self._keystroke_listeners_registered = register

    def atspi_connect(self, attribute, event, callback):
        """
        Start listening to an AT-SPI event.
        Creates a new event listener for each event, since this seems
        to be the only way to allow reliable deregistering of events.

        The listener is stored in the instance attribute named by
        'attribute' and reused on later calls with the same name.
        """
        listener = getattr(self, attribute, None)
        if listener is None:
            listener = Atspi.EventListener.new(callback, None)
            setattr(self, attribute, listener)
        listener.register(event)

    def atspi_disconnect(self, attribute, event):
        """
        Stop listening to AT-SPI event.

        Expects the listener stored by atspi_connect() under 'attribute';
        raises AttributeError if there is none.
        """
        listener = getattr(self, attribute)
        listener.deregister(event)

    def freeze(self):
        """
        Freeze AT-SPI message processing, e.g. while displaying
        a dialog or popup menu.
        """
        self._register_atspi_listeners(False)
        self._frozen = True

    def thaw(self):
        """
        Resume AT-SPI message processing.
        """
        self._update_listeners()
        self._frozen = False

    def emit_async(self, event_name, *args, **kwargs):
        """ Like EventSource.emit_async(), but drops events while frozen. """
        if not self._frozen:
            EventSource.emit_async(self, event_name, *args, **kwargs)

    ########## synchronous handlers ##########

    def _on_atspi_global_focus(self, event, user_data):
        """ Handler of the AT-SPI "focus" event; always a focus gain. """
        self._on_atspi_focus(event, True)

    def _on_atspi_object_focus(self, event, user_data):
        """ Handler of the AT-SPI "object:state-changed:focused" event. """
        self._on_atspi_focus(event)

    def _on_atspi_focus(self, event, focus_received = False):
        """
        Repackage a focus event as AsyncEvent(accessible, focused) and
        queue it as "async-focus-changed".
        """
        # received focus?
        focused = bool(focus_received) or bool(event.detail1)

        ae = AsyncEvent(accessible = event.source,
                        focused = focused)
        self.emit_async("async-focus-changed", ae)

    def _on_atspi_text_changed(self, event, user_data):
        """
        Repackage a text-changed event as
        AsyncEvent(accessible, type, pos, length) and queue it as
        "async-text-changed". Always returns False.
        """
        ae = AsyncEvent(accessible = event.source,
                        type = event.type,
                        pos = event.detail1,
                        length = event.detail2)
        self.emit_async("async-text-changed", ae)
        return False

    def _on_atspi_text_caret_moved(self, event, user_data):
        """
        Repackage a caret-moved event as AsyncEvent(accessible, caret)
        and queue it as "async-text-caret-moved". Always returns False.
        """
        ae = AsyncEvent(accessible = event.source,
                        caret = event.detail1)
        self.emit_async("async-text-caret-moved", ae)
        return False

    def _on_atspi_keystroke(self, event, user_data):
        """
        Queue key-press events as "key-pressed" with
        AsyncEvent(hw_code, modifiers). Always returns False.
        """
        if event.type == Atspi.EventType.KEY_PRESSED_EVENT:
            _logger.debug("key-stroke {} {} {} {}" \
                          .format(event.modifiers,
                                  event.hw_code, event.id, event.is_text))
            # NOTE: event.id is not used; it apparently isn't an
            # XK_ keysym.
            ae = AsyncEvent(hw_code = event.hw_code,
                            modifiers = event.modifiers)
            self.emit_async("key-pressed", ae)

        return False   # don't consume event

    ########## asynchronous handlers ##########

    def _on_async_focus_changed(self, event):
        """
        Update the focused and the active accessible after a focus change.

        event carries 'accessible' and 'focused', see _on_atspi_focus().
        The cached state is reset on every call.
        """
        accessible = event.accessible
        focused = event.focused
        self._state = {}

        # Don't access the accessible while frozen. This leads to deadlocks
        # while displaying Onboard's own dialogs/popup menu's.
        if self._frozen:
            return

        self._log_accessible(accessible, focused)

        if accessible is not None:
            try:
                self._state = \
                    self._read_initial_accessible_state(accessible)
            except Exception as ex:  # Private exception gi._glib.GError
                                     # when gedit became unresponsive.
                _logger.info("_on_focus_changed(): "
                             "invalid accessible, failed to read state: " \
                             + unicode_str(ex))

        editable = self._is_accessible_editable(self._state)
        activate = focused and editable

        if focused:
            self._focused_accessible = accessible
        elif self._focused_accessible == accessible:
            self._focused_accessible = None
        else:
            # Focus loss of an accessible that wasn't the focused one:
            # leave the active accessible alone.
            activate = None

        if activate is not None:
            if activate:
                active_accessible = self._focused_accessible
            else:
                active_accessible = None

            self._set_active_accessible(active_accessible)

    def _set_active_accessible(self, accessible):
        """
        Make accessible (or None) the active accessible and emit
        "text-entry-activated", unless both the new and the previous
        active accessible are None.
        """
        self._active_accessible = accessible

        if self._active_accessible is not None or \
           self._last_active_accessible is not None:

            if accessible is not None:
                try:
                    self._state.update(
                        self._read_remaining_accessible_state(accessible))
                except Exception as ex:  # Private exception gi._glib.GError
                                         # when gedit became unresponsive.
                    _logger.info("_set_active_accessible(): "
                                 "invalid accessible, failed to read remaining state: " \
                                 + unicode_str(ex))

            # notify listeners
            self.emit("text-entry-activated",
                      self._active_accessible)

            self._last_active_accessible = self._active_accessible

    def _on_async_text_changed(self, event):
        """
        Emit "text-changed" for insertions into and deletions from the
        active accessible. Sets event.insert to True for insertions
        and to False for deletions before emitting.
        """
        if event.accessible is self._active_accessible:
            event_type = event.type
            insert = event_type.endswith(("insert", "insert:system"))
            delete = event_type.endswith(("delete", "delete:system"))
            if insert or delete:
                event.insert = insert
                self.emit("text-changed", event)
            else:
                _logger.warning("_on_async_text_changed: "
                                "unknown event type '{}'" \
                                .format(event.type))

    def _on_async_text_caret_moved(self, event):
        """ Emit "text-caret-moved" for caret moves of the active accessible. """
        if event.accessible is self._active_accessible:
            self.emit("text-caret-moved", event)

    def get_state(self):
        """ All available state of the active accessible """
        return self._state

    def get_role(self):
        """ Role of the active accessible """
        return self._state.get("role")

    def get_state_set(self):
        """ State set of the active accessible """
        return self._state.get("state-set")

    def is_single_line(self):
        """
        Is active accessible a single line text entry?
        Returns False if no state set is cached.
        """
        state_set = self.get_state_set()
        return bool(state_set and
                    state_set.contains(Atspi.StateType.SINGLE_LINE))

    def get_extents(self):
        """ Screen rect of the active accessible """
        return self._state.get("extents", Rect())

    @staticmethod
    def get_accessible_extents(accessible):
        """
        Screen rect of the given accessible, no caching.
        Returns an empty Rect() if the extents cannot be read.
        """
        try:
            ext = accessible.get_extents(Atspi.CoordType.SCREEN)
        except Exception as ex:  # private exception gi._glib.GError when
                # right clicking onboards unity2d launcher (Precise)
            _logger.info("Invalid accessible,"
                         " failed to get extents: " + unicode_str(ex))
            return Rect()

        return Rect(ext.x, ext.y, ext.width, ext.height)

    @staticmethod
    def get_accessible_text(accessible, begin, end):
        """
        Text of the given accessible, no caching.
        Returns None if the text cannot be read.
        """
        try:
            text = Atspi.Text.get_text(accessible, begin, end)
        except Exception as ex:  # private exception
                                 # gi._glib.GError: timeout from dbind
                                 # with web search in firefox.
            _logger.info("Invalid accessible,"
                         " failed to get text: " + unicode_str(ex))
            return None

        return text

    def _is_accessible_editable(self, acc_state):
        """
        Is this an accessible onboard should be shown for?

        acc_state is a dict as returned by
        _read_initial_accessible_state(). Returns False when it has no
        "state-set". Terminals always qualify, all other listed roles
        only if their state set contains EDITABLE.
        """
        state_set = acc_state.get("state-set")
        if state_set is None:
            return False

        role = acc_state.get("role")
        if role not in (Atspi.Role.TEXT,
                        Atspi.Role.TERMINAL,
                        Atspi.Role.DATE_EDITOR,
                        Atspi.Role.PASSWORD_TEXT,
                        Atspi.Role.EDITBAR,
                        Atspi.Role.ENTRY,
                        Atspi.Role.DOCUMENT_TEXT,
                        Atspi.Role.DOCUMENT_FRAME,
                        Atspi.Role.DOCUMENT_EMAIL,
                        Atspi.Role.SPIN_BUTTON,
                        Atspi.Role.COMBO_BOX,
                        Atspi.Role.PARAGRAPH,       # LibreOffice Writer
                        Atspi.Role.HEADER,
                        Atspi.Role.FOOTER,
                       ):
            return False

        return bool(role == Atspi.Role.TERMINAL or
                    state_set.contains(Atspi.StateType.EDITABLE))

    def _read_initial_accessible_state(self, accessible):
        """
        Read just enough to find out if we are interested in this accessible.
        Returns a dict with the keys "role" and "state-set".
        """
        state = {}
        state["role"] = accessible.get_role()
        state["state-set"] = accessible.get_state_set()
        return state

    def _read_remaining_accessible_state(self, accessible):
        """
        Read more attributes and find out as much as we
        can about the accessibles purpose.

        Returns a dict with the keys "attributes", "interfaces" and
        "extents"; with debug logging enabled also "id", "name",
        "process-id" and, where available, "process-name", "app-name"
        and "app-description".
        """
        state = {}
        state["attributes"] = accessible.get_attributes()
        state["interfaces"] = accessible.get_interfaces()

        ext = accessible.get_extents(Atspi.CoordType.SCREEN)
        state["extents"] = Rect(ext.x, ext.y, ext.width, ext.height)

        # These are currently used only in debug output
        if _logger.isEnabledFor(logging.DEBUG):
            state["id"] = accessible.get_id()
            state["name"] = accessible.get_name()

            pid = accessible.get_process_id()
            state["process-id"] = pid
            if pid != -1:
                state["process-name"] = Process.get_process_name(pid)

            app = accessible.get_application()
            if app:
                state["app-name"] = app.get_name()
                state["app-description"] = app.get_description()

        return state

    def _log_accessible(self, accessible, focused):
        """
        Write a debug log line describing a focus event.
        Only queries the accessible when debug logging is enabled.
        """
        if not _logger.isEnabledFor(logging.DEBUG):
            return

        msg = "AT-SPI focus event: focused={}, ".format(focused)
        if not accessible:
            msg += "accessible={}".format(accessible)
        else:
            name = "unknown"
            role = None
            role_name = None
            editable = None
            states = None
            extents = None
            try:
                name = accessible.get_name()
                role = accessible.get_role()
                role_name = accessible.get_role_name()
                state_set = accessible.get_state_set()
                # Without a state set, still try to log the extents.
                if state_set:
                    states = state_set.states
                    editable = state_set.contains(Atspi.StateType.EDITABLE)
                ext = accessible.get_extents(Atspi.CoordType.SCREEN)
                extents = Rect(ext.x, ext.y, ext.width, ext.height)
            except Exception:  # private exception gi._glib.GError when
                               # gedit became unresponsive
                # Best effort: log whatever could be read until here.
                pass

            msg += "name={name}, role={role}({role_name}), " \
                   "editable={editable}, states={states}, " \
                   "extents={extents}]" \
                   .format(name = name,
                           role = role,
                           role_name = role_name,
                           editable = editable,
                           states = states,
                           extents = extents)
        _logger.debug(msg)
class AtspiStateType:
    """
    Names of AT-SPI states and a helper to list the names of the
    states contained in a state set.
    """

    # Each name is looked up as an attribute of Atspi.StateType
    # by to_strings().
    states = [
        'ACTIVE', 'ANIMATED', 'ARMED',
        'BUSY',
        'CHECKED', 'COLLAPSED',
        'DEFUNCT',
        'EDITABLE', 'ENABLED', 'EXPANDABLE', 'EXPANDED',
        'FOCUSABLE', 'FOCUSED',
        'HAS_TOOLTIP', 'HORIZONTAL',
        'ICONIFIED', 'INDETERMINATE', 'INVALID', 'INVALID_ENTRY',
        'IS_DEFAULT',
        'LAST_DEFINED',
        'MANAGES_DESCENDANTS', 'MODAL', 'MULTISELECTABLE', 'MULTI_LINE',
        'OPAQUE',
        'PRESSED',
        'REQUIRED', 'RESIZABLE',
        'SELECTABLE', 'SELECTABLE_TEXT', 'SELECTED', 'SENSITIVE',
        'SHOWING', 'SINGLE_LINE', 'STALE', 'SUPPORTS_AUTOCOMPLETION',
        'TRANSIENT', 'TRUNCATED',
        'VERTICAL', 'VISIBLE', 'VISITED',
    ]

    @staticmethod
    def to_strings(state_set):
        """
        Return the entries of 'states' whose Atspi.StateType value is
        contained in state_set, in the order of 'states'.
        """
        return [name for name in AtspiStateType.states
                if state_set.contains(getattr(Atspi.StateType, name))]
|