/usr/lib/python3/dist-packages/dcos/emitting.py is in python3-dcos 0.2.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | from __future__ import print_function
import abc
import collections
import json
import os
import pydoc
import re
import sys
import pager
import pygments
import six
from dcos import constants, errors, util
from pygments.formatters import Terminal256Formatter
from pygments.lexers import JsonLexer
logger = util.get_logger(__name__)
class Emitter(object):
"""Abstract class for emitting events."""
@abc.abstractmethod
def publish(self, event):
"""Publishes an event.
:param event: event to publish
:type event: any
"""
raise NotImplementedError
class FlatEmitter(Emitter):
"""Simple emitter that sends all publish events to the provided handler.
If no handler is provider then use :py:const:`DEFAULT_HANDLER`.
:param handler: event handler to call when publish is called
:type handler: func(event) where event is defined in
:py:func:`FlatEmitter.publish`
"""
def __init__(self, handler=None):
if handler is None:
self._handler = DEFAULT_HANDLER
else:
self._handler = handler
def publish(self, event):
"""Publishes an event.
:param event: event to publish
:type event: any
"""
self._handler(event)
def print_handler(event):
"""Default handler for printing event to stdout.
:param event: event to emit to stdout
:type event: str, dict, list, or dcos.errors.Error
"""
pager_command = os.environ.get(constants.DCOS_PAGER_COMMAND_ENV)
if event is None:
# Do nothing
pass
elif isinstance(event, six.string_types):
_page(event, pager_command)
elif isinstance(event, errors.Error):
print(event.error(), file=sys.stderr)
sys.stderr.flush()
elif (isinstance(event, collections.Mapping) or
isinstance(event, collections.Sequence) or isinstance(event, bool) or
isinstance(event, six.integer_types) or isinstance(event, float)):
# These are all valid JSON types let's treat them different
processed_json = _process_json(event, pager_command)
_page(processed_json, pager_command)
elif isinstance(event, errors.DCOSException):
print(event, file=sys.stderr)
else:
logger.debug('Printing unknown type: %s, %r.', type(event), event)
_page(event, pager_command)
def publish_table(emitter, objs, table_fn, json_):
"""Publishes a json representation of `objs` if `json_` is True,
otherwise, publishes a table representation.
:param emitter: emitter to use for publishing
:type emitter: Emitter
:param objs: objects to print
:type objs: [object]
:param table_fn: function used to generate a PrettyTable from `objs`
:type table_fn: objs -> PrettyTable
:param json_: whether or not to publish a json representation
:type json_: bool
:rtype: None
"""
if json_:
emitter.publish(objs)
else:
table = table_fn(objs)
output = str(table)
if output:
emitter.publish(output)
def _process_json(event, pager_command):
"""Conditionally highlights the supplied JSON value.
:param event: event to emit to stdout
:type event: str, dict, list, or dcos.errors.Error
:returns: String representation of the supplied JSON value,
possibly syntax-highlighted.
:rtype: str
"""
json_output = json.dumps(event, sort_keys=True, indent=2)
# Strip trailing whitespace
json_output = re.sub(r'\s+$', '', json_output, 0, re.M)
force_colors = False # TODO(CD): Introduce a --colors flag
if not sys.stdout.isatty():
if force_colors:
return _highlight_json(json_output)
else:
return json_output
supports_colors = not util.is_windows_platform()
pager_is_set = pager_command is not None
should_highlight = force_colors or supports_colors and not pager_is_set
if should_highlight:
json_output = _highlight_json(json_output)
return json_output
def _page(output, pager_command=None):
"""Conditionally pipes the supplied output through a pager.
:param output:
:type output: object
:param pager_command:
:type pager_command: str
"""
output = str(output)
if pager_command is None:
pager_command = 'less -R'
if not sys.stdout.isatty() or util.is_windows_platform():
print(output)
return
num_lines = output.count('\n')
exceeds_tty_height = pager.getheight() - 1 < num_lines
if exceeds_tty_height:
pydoc.pipepager(output, cmd=pager_command)
else:
print(output)
def _highlight_json(json_value):
"""
:param json_value: JSON value to syntax-highlight
:type json_value: dict, list, number, string, boolean, or None
:returns: A string representation of the supplied JSON value,
highlighted for a terminal that supports ANSI colors.
:rtype: str
"""
return pygments.highlight(
json_value, JsonLexer(), Terminal256Formatter()).strip()
DEFAULT_HANDLER = print_handler
"""The default handler for an emitter: :py:func:`print_handler`."""
|