/usr/lib/python2.7/dist-packages/xmpp/session.py is in python-xmpp 0.4.1-cvs20080505.4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 | ##
## XMPP server
##
## Copyright (C) 2004 Alexey "Snake" Nezhdanov
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
__version__="$Id"
"""
When your handler is called it is getting the session instance as the first argument.
This is the difference from xmpppy 0.1 where you got the "Client" instance.
With Session class you can have "multi-session" client instead of having
one client for each connection. Is is specifically important when you are
writing the server.
"""
from protocol import *
# Transport-level flags
SOCKET_UNCONNECTED =0
SOCKET_ALIVE =1
SOCKET_DEAD =2
# XML-level flags
STREAM__NOT_OPENED =1
STREAM__OPENED =2
STREAM__CLOSING =3
STREAM__CLOSED =4
# XMPP-session flags
SESSION_NOT_AUTHED =1
SESSION_AUTHED =2
SESSION_BOUND =3
SESSION_OPENED =4
SESSION_CLOSED =5
class Session:
"""
The Session class instance is used for storing all session-related info like
credentials, socket/xml stream/session state flags, roster items (in case of
client type connection) etc.
Session object have no means of discovering is any info is ready to be read.
Instead you should use poll() (recomended) or select() methods for this purpose.
Session can be one of two types: 'server' and 'client'. 'server' session handles
inbound connection and 'client' one used to create an outbound one.
Session instance have multitude of internal attributes. The most imporant is the 'peer' one.
It is set once the peer is authenticated (client).
"""
def __init__(self,socket,owner,xmlns=None,peer=None):
""" When the session is created it's type (client/server) is determined from the beginning.
socket argument is the pre-created socket-like object.
It must have the following methods: send, recv, fileno, close.
owner is the 'master' instance that have Dispatcher plugged into it and generally
will take care about all session events.
xmlns is the stream namespace that will be used. Client must set this argument
If server sets this argument than stream will be dropped if opened with some another namespace.
peer is the name of peer instance. This is the flag that differentiates client session from
server session. Client must set it to the name of the server that will be connected, server must
leave this argument alone.
"""
self.xmlns=xmlns
if peer:
self.TYP='client'
self.peer=peer
self._socket_state=SOCKET_UNCONNECTED
else:
self.TYP='server'
self.peer=None
self._socket_state=SOCKET_ALIVE
self._sock=socket
self._send=socket.send
self._recv=socket.recv
self.fileno=socket.fileno
self._registered=0
self.Dispatcher=owner.Dispatcher
self.DBG_LINE='session'
self.DEBUG=owner.Dispatcher.DEBUG
self._expected={}
self._owner=owner
if self.TYP=='server': self.ID=`random.random()`[2:]
else: self.ID=None
self.sendbuffer=''
self._stream_pos_queued=None
self._stream_pos_sent=0
self.deliver_key_queue=[]
self.deliver_queue_map={}
self.stanza_queue=[]
self._session_state=SESSION_NOT_AUTHED
self.waiting_features=[]
for feature in [NS_TLS,NS_SASL,NS_BIND,NS_SESSION]:
if feature in owner.features: self.waiting_features.append(feature)
self.features=[]
self.feature_in_process=None
self.slave_session=None
self.StartStream()
def StartStream(self):
""" This method is used to initialise the internal xml expat parser
and to send initial stream header (in case of client connection).
Should be used after initial connection and after every stream restart."""
self._stream_state=STREAM__NOT_OPENED
self.Stream=simplexml.NodeBuilder()
self.Stream._dispatch_depth=2
self.Stream.dispatch=self._dispatch
self.Parse=self.Stream.Parse
self.Stream.stream_footer_received=self._stream_close
if self.TYP=='client':
self.Stream.stream_header_received=self._catch_stream_id
self._stream_open()
else:
self.Stream.stream_header_received=self._stream_open
def receive(self):
""" Reads all pending incoming data.
Raises IOError on disconnection.
Blocks until at least one byte is read."""
try: received = self._recv(10240)
except: received = ''
if len(received): # length of 0 means disconnect
self.DEBUG(`self.fileno()`+' '+received,'got')
else:
self.DEBUG('Socket error while receiving data','error')
self.set_socket_state(SOCKET_DEAD)
raise IOError("Peer disconnected")
return received
def sendnow(self,chunk):
""" Put chunk into "immidiatedly send" queue.
Should only be used for auth/TLS stuff and like.
If you just want to shedule regular stanza for delivery use enqueue method.
"""
if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8')
elif type(chunk)==type(u''): chunk = chunk.encode('utf-8')
self.enqueue(chunk)
def enqueue(self,stanza):
""" Takes Protocol instance as argument.
Puts stanza into "send" fifo queue. Items into the send queue are hold until
stream authenticated. After that this method is effectively the same as "sendnow" method."""
if isinstance(stanza,Protocol):
self.stanza_queue.append(stanza)
else: self.sendbuffer+=stanza
if self._socket_state>=SOCKET_ALIVE: self.push_queue()
def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE):
""" If stream is authenticated than move items from "send" queue to "immidiatedly send" queue.
Else if the stream is failed then return all queued stanzas with error passed as argument.
Otherwise do nothing."""
# If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints
if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD: # the stream failed. Return all stanzas that are still waiting for delivery.
self._owner.deactivatesession(self)
for key in self.deliver_key_queue: # Not sure. May be I
self._dispatch(Error(self.deliver_queue_map[key],failreason),trusted=1) # should simply re-dispatch it?
for stanza in self.stanza_queue: # But such action can invoke
self._dispatch(Error(stanza,failreason),trusted=1) # Infinite loops in case of S2S connection...
self.deliver_queue_map,self.deliver_key_queue,self.stanza_queue={},[],[]
return
elif self._session_state>=SESSION_AUTHED: # FIXME! äÏÌÖÅÎ ÂÙÔØ ËÁËÏÊ-ÔÏ ÄÒÕÇÏÊ ÆÌÁÇ.
#### LOCK_QUEUE
for stanza in self.stanza_queue:
txt=stanza.__str__().encode('utf-8')
self.sendbuffer+=txt
self._stream_pos_queued+=len(txt) # should be re-evaluated for SSL connection.
self.deliver_queue_map[self._stream_pos_queued]=stanza # position of the stream when stanza will be successfully and fully sent
self.deliver_key_queue.append(self._stream_pos_queued)
self.stanza_queue=[]
#### UNLOCK_QUEUE
def flush_queue(self):
""" Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent."""
if self.sendbuffer:
try:
# LOCK_QUEUE
sent=self._send(self.sendbuffer) # âÌÏËÉÒÕÀÝÁÑ ÛÔÕÞËÁ!
except:
# UNLOCK_QUEUE
self.set_socket_state(SOCKET_DEAD)
self.DEBUG("Socket error while sending data",'error')
return self.terminate_stream()
self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent')
self._stream_pos_sent+=sent
self.sendbuffer=self.sendbuffer[sent:]
self._stream_pos_delivered=self._stream_pos_sent # Should be acquired from socket somehow. Take SSL into account.
while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]:
del self.deliver_queue_map[self.deliver_key_queue[0]]
self.deliver_key_queue.remove(self.deliver_key_queue[0])
# UNLOCK_QUEUE
def _dispatch(self,stanza,trusted=0):
""" This is callback that is used to pass the received stanza forth to owner's dispatcher
_if_ the stream is authorised. Otherwise the stanza is just dropped.
The 'trusted' argument is used to emulate stanza receive.
This method is used internally.
"""
self._owner.packets+=1
print self._owner.packets
if self._stream_state==STREAM__OPENED or trusted: # if the server really should reject all stanzas after he is closed stream (himeself)?
self.DEBUG(stanza.__str__(),'dispatch')
stanza.trusted=trusted
return self.Dispatcher.dispatch(stanza,self)
def _catch_stream_id(self,ns=None,tag='stream',attrs={}):
""" This callback is used to detect the stream namespace of incoming stream. Used internally. """
if not attrs.has_key('id') or not attrs['id']:
return self.terminate_stream(STREAM_INVALID_XML)
self.ID=attrs['id']
if not attrs.has_key('version'): self._owner.Dialback(self)
def _stream_open(self,ns=None,tag='stream',attrs={}):
""" This callback is used to handle opening stream tag of the incoming stream.
In the case of client session it just make some validation.
Server session also sends server headers and if the stream valid the features node.
Used internally. """
text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream'
if self.TYP=='client':
text+=' to="%s"'%self.peer
else:
text+=' id="%s"'%self.ID
if not attrs.has_key('to'): text+=' from="%s"'%self._owner.servernames[0]
else: text+=' from="%s"'%attrs['to']
if attrs.has_key('xml:lang'): text+=' xml:lang="%s"'%attrs['xml:lang']
if self.xmlns: xmlns=self.xmlns
else: xmlns=NS_SERVER
text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns)
if attrs.has_key('version') or self.TYP=='client': text+=' version="1.0"'
self.sendnow(text+'>')
self.set_stream_state(STREAM__OPENED)
if self.TYP=='client': return
if tag<>'stream': return self.terminate_stream(STREAM_INVALID_XML)
if ns<>NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE)
if self.Stream.xmlns<>self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX)
if not attrs.has_key('to'): return self.terminate_stream(STREAM_IMPROPER_ADDRESSING)
if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN)
self.ourname=attrs['to'].lower()
if self.TYP=='server' and attrs.has_key('version'):
# send features
features=Node('stream:features')
if NS_TLS in self.waiting_features:
features.NT.starttls.setNamespace(NS_TLS)
features.T.starttls.NT.required
if NS_SASL in self.waiting_features:
features.NT.mechanisms.setNamespace(NS_SASL)
for mec in self._owner.SASL.mechanisms:
features.T.mechanisms.NT.mechanism=mec
else:
if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND)
if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION)
self.sendnow(features)
def feature(self,feature):
""" Declare some stream feature as activated one. """
if feature not in self.features: self.features.append(feature)
self.unfeature(feature)
def unfeature(self,feature):
""" Declare some feature as illegal. Illegal features can not be used.
Example: BIND feature becomes illegal after Non-SASL auth. """
if feature in self.waiting_features: self.waiting_features.remove(feature)
def _stream_close(self,unregister=1):
""" Write the closing stream tag and destroy the underlaying socket. Used internally. """
if self._stream_state>=STREAM__CLOSED: return
self.set_stream_state(STREAM__CLOSING)
self.sendnow('</stream:stream>')
self.set_stream_state(STREAM__CLOSED)
self.push_queue() # decompose queue really since STREAM__CLOSED
self._owner.flush_queues()
if unregister: self._owner.unregistersession(self)
self._destroy_socket()
def terminate_stream(self,error=None,unregister=1):
""" Notify the peer about stream closure.
Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet -
open it before closure.
If the error condition is specified than create a stream error and send it along with
closing stream tag.
Emulate receiving 'unavailable' type presence just before stream closure.
"""
if self._stream_state>=STREAM__CLOSING: return
if self._stream_state<STREAM__OPENED:
self.set_stream_state(STREAM__CLOSING)
self._stream_open()
else:
self.set_stream_state(STREAM__CLOSING)
p=Presence(typ='unavailable')
p.setNamespace(NS_CLIENT)
self._dispatch(p,trusted=1)
if error:
if isinstance(error,Node): self.sendnow(error)
else: self.sendnow(ErrorNode(error))
self._stream_close(unregister=unregister)
if self.slave_session:
self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED)
def _destroy_socket(self):
""" Break cyclic dependancies to let python's GC free memory right now."""
self.Stream.dispatch=None
self.Stream.stream_footer_received=None
self.Stream.stream_header_received=None
self.Stream.destroy()
self._sock.close()
self.set_socket_state(SOCKET_DEAD)
def start_feature(self,f):
""" Declare some feature as "negotiating now" to prevent other features from start negotiating. """
if self.feature_in_process: raise "Starting feature %s over %s !"%(f,self.feature_in_process)
self.feature_in_process=f
def stop_feature(self,f):
""" Declare some feature as "negotiated" to allow other features start negotiating. """
if self.feature_in_process<>f: raise "Stopping feature %s instead of %s !"%(f,self.feature_in_process)
self.feature_in_process=None
def set_socket_state(self,newstate):
""" Change the underlaying socket state.
Socket starts with SOCKET_UNCONNECTED state
and then proceeds (possibly) to SOCKET_ALIVE
and then to SOCKET_DEAD """
if self._socket_state<newstate: self._socket_state=newstate
def set_session_state(self,newstate):
""" Change the session state.
Session starts with SESSION_NOT_AUTHED state
and then comes through
SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states.
"""
if self._session_state<newstate:
if self._session_state<SESSION_AUTHED and \
newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent
self._session_state=newstate
def set_stream_state(self,newstate):
""" Change the underlaying XML stream state
Stream starts with STREAM__NOT_OPENED and then proceeds with
STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states.
Note that some features (like TLS and SASL)
requires stream re-start so this state can have non-linear changes. """
if self._stream_state<newstate: self._stream_state=newstate
|