This file is indexed.

/usr/lib/python2.7/dist-packages/keysign/avahidiscovery.py is in gnome-keysign 0.9-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python
#    Copyright 2016 Tobias Mueller <muelli@cryptobitch.de>
#
#    This file is part of GNOME Keysign.
#
#    GNOME Keysign is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    GNOME Keysign is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with GNOME Keysign.  If not, see <http://www.gnu.org/licenses/>.

import logging
import os
import sys

from requests.exceptions import ConnectionError

from gi.repository import GObject, GLib


if  __name__ == "__main__" and __package__ is None:
    logging.getLogger().error("You seem to be trying to execute " +
                              "this script directly which is discouraged. " +
                              "Try python -m instead.")
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    os.sys.path.insert(0, parent_dir)
    os.sys.path.insert(0, os.path.join(parent_dir, 'monkeysign'))
    import keysign
    #mod = __import__('keysign')
    #sys.modules["keysign"] = mod
    __package__ = str('keysign')


from .util import strip_fingerprint, download_key_http, parse_barcode

try:
    from .gpgmh import fingerprint_from_keydata
except ImportError:
    # FIXME: Remove this conditional
    from .gpgmh import fingerprint_for_key as fingerprint_from_keydata

from .network.AvahiBrowser import AvahiBrowser
from .util import mac_verify

log = logging.getLogger(__name__)

class AvahiKeysignDiscovery(GObject.GObject):
    "A client discovery using Avahi"

    __gsignals__ = {
        # Gets emitted whenever a new server has been found or has been removed.
        # Is also emitted shortly after an object has been created.
        str("list-changed"): (GObject.SIGNAL_RUN_LAST, None, (int,)),
    }

    def __init__(self, *args, **kwargs):
        super(AvahiKeysignDiscovery, self).__init__(*args, **kwargs)
        self.log = logging.getLogger(__name__)
        # We should probably try to put this constant in a more central place
        avahi_service_type = '_gnome-keysign._tcp'
        self.avahi_browser = AvahiBrowser(service=avahi_service_type)
        self.avahi_browser.connect('new_service', self.on_new_service)
        self.avahi_browser.connect('remove_service', self.on_remove_service)
        self.discovered_services = []
        # It seems we cannot emit directly...
        GLib.idle_add(lambda: self.emit("list-changed",
            len(self.discovered_services)))

    def on_new_service(self, browser, name, address, port, txt_dict):
        published_fpr = txt_dict.get('fingerprint', None)
        self.log.info("discovered something: %s %s:%i:%s",
                      name, address, port, published_fpr)
        if not address.startswith('fe80::'):
            # We intend to ignore IPv6 link local addresses, because it seems
            # that you cannot just connect to that address without also
            # knowing which NIC the address belongs to.
            # http://serverfault.com/a/794967
            # FIXME: Use something more sane like attr.s instead of the tuple
            self.discovered_services += ((name, address, port, published_fpr), )
            self.emit("list-changed", len(self.discovered_services))

    def on_remove_service(self, browser, service_type, name):
        '''Handler for the on_remove signal from AvahiBrowser

        Removes a service from the internal list by calling
        remove_discovered_service.
        '''
        self.log.info("Received a remove signal, let's check; %s:%s",
                      service_type, name)
        self.remove_discovered_service(name)

    def remove_discovered_service(self, name):
        '''Removes server-side clients from discovered_services list
        when the server name with fpr is a match.'''
        for client in self.discovered_services:
            if client[0] == name:
                self.discovered_services.remove(client)
                self.emit("list-changed", len(self.discovered_services))
        self.log.info("Clients currently in list '%s'",
                      self.discovered_services)

    def find_key(self, userdata):
        "Returns the key if it thinks it found one..."
        self.log.info("Trying to find key with %r", userdata)
        parsed = parse_barcode(userdata)
        cleaned = strip_fingerprint(parsed["fingerprint"])
        downloaded_key = None
        # FIXME: Replace with attr.ib
        for (name, address, port, fpr) in self.discovered_services:
            if cleaned == fpr:
                # This is blocking :-/
                try:
                    downloaded_key = download_key_http(address, port)
                    if fingerprint_from_keydata(downloaded_key) != cleaned:
                        continue
                except ConnectionError:
                    self.log.exception("Error downloading from %r:%r",
                                  address, port)
        return downloaded_key

class AvahiKeysignDiscoveryWithMac(AvahiKeysignDiscovery):
    def find_key(self, userdata):
        "Returns the key if it thinks it found one which also matched the MAC"
        key = super(AvahiKeysignDiscoveryWithMac, self).find_key(userdata)
        if key:
            # For now, we cannot assume that a MAC exists, simply because
            # currently the MAC is only transferred via the barcode.
            # The user, however, might as well enter the fingerprint
            # manually.  Unless we stop allowing that, we won't have a MAC.
            mac = parse_barcode(userdata).get("MAC", [None])[0]
            if mac is None:
                # This is the ugly shortcut which exists for legacy reasons
                verified_key = key
            else:
                mac_key = fingerprint_from_keydata(key)
                verified = mac_verify(mac_key.encode('ascii'), key, mac)
                if verified:
                    verified_key = key
                else:
                    self.log.info("MAC validation failed: %r", verified)
                    verified_key = None
        else:
            verified_key = None

        return verified_key

def main(args):
    log = logging.getLogger(__name__)
    log.debug('Running main with args: %s', args)
    if not args:
        raise ValueError("You must provide an argument to identify the key")

    loop = GObject.MainLoop()

    arg = args[0]
    # FIXME: Enable parameter
    timeout = 5
    GObject.timeout_add_seconds(timeout, lambda: loop.quit())

    discover = AvahiKeysignDiscovery()
    # We quickly attach the found to the object to maintain state
    discover.found_key = None
    def find_key():
        keydata = discover.find_key(arg)
        if keydata:
            log.info("Found %d key bytes", len(keydata))
            discover.found_key = keydata
            print (keydata)
            loop.quit()
        return not keydata

    discover.avahi_browser.connect('new_service', lambda *args: find_key())
    # Instead of using this implementation detail for getting the notification,
    # it would be possible to repeatedly call find_key.
    # GObject.timeout_add(25, lambda: find_key() and False)
    # GObject.timeout_add(500, find_key)
    loop.run()
    if not discover.found_key:
        log.error("No Key found for %r!!1", arg)

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
            format='%(name)s (%(levelname)s): %(message)s')
    sys.exit(main(sys.argv[1:]))