This file is indexed.

/usr/share/pyshared/x2go/cache.py is in python-x2go 0.1.1.8-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2011 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2go is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
X2goListSessionsCache class - caching X2go session information.

"""
# tag that X2goListSessionsCache.__init__ assigns to its logger instance
__NAME__ = 'x2gocache-pylib'

# modules
import copy
import gevent

# Python X2go modules
import log
import x2go_exceptions

class X2goListSessionsCache(object):
    """\
    For non-blocking operations in client applications using Python X2go, it is
    recommended to enable the L{X2goListSessionsCache}. This can be done by calling
    the constructor of the L{X2goClient} class.

    The session list and desktop cache gets updated in regular intervals by a threaded
    L{X2goSessionGuardian} instance. For the session list and desktop list update, the
    X2go server commands C{x2golistsessions} and C{x2godesktopsessions} are called and
    the command's stdout is cached in the session list cache.

    Whenever your client application needs access to either the server's session list
    or the server's desktop list the session cache is queried instead. This assures that
    the server's session/desktop list is available without delay, even on slow internet
    connections.

    The cache itself is a dictionary that maps profile names to dictionaries with the
    two keys C{'sessions'} and C{'desktops'}. Both values start out as C{None} and get
    filled by L{update()}.

    """
    # NOTE: class-level default, kept for backward compatibility only. Every instance
    # shadows it with its own dictionary in __init__, so instances do not share state.
    x2go_listsessions_cache = {}

    def __init__(self, client_instance, logger=None, loglevel=log.loglevel_DEFAULT):
        """\
        @param client_instance: the L{X2goClient} instance that uses this L{X2goListSessionsCache}
        @type client_instance: C{instance}
        @param logger: you can pass an L{X2goLogger} object to the L{X2goListSessionsCache} constructor
        @type logger: C{instance}
        @param loglevel: if no L{X2goLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: C{int}

        """
        # current cache content, keyed by profile name
        self.x2go_listsessions_cache = {}
        # deep copy of the cache as it was right before the most recent update() call
        self.last_listsessions_cache = {}

        if logger is None:
            self.logger = log.X2goLogger(loglevel=loglevel)
        else:
            # work on a private copy, the tag assignment below must not touch the caller's logger
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        self.client_instance = client_instance

    def delete(self, profile_name):
        """\
        Remove session list from cache for a given profile. Profiles that are
        not cached are silently ignored.

        @param profile_name: name of profile to operate on
        @type profile_name: C{str}

        """
        self.x2go_listsessions_cache.pop(profile_name, None)

    def check_cache(self):
        """\
        Check if session list cache elements are still valid (i.e. if all corresponding
        session profiles are still connected). If not so, remove invalid cache entries from
        the session list cache.

        """
        # query the client once instead of once per cached profile
        connected_profiles = self.client_instance.client_connected_profiles(return_profile_names=True)

        # iterate over a snapshot of the keys: entries are removed inside the loop and
        # removing from a dictionary while iterating its key view raises a RuntimeError
        for profile_name in list(self.x2go_listsessions_cache.keys()):
            if profile_name not in connected_profiles:
                self.x2go_listsessions_cache.pop(profile_name, None)

    def update_all(self, update_sessions=True, update_desktops=False):
        """\
        Update L{X2goListSessionsCache} for all connected session profiles and
        purge cache entries of profiles that are not connected anymore.

        @param update_sessions: cache recent session lists from all connected servers
        @type update_sessions: C{bool}
        @param update_desktops: cache recent desktop lists from all connected servers
        @type update_desktops: C{bool}

        """
        for profile_name in self.client_instance.client_connected_profiles(return_profile_names=True):
            self.update(profile_name, update_sessions=update_sessions, update_desktops=update_desktops)

        self.check_cache()

    def update(self, profile_name, update_sessions=True, update_desktops=False):
        """\
        Update L{X2goListSessionsCache} (i.e. session/desktops) for session profile C{profile_name}.

        Before anything gets updated, a deep copy of the current cache content is
        stored in C{last_listsessions_cache}.

        @param profile_name: name of profile to update
        @type profile_name: C{str}
        @param update_sessions: cache recent session list from server
        @type update_sessions: C{bool}
        @param update_desktops: cache recent desktop list from server
        @type update_desktops: C{bool}
        @raise X2goControlSessionException: passed through from the control session, the
            cache entry of C{profile_name} has been removed in that case

        """
        self.last_listsessions_cache = copy.deepcopy(self.x2go_listsessions_cache)
        control_session = self.client_instance.client_control_session_of_profile_name(profile_name)

        # make sure the profile has a cache entry the _update_* methods can write to
        if profile_name not in self.x2go_listsessions_cache:
            self.x2go_listsessions_cache[profile_name] = {'sessions': None, 'desktops': None}

        if update_sessions:
            self._update_sessions(profile_name, control_session)
        if update_desktops:
            self._update_desktops(profile_name, control_session)

    def _update_desktops(self, profile_name, control_session):
        """\
        Update desktop list of L{X2goListSessionsCache} for session profile C{profile_name}.

        @param profile_name: name of profile to update
        @type profile_name: C{str}
        @param control_session: control session object whose C{list_desktops()} method
            provides the desktop list
        @type control_session: C{instance}
        @raise X2goControlSessionException: re-raised after the cache entry of
            C{profile_name} has been removed

        """
        try:
            self.x2go_listsessions_cache[profile_name]['desktops'] = control_session.list_desktops()
        except x2go_exceptions.X2goControlSessionException:
            # drop the whole profile entry; pop() cannot raise here, so the bare raise
            # below re-raises the control session exception with its original traceback
            self.x2go_listsessions_cache.pop(profile_name, None)
            raise

    def _update_sessions(self, profile_name, control_session):
        """\
        Update session list of L{X2goListSessionsCache} for session profile C{profile_name}.

        @param profile_name: name of profile to update
        @type profile_name: C{str}
        @param control_session: control session object whose C{list_sessions()} method
            provides the session list
        @type control_session: C{instance}
        @raise X2goControlSessionException: re-raised after the cache entry of
            C{profile_name} has been removed

        """
        try:
            self.x2go_listsessions_cache[profile_name]['sessions'] = control_session.list_sessions()
        except x2go_exceptions.X2goControlSessionException:
            # drop the whole profile entry; pop() cannot raise here, so the bare raise
            # below re-raises the control session exception with its original traceback
            self.x2go_listsessions_cache.pop(profile_name, None)
            raise

    def _get_cached_list(self, session_uuid, cache_type):
        """\
        Look up one element (C{'sessions'} or C{'desktops'}) of the cache entry that
        belongs to the session profile of the given session.

        @param session_uuid: unique identifier of session to query cache for
        @type session_uuid: C{str}
        @param cache_type: key of the cache entry element to return
        @type cache_type: C{str}
        @return: the cached element or C{None} if the session's profile is not cached
        @rtype: C{obj}

        """
        profile_name = self.client_instance.get_session_profile_name(session_uuid)
        try:
            profile_cache = self.x2go_listsessions_cache[profile_name]
        except KeyError:
            return None
        return profile_cache[cache_type]

    def list_sessions(self, session_uuid):
        """\
        Retrieve a session list from the current cache content of L{X2goListSessionsCache}
        for a given L{X2goSession} instance (specified by its unique session UUID).

        @param session_uuid: unique identifier of session to query cache for
        @type session_uuid: C{str}
        @return: a data object containing available session information, C{None} if the
            session's profile is not cached or no session list has been cached for it yet
        @rtype: C{X2goServerSessionList*} instance

        """
        return self._get_cached_list(session_uuid, 'sessions')

    def list_desktops(self, session_uuid):
        """\
        Retrieve a list of available desktop sessions from the current cache content of
        L{X2goListSessionsCache} for a given L{X2goSession} instance (specified by its
        unique session UUID).

        @param session_uuid: unique identifier of session to query cache for
        @type session_uuid: C{str}
        @return: a list of strings representing X2go desktop sessions available for sharing,
            C{None} if the session's profile is not cached or no desktop list has been
            cached for it yet
        @rtype: C{list}

        """
        return self._get_cached_list(session_uuid, 'desktops')

    def is_cached(self, profile_name=None, session_uuid=None, cache_type=None):
        """\
        Check if session list is cached.

        The profile to check is given by C{profile_name}. If no profile name is given,
        it is derived from C{session_uuid}.

        @param profile_name: name of profile to check
        @type profile_name: C{str}
        @param session_uuid: unique identifier of session to query cache for
        @type session_uuid: C{str}
        @param cache_type: if given, additionally check that the profile's cache entry has
            a key of this name. Note that L{update()} creates the keys C{'sessions'} and
            C{'desktops'} with a value of C{None}, so this checks for the existence of the
            key only, not for cached content.
        @type cache_type: C{str}
        @return: C{True} if the profile (and, if requested, the cache type) is cached
        @rtype: C{bool}

        """
        if profile_name is None and session_uuid:
            profile_name = self.client_instance.get_session_profile_name(session_uuid)

        if profile_name not in self.x2go_listsessions_cache:
            return False
        if cache_type is None:
            return True
        return cache_type in self.x2go_listsessions_cache[profile_name]