This file is indexed.

/usr/share/pyshared/wokkel/server.py is in python-wokkel 0.7.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
# -*- test-case-name: wokkel.test.test_server -*-
#
# Copyright (c) Ralph Meijer.
# See LICENSE for details.

"""
XMPP Server-to-Server protocol.

This module implements several aspects of XMPP server-to-server communications
as described in XMPP Core (RFC 3920). Refer to that document for the meaning
of the used terminology.
"""

# hashlib is new in Python 2.5, try that first.
try:
    from hashlib import sha256
    digestmod = sha256
except ImportError:
    import Crypto.Hash.SHA256 as digestmod
    sha256 = digestmod.new

import hmac

from zope.interface import implements

from twisted.internet import defer, reactor
from twisted.names.srvconnect import SRVConnector
from twisted.python import log, randbytes
from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
from twisted.words.xish import domish

from wokkel.generic import DeferredXmlStreamFactory, XmlPipe, prepareIDNName

NS_DIALBACK = 'jabber:server:dialback'

def generateKey(secret, receivingServer, originatingServer, streamID):
    """
    Generate a dialback key for server-to-server XMPP Streams.

    The dialback key is generated using the algorithm described in
    U{XEP-0185<http://xmpp.org/extensions/xep-0185.html>}. The used
    terminology for the parameters is described in RFC-3920.

    @param secret: the shared secret known to the Originating Server and
                   Authoritive Server.
    @type secret: C{str}
    @param receivingServer: the Receiving Server host name.
    @type receivingServer: C{str}
    @param originatingServer: the Originating Server host name.
    @type originatingServer: C{str}
    @param streamID: the Stream ID as generated by the Receiving Server.
    @type streamID: C{str}
    @return: hexadecimal digest of the generated key.
    @type: C{str}
    """

    hashObject = sha256()
    hashObject.update(secret)
    hashedSecret = hashObject.hexdigest()
    message = " ".join([receivingServer, originatingServer, streamID])
    hash = hmac.HMAC(hashedSecret, message, digestmod=digestmod)
    return hash.hexdigest()


def trapStreamError(xs, observer):
    """
    Trap stream errors.

    This wraps an observer to catch exceptions. In case of a
    L{error.StreamError}, it is send over the given XML stream. All other
    exceptions yield a C{'internal-server-error'} stream error, that is
    sent over the stream, while the exception is logged.

    @return: Wrapped observer
    """

    def wrappedObserver(element):
        try:
            observer(element)
        except error.StreamError, exc:
            xs.sendStreamError(exc)
        except:
            log.err()
            exc = error.StreamError('internal-server-error')
            xs.sendStreamError(exc)

    return wrappedObserver


class XMPPServerConnector(SRVConnector):
    def __init__(self, reactor, domain, factory):
        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)


    def pickServer(self):
        host, port = SRVConnector.pickServer(self)

        if not self.servers and not self.orderedServers:
            # no SRV record, fall back..
            port = 5269

        return host, port


class DialbackFailed(Exception):
    pass



class OriginatingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Orginating Server.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, secret):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret


    def initialize(self):
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        key = generateKey(self.secret, self.otherHost,
                          self.thisHost, self.xmlstream.sid)

        result = domish.Element((NS_DIALBACK, 'result'))
        result['from'] = self.thisHost
        result['to'] = self.otherHost
        result.addContent(key)

        self.xmlstream.send(result)

        return self._deferred


    def onResult(self, result):
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        if result['type'] == 'valid':
            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                      self.onResult)
        self._deferred.errback(failure)



class ReceivingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Receiving Server.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key


    def initialize(self):
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                   self.onVerify)

        verify = domish.Element((NS_DIALBACK, 'verify'))
        verify['from'] = self.thisHost
        verify['to'] = self.otherHost
        verify['id'] = self.originalStreamID
        verify.addContent(self.key)

        self.xmlstream.send(verify)
        return self._deferred


    def onVerify(self, verify):
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        if verify['id'] != self.originalStreamID:
            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
            self._deferred.errback(DialbackFailed())
        elif verify['to'] != self.thisHost:
            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
            self._deferred.errback(DialbackFailed())
        elif verify['from'] != self.otherHost:
            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
            self._deferred.errback(DialbackFailed())
        elif verify['type'] == 'valid':
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                      self.onVerify)
        self._deferred.errback(failure)



class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing XMPP server-to-server connection.

    This authenticator connects to C{otherHost} (the Receiving Server) and then
    initiates dialback as C{thisHost} (the Originating Server) using
    L{OriginatingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Originating
                    Server) .
    @ivar otherHost: The domain of the server this server connects to (the
                     Receiving Server).
    @ivar secret: The shared secret that is used for verifying the validity
                  of this new connection.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, secret):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                   NS_DIALBACK: 'db'}
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        init = OriginatingDialbackInitializer(xs, self.thisHost,
                                              self.otherHost, self.secret)
        xs.initializers = [init]



