/usr/share/pyshared/hotwire_ui/odisp.py is in hotwire 0.721-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 | # This file is part of the Hotwire Shell user interface.
#
# Copyright (C) 2007 Colin Walters <walters@verbum.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os,sys,re,Queue,logging,inspect,locale
import gtk, gobject, pango
from hotwire.util import class_is_assignable
from hotwire.command import CommandQueue, Pipeline
from hotwire_ui.render import ClassRendererMapping, DefaultObjectsRenderer
from hotwire.logutil import log_except
import hotwire_ui.widgets as hotwidgets
_logger = logging.getLogger("hotwire.ui.ODisp")
class ObjectsDisplay(gtk.VBox):
__gsignals__ = {
"object-input" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
"status-changed" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
}
def __init__(self, output_spec, context, **kwargs):
super(ObjectsDisplay, self).__init__(**kwargs)
self.__context = context
self.__box = gtk.VBox()
self.add(self.__box)
self.__scroll = gtk.ScrolledWindow()
self.__scroll.set_policy(gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
vadjust = self.__scroll.get_vadjustment()
vadjust.connect('value-changed', self.__on_scroll_value_changed)
self.__search = None
self.__inputarea = None
self.__output_type = None
self.__old_focus = None
self.__box.pack_start(self.__scroll, expand=True)
self.__display = None
self.__add_display(output_spec)
self.__doing_autoscroll = False
self.__user_scrolled = False
self.__autoscroll_id = 0
self._common_supertype = None
def __add_display(self, output_spec, force=False):
if output_spec != 'any':
self.__display = ClassRendererMapping.getInstance().lookup(output_spec, self.__context)
if not self.__display and force:
self.__display = DefaultObjectsRenderer(self.__context)
if self.__display:
self.__display.connect('status-changed', self.__on_status_changed)
self.__display_widget = self.__display.get_widget()
self.__display_widget.show_all()
self.__scroll.add(self.__display_widget)
self.__output_type = output_spec
def __on_status_changed(self, renderer):
self.emit('status-changed')
self.do_autoscroll()
def start_search(self, old_focus):
try:
self.__display.start_search()
return True
except NotImplementedError, e:
pass
if self.__search is None:
self.__search = self.__display.get_search()
if self.__search is not True:
self.__box.pack_start(self.__search, expand=False)
self.__search.connect("close", self.__on_search_close)
self.__old_focus = old_focus
if self.__search is not True:
self.__search.show_all()
self.__search.focus()
return True
return False
def __on_search_close(self, search):
if self.__search is not True:
self.__search.hide()
if self.__old_focus:
self.__old_focus.grab_focus()
def supports_input(self):
return self.__display and self.__display.supports_input()
def start_input(self, old_focus):
if self.__inputarea is None:
self.__inputarea = self.__display.get_input()
if self.__inputarea is not True:
self.__box.pack_start(self.__inputarea, expand=False)
self.__inputarea.connect("close", self.__on_inputarea_close)
self.__inputarea.connect("object-input", self.__on_object_input)
self.__old_focus = old_focus
if self.__inputarea is not True:
self.__inputarea.show_all()
self.__inputarea.focus()
def __on_object_input(self, ia, obj, *args):
_logger.debug("got interactive object input: %s", obj)
self.emit('object-input', obj)
def __on_inputarea_close(self, search):
if self.__inputarea is not True:
self.__inputarea.hide()
if self.__old_focus:
self.__old_focus.grab_focus()
def get_opt_formats(self):
if self.__display:
return self.__display.get_opt_formats()
return []
def get_status_str(self):
return self.__display and self.__display.get_status_str()
def get_objects(self):
if self.__display:
for obj in self.__display.get_objects():
yield obj
else:
raise ValueError("Can't get object snapshot, no display")
def get_selected_objects(self):
if self.__display:
for obj in self.__display.get_selected_objects():
yield obj
else:
raise ValueError("Can't get object snapshot, no display")
def get_output_type(self):
"""Return the typespec for the current pipeline. See Pipeline
for a description of typespecs."""
return self.__output_type
def __recurse_get_common_superclass(self, c1, c2):
for base in c1.__bases__:
if base == c2:
return base
tmp = self.__recurse_get_common_superclass(base, c2)
if tmp:
return tmp
def __get_common_superclass(self, c1, c2):
if c1 == c2:
return c1
if isinstance(c2, c1):
(c1, c2) = (c2, c1)
elif not isinstance(c1, c2):
return object
return self.__recurse_get_common_superclass(c1, c2)
def get_output_common_supertype(self):
"""Return the common Python supertype inspected from the current stream."""
return self._common_supertype
def append_object(self, obj, fmt=None, **kwargs):
if fmt is None:
otype = type(obj)
# This is kind of a hack.
elif fmt in ('bytearray/chunked' 'x-filedescriptor/special'):
otype = str
# If we don't have a display at this point, it means we have a dynamically-typed
# object stream. In that case, force the issue and add the default display.
if not self.__display:
self.__add_display(otype, force=True)
# Determine common supertype so we can display it.
if self._common_supertype is None:
self._common_supertype = otype
elif self._common_supertype is not object:
self._common_supertype = self.__get_common_superclass(otype, self._common_supertype)
# Actually append.
if fmt is not None:
kwargs['fmt'] = fmt
self.__display.append_obj(obj, **kwargs)
def __vadjust(self, pos, full, forceuser=False):
adjustment = self.__scroll.get_vadjustment()
if not full:
val = self.__scroll.get_vadjustment().page_increment
if not pos:
val = 0 - val;
newval = adjustment.value + val
else:
if pos:
newval = adjustment.upper
else:
newval = adjustment.lower
newval = max(min(newval, adjustment.upper-adjustment.page_size), adjustment.lower)
adjustment.value = newval
def __on_scroll_value_changed(self, vadjust):
upper = vadjust.upper - vadjust.page_size
if upper - vadjust.value < (vadjust.page_size/3):
self.__user_scrolled = False
else:
self.__user_scrolled = True
def scroll_up(self, full, forceuser=True):
self.__vadjust(False, full)
def scroll_down(self, full, forceuser=True):
self.__vadjust(True, full)
def do_copy(self):
if self.__display:
return self.__display.do_copy()
return False
def __idle_do_autoscroll(self):
vadjust = self.__scroll.get_vadjustment()
vadjust.value = max(vadjust.lower, vadjust.upper - vadjust.page_size)
self.__autoscroll_id = 0
def do_autoscroll(self):
if self.__display and self.__display.get_autoscroll():
if not self.__user_scrolled:
if self.__autoscroll_id == 0:
self.__autoscroll_id = gobject.timeout_add(150, self.__idle_do_autoscroll)
class MultiObjectsDisplay(gtk.Notebook):
__gsignals__ = {
"primary-complete" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
"changed" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
}
def __init__(self, context, pipeline):
super(MultiObjectsDisplay, self).__init__()
self.__context = context
self.__pipeline = pipeline
self.__cancelled = False
self.__default_odisp = None
self.__queues = {}
self.__ocount = 0
self.__do_autoswitch = True
self.__suppress_noyield = not not list(pipeline.get_status_commands())
self.set_show_tabs(False)
self.__inputqueue = None
intype = self.__pipeline.get_input_type()
_logger.debug("input type %s opt: %s", intype, self.__pipeline.get_input_optional())
# FIXME assume for the moment we can only input strings; also explicitly avoid allowing input for 'any'
# Long term we might consider only allowing input for SysBuiltin.
if intype not in (None, 'any') and Pipeline.streamtype_is_assignable(intype, str, False) and self.__pipeline.get_input_optional():
self.__inputqueue = CommandQueue()
self.__pipeline.set_input_queue(self.__inputqueue)
self.append_ostream(pipeline.get_output_type(), None, pipeline.get_output(), False)
for aux in pipeline.get_auxstreams():
self.append_ostream(aux.schema.otype, aux.name, aux.queue, aux.schema.merge_default)
def start_search(self, old_focus):
self.__default_odisp.start_search(old_focus)
def supports_input(self):
return self.__default_odisp and self.__default_odisp.supports_input()
def start_input(self, old_focus):
self.__default_odisp.start_input(old_focus)
def do_copy(self):
return self.__default_odisp.do_copy()
def get_opt_formats(self):
if self.__default_odisp:
return self.__default_odisp.get_opt_formats()
return []
def get_pipeline(self):
return self.__pipeline
def get_output_common_supertype(self):
if self.__default_odisp:
return self.__default_odisp.get_output_common_supertype()
return None
def make_snapshot(self, selected=False):
odisp = self.__default_odisp
if selected:
objs = self.__default_odisp.get_selected_objects()
else:
objs = self.__default_odisp.get_objects()
# snapshot it - FIXME this should really be async
objs = list(objs)
if self.__pipeline.is_singlevalue:
return objs[0]
return objs
def append_ostream(self, otype, name, queue, merged):
label = name or ''
if merged:
odisp = self.__default_odisp
elif not (otype is None):
odisp = ObjectsDisplay(otype, self.__context)
if name is None:
self.__default_odisp = odisp
odisp.connect('object-input', self.__on_object_input)
odisp.connect('status-changed', self.__on_status_change)
self.__default_odisp
self.insert_page(odisp, position=0)
self.set_tab_label_text(odisp, name or 'Default')
odisp.show_all()
elif not self.__suppress_noyield:
self.__noobjects = gtk.Label()
self.__noobjects.set_alignment(0, 0)
self.__noobjects.set_markup('<i>(Pipeline yields no objects)</i>')
self.__noobjects.show()
self.insert_page(self.__noobjects, position=0)
odisp = None
else:
odisp = None
self.__queues[queue] = (odisp, name, merged)
queue.connect(self.__idle_handle_output)
def cancel(self):
self.__cancelled = True
for queue in self.__queues.iterkeys():
queue.disconnect()
def get_ocount(self):
return self.__ocount
def get_status_str(self):
return self.__default_odisp and self.__default_odisp.get_status_str()
def get_default_output_type(self):
return self.__default_odisp and self.__default_odisp.get_output_type()
def __on_object_input(self, odisp, obj):
self.__inputqueue.put(obj)
def __on_status_change(self, odisp):
self.emit("changed")
def __idle_handle_output(self, queue):
if self.__cancelled:
return False
empty = False
changed = False
(odisp, name, merged) = self.__queues[queue]
odisp_displayed = odisp in self.get_children()
active_odisp = False
maxitems = 100
i = 0
append_kwargs = {}
if queue.opt_type:
append_kwargs['fmt'] = queue.opt_type
try:
while i < maxitems:
i += 1
item = queue.get(False)
changed = True
if item is None:
if name is None:
self.emit("primary-complete")
empty = True
queue.disconnect()
break
_logger.debug("appending item: %s", item)
if odisp:
if not odisp_displayed:
self.append_page(odisp)
odisp.show_all()
self.set_tab_label_text(odisp, name or 'Default')
self.set_show_tabs(True)
odisp_displayed = True
odisp.append_object(item, **append_kwargs)
self.__ocount += 1
if self.__do_autoswitch:
self.set_current_page(self.page_num(odisp))
self.__do_autoswitch = False
active_odisp = True
else:
_logger.warn("Unexpected item %s from queue %s", item, name)
except Queue.Empty:
pass
if empty:
del self.__queues[queue]
if active_odisp:
odisp.do_autoscroll()
if changed:
self.emit("changed")
return (not empty) and (i == maxitems)
def scroll_up(self, full=False):
self.get_nth_page(self.get_current_page()).scroll_up(full)
def scroll_down(self, full=False):
self.get_nth_page(self.get_current_page()).scroll_down(full)
|