/usr/lib/topic_tools/transform is in topic-tools 1.11.16-3.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: enriquefernandez
Allows to take a topic or one of it fields and output it on another topic
after performing a valid python operation.
The operations are done on the message, which is taken in the variable 'm'.
* Examples (note that numpy is imported by default):
$ rosrun topic_tools transform /imu/orientation/x /x_str std_msgs/String 'str(m)'
$ rosrun topic_tools transform /imu/orientation/x /x_in_degrees std_msgs/Float64 -- '-numpy.rad2deg(m)'
$ rosrun topic_tools transform /imu/orientation /norm std_msgs/Float64 'numpy.sqrt(numpy.sum(numpy.array([m.x, m.y, m.z, m.w])))'
$ rosrun topic_tools transform /imu/orientation /norm std_msgs/Float64 'numpy.linalg.norm([m.x, m.y, m.z, m.w])'
$ rosrun topic_tools transform /imu/orientation /euler geometry_msgs/Vector3 'tf.transformations.euler_from_quaternion([m.x, m.y, m.z, m.w])' --import tf
"""
from __future__ import print_function
import roslib
import rospy
import rostopic
import argparse
import importlib
import sys
class TopicOp:
def __init__(self):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description='Apply a Python operation to a topic.\n\n'
'A node is created that subscribes to a topic,\n'
'applies a Python expression to the topic (or topic\n'
'field) message \'m\', and publishes the result\n'
'through another topic.\n\n'
'Usage:\n\trosrun topic_tools transform '
'<input> <output topic> <output type> '
'[<expression on m>] [--import numpy tf]\n\n'
'Example:\n\trosrun topic_tools transform /imu/orientation '
'/norm std_msgs/Float64 '
'\'sqrt(sum(array([m.x, m.y, m.z, m.w])))\'')
parser.add_argument('input', help='Input topic or topic field.')
parser.add_argument('output_topic', help='Output topic.')
parser.add_argument('output_type', help='Output topic type.')
parser.add_argument(
'expression', default='m',
help='Python expression to apply on the input message \'m\'.'
)
parser.add_argument(
'-i', '--import', dest='modules', nargs='+', default=['numpy'],
help='List of Python modules to import.'
)
parser.add_argument(
'--wait-for-start', action='store_true',
help='Wait for input messages.'
)
# get and strip out ros args first
argv = rospy.myargv()
args = parser.parse_args(argv[1:])
self.modules = {}
for module in args.modules:
try:
mod = importlib.import_module(module)
except ImportError:
print('Failed to import module: %s' % module, file=sys.stderr)
else:
self.modules[module] = mod
self.expression = args.expression
input_class, input_topic, self.input_fn = rostopic.get_topic_class(
args.input, blocking=args.wait_for_start)
if input_topic is None:
print('ERROR: Wrong input topic (or topic field): %s' % args.input, file=sys.stderr)
sys.exit(1)
self.output_class = roslib.message.get_message_class(args.output_type)
self.sub = rospy.Subscriber(input_topic, input_class, self.callback)
self.pub = rospy.Publisher(args.output_topic, self.output_class, queue_size=1)
def callback(self, m):
if self.input_fn is not None:
m = self.input_fn(m)
try:
res = eval("{}".format(self.expression), self.modules, {'m': m})
except NameError as e:
print("Expression using variables other than 'm': %s" % e.message, file=sys.stderr)
except UnboundLocalError as e:
print('Wrong expression:%s' % e.message, file=sys.stderr)
except Exception:
raise
else:
if not isinstance(res, (list, tuple)):
res = [res]
self.pub.publish(*res)
if __name__ == '__main__':
rospy.init_node('transform', anonymous=True)
app = TopicOp()
rospy.spin()
|