class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing connection to verify an incoming connection.

    This authenticator connects to C{otherHost} (the Authoritative Server) and
    then initiates dialback as C{thisHost} (the Receiving Server) using
    L{ReceivingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Receiving
                    Server) .
    @ivar otherHost: The domain of the server this server connects to (the
                     Authoritative Server).
    @ivar originalStreamID: The stream ID of the incoming connection that is
                            being verified.
    @ivar key: The key provided by the Receving Server to be verified.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, originalStreamID, key):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                   NS_DIALBACK: 'db'}
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
                                            self.originalStreamID, self.key)
        xs.initializers = [init]



class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for an incoming XMPP server-to-server connection.

    This authenticator handles two types of incoming connections. Regular
    server-to-server connections are from the Originating Server to the
    Receiving Server, where this server is the Receiving Server. These
    connections start out by receiving a dialback key, verifying the
    key with the Authoritative Server, and then accept normal XMPP stanzas.

    The other type of connections is from a Receiving Server to an
    Authoritative Server, where this server acts as the Authoritative Server.
    These connections are used to verify the validity of an outgoing connection
    from this server. In this case, this server receives a verification
    request, checks the key and then returns the result.

    @ivar service: The service that keeps the list of domains we accept
                   connections for.
    """
    namespace = 'jabber:server'

    def __init__(self, service):
        xmlstream.ListenAuthenticator.__init__(self)
        self.service = service


    def streamStarted(self, rootElement):
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not self.xmlstream.sid:
            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')

        if self.xmlstream.thisEntity:
            targetDomain = self.xmlstream.thisEntity.host
        else:
            targetDomain = self.service.defaultDomain

        def prepareStream(domain):
            self.xmlstream.namespace = self.namespace
            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                       NS_DIALBACK: 'db'}
            if domain:
                self.xmlstream.thisEntity = jid.internJID(domain)

        try:
            if xmlstream.NS_STREAMS != rootElement.uri or \
               self.namespace != self.xmlstream.namespace or \
               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
                raise error.StreamError('invalid-namespace')

            if targetDomain and targetDomain not in self.service.domains:
                raise error.StreamError('host-unknown')
        except error.StreamError, exc:
            prepareStream(self.service.defaultDomain)
            self.xmlstream.sendStreamError(exc)
            return

        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
                                   trapStreamError(self.xmlstream,
                                                   self.onVerify))
        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        prepareStream(targetDomain)
        self.xmlstream.sendHeader()

        if self.xmlstream.version >= (1, 0):
            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
            self.xmlstream.send(features)


    def onVerify(self, verify):
        try:
            receivingServer = jid.JID(verify['from']).host
            originatingServer = jid.JID(verify['to']).host
        except (KeyError, jid.InvalidFormat):
            raise error.StreamError('improper-addressing')

        if originatingServer not in self.service.domains:
            raise error.StreamError('host-unknown')

        if (self.xmlstream.otherEntity and
            receivingServer != self.xmlstream.otherEntity.host):
            raise error.StreamError('invalid-from')

        streamID = verify.getAttribute('id', '')
        key = unicode(verify)

        calculatedKey = generateKey(self.service.secret, receivingServer,
                                    originatingServer, streamID)
        validity = (key == calculatedKey) and 'valid' or 'invalid'

        reply = domish.Element((NS_DIALBACK, 'verify'))
        reply['from'] = originatingServer
        reply['to'] = receivingServer
        reply['id'] = streamID
        reply['type'] = validity
        self.xmlstream.send(reply)


    def onResult(self, result):
        def reply(validity):
            reply = domish.Element((NS_DIALBACK, 'result'))
            reply['from'] = result['to']
            reply['to'] = result['from']
            reply['type'] = validity
            self.xmlstream.send(reply)

        def valid(xs):
            reply('valid')
            if not self.xmlstream.thisEntity:
                self.xmlstream.thisEntity = jid.internJID(receivingServer)
            self.xmlstream.otherEntity = jid.internJID(originatingServer)
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)

        def invalid(failure):
            log.err(failure)
            reply('invalid')

        receivingServer = result['to']
        originatingServer = result['from']
        key = unicode(result)

        d = self.service.validateConnection(receivingServer, originatingServer,
                                            self.xmlstream.sid, key)
        d.addCallbacks(valid, invalid)
        return d



class DeferredS2SClientFactory(DeferredXmlStreamFactory):
    """
    Deferred firing factory for initiating XMPP server-to-server connection.

    The deferred has its callbacks called upon succesful authentication with
    the other server. In case of failed authentication or connection, the
    deferred will have its errbacks called instead.
    """

    logTraffic = False

    def __init__(self, authenticator):
        DeferredXmlStreamFactory.__init__(self, authenticator)

        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                          self.onConnectionMade)

        self.serial = 0


    def onConnectionMade(self, xs):
        xs.serial = self.serial
        self.serial += 1

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut



def initiateS2S(factory):
    domain = prepareIDNName(factory.authenticator.otherHost)
    c = XMPPServerConnector(reactor, domain, factory)
    c.connect()
    return factory.deferred



class XMPPS2SServerFactory(xmlstream.XmlStreamServerFactory):
    """
    XMPP Server-to-Server Server factory.

    This factory accepts XMPP server-to-server connections.
    """

    logTraffic = False

    def __init__(self, service):
        self.service = service

        def authenticatorFactory():
            return XMPPServerListenAuthenticator(service)

        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                          self.onConnectionMade)
        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                          self.onAuthenticated)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Called when a server-to-server connection was made.

        This enables traffic debugging on incoming streams.
        """
        xs.serial = self.serial
        self.serial += 1

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def onAuthenticated(self, xs):
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Incoming connection %d from %r to %r established" %
                (xs.serial, otherHost, thisHost))

        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
                                                   0, xs)
        xs.addObserver('/*', self.onElement, 0, xs)


    def onConnectionLost(self, xs, reason):
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Incoming connection %d from %r to %r disconnected" %
                (xs.serial, otherHost, thisHost))


    def onError(self, reason):
        log.err(reason, "Stream Error")


    def onElement(self, xs, element):
        """
        Called when an element was received from one of the connected streams.

        """
        if element.handled:
            return
        else:
            self.service.dispatch(xs, element)



