/usr/share/pyshared/txosc/sync.py is in python-txosc 0.2.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | #!/usr/bin/env python
# Copyright (c) 2009 Alexandre Quessy, Arjan Scherpenisse
# See LICENSE for details.
# TODO: # -*- test-case-name: txosc.test.test_sync -*-
"""
Synchronous blocking OSC sender without Twisted.
Twisted is not used in this file. You don't even need to repy on Twisted to use
it and the txosc.osc module. That is enough to send OSC messages in a simple
script.
"""
import socket
import struct
#TODO: receiver
#TODO: bidirectional sender-receiver
# self._socket.recv(self.buffer_size)
# self.buffer_size = 1024
class _Sender(object):
def __init__(self):
self._socket = None
def send(self, element):
binary = element.toBinary()
self._actually_send(binary)
def _actually_send(self, binary_data):
"""
@param binary_data: Binary blob of an element to send.
"""
raise NotImplementedError("This method must be overriden in child classes.")
def close(self):
raise NotImplementedError("This method must be overriden in child classes.")
class TcpSender(_Sender):
"""
Send OSC over TCP using low-level Python socket tools.
"""
def __init__(self, address, port):
_Sender.__init__(self)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.address = address
self.port = port
self._socket.connect((self.address, self.port))
def _actually_send(self, binary_data):
#For TCP, we need to pack the data with its size first
data = struct.pack(">i", len(binary_data)) + binary_data
self._socket.send(data)
def close(self):
self._socket.close()
UDP_MODE_MULTICAST = "multicast"
UDP_MODE_BROADCAST = "broadcast"
class UdpSender(_Sender):
"""
Uses UDP.
Mode can be either UDP_MODE_BROADCAST or UDP_MODE_MULTICAST or None.
If using UDP_MODE_MULTICAST, you must set the multicast_group.
FIXME: Right now, the data it sends is badly formatted.
"""
def __init__(self, address, port, mode=None, multicast_group=None):
"""
@param multicast_group: IP of the multicast group.
@type multicast_group: C{str}
"""
_Sender.__init__(self)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.address = socket.gethostbyname(address)
self.port = port
self._socket.bind(('', 0))
self.mode = mode
self.multicast_group = None
if self.mode == UDP_MODE_MULTICAST:
if multicast_group is None:
raise RuntimeError("You must set the multicast group.")
self.multicast_group = multicast_group
ttl = struct.pack('b', 1)
self._socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
# TODO
elif self.mode == UDP_MODE_BROADCAST:
if multicast_group is not None:
raise RuntimeError("Not using the multicast mode.")
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
else:
if multicast_group is not None:
raise RuntimeError("Not using the multicast mode.")
def _actually_send(self, binary_data):
# For UDP, we just send the data.
# No need to pack it with its size.
if self.mode == UDP_MODE_BROADCAST:
self._socket.sendto(binary_data, ('<broadcast>', self.port))
elif self.mode == UDP_MODE_MULTICAST:
self._socket.sendto(binary_data, (self.multicast_group, self.port))
else:
self._socket.sendto(binary_data, (self.address, self.port))
def close(self):
self._socket.close()
|