class ServerService(object):
    """
    Service for managing XMPP server to server connections.
    """

    logTraffic = False

    def __init__(self, router, domain=None, secret=None):
        self.router = router

        self.defaultDomain = domain
        self.domains = set()
        if self.defaultDomain:
            self.domains.add(self.defaultDomain)

        if secret is not None:
            self.secret = secret
        else:
            self.secret = randbytes.secureRandom(16).encode('hex')

        self._outgoingStreams = {}
        self._outgoingQueues = {}
        self._outgoingConnecting = set()
        self.serial = 0

        pipe = XmlPipe()
        self.xmlstream = pipe.source
        self.router.addRoute(None, pipe.sink)
        self.xmlstream.addObserver('/*', self.send)


    def outgoingInitialized(self, xs):
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r established" %
                (xs.serial, thisHost, otherHost))

        self._outgoingStreams[thisHost, otherHost] = xs
        xs.addObserver(xmlstream.STREAM_END_EVENT,
                       lambda _: self.outgoingDisconnected(xs))

        if (thisHost, otherHost) in self._outgoingQueues:
            for element in self._outgoingQueues[thisHost, otherHost]:
                xs.send(element)
            del self._outgoingQueues[thisHost, otherHost]


    def outgoingDisconnected(self, xs):
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r disconnected" %
                (xs.serial, thisHost, otherHost))

        del self._outgoingStreams[thisHost, otherHost]


    def initiateOutgoingStream(self, thisHost, otherHost):
        """
        Initiate an outgoing XMPP server-to-server connection.
        """

        def resetConnecting(_):
            self._outgoingConnecting.remove((thisHost, otherHost))

        if (thisHost, otherHost) in self._outgoingConnecting:
            return

        authenticator = XMPPServerConnectAuthenticator(thisHost,
                                                       otherHost,
                                                       self.secret)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                             self.outgoingInitialized)
        factory.logTraffic = self.logTraffic

        self._outgoingConnecting.add((thisHost, otherHost))

        d = initiateS2S(factory)
        d.addBoth(resetConnecting)
        return d


    def validateConnection(self, thisHost, otherHost, sid, key):
        """
        Validate an incoming XMPP server-to-server connection.
        """

        def connected(xs):
            # Set up stream for immediate disconnection.
            def disconnect(_):
                xs.transport.loseConnection()
            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)

        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
                                                      sid, key)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
        factory.logTraffic = self.logTraffic

        d = initiateS2S(factory)
        return d


    def send(self, stanza):
        """
        Send stanza to the proper XML Stream.

        This uses addressing embedded in the stanza to find the correct stream
        to forward the stanza to.
        """

        otherHost = jid.internJID(stanza["to"]).host
        thisHost = jid.internJID(stanza["from"]).host

        if (thisHost, otherHost) not in self._outgoingStreams:
            # There is no connection with the destination (yet). Cache the
            # outgoing stanza until the connection has been established.
            # XXX: If the connection cannot be established, the queue should
            #      be emptied at some point.
            if (thisHost, otherHost) not in self._outgoingQueues:
                self._outgoingQueues[(thisHost, otherHost)] = []
            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
            self.initiateOutgoingStream(thisHost, otherHost)
        else:
            self._outgoingStreams[(thisHost, otherHost)].send(stanza)


    def dispatch(self, xs, stanza):
        """
        Send on element to be routed within the server.
        """
        stanzaFrom = stanza.getAttribute('from')
        stanzaTo = stanza.getAttribute('to')

        if not stanzaFrom or not stanzaTo:
            xs.sendStreamError(error.StreamError('improper-addressing'))
        else:
            try:
                sender = jid.internJID(stanzaFrom)
                jid.internJID(stanzaTo)
            except jid.InvalidFormat:
                log.msg("Dropping error stanza with malformed JID")

            if sender.host != xs.otherEntity.host:
                xs.sendStreamError(error.StreamError('invalid-from'))
            else:
                self.xmlstream.send(stanza)