/usr/lib/topic_tools/transform is in topic-tools 1.11.16-3.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110  | #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: enriquefernandez
Allows to take a topic or one of it fields and output it on another topic
after performing a valid python operation.
The operations are done on the message, which is taken in the variable 'm'.
* Examples (note that numpy is imported by default):
$ rosrun topic_tools transform /imu/orientation/x /x_str std_msgs/String 'str(m)'
$ rosrun topic_tools transform /imu/orientation/x /x_in_degrees std_msgs/Float64 -- '-numpy.rad2deg(m)'
$ rosrun topic_tools transform /imu/orientation /norm std_msgs/Float64 'numpy.sqrt(numpy.sum(numpy.array([m.x, m.y, m.z, m.w])))'
$ rosrun topic_tools transform /imu/orientation /norm std_msgs/Float64 'numpy.linalg.norm([m.x, m.y, m.z, m.w])'
$ rosrun topic_tools transform /imu/orientation /euler geometry_msgs/Vector3 'tf.transformations.euler_from_quaternion([m.x, m.y, m.z, m.w])' --import tf
"""
from __future__ import print_function
import roslib
import rospy
import rostopic
import argparse
import importlib
import sys
class TopicOp:
    def __init__(self):
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description='Apply a Python operation to a topic.\n\n'
                        'A node is created that subscribes to a topic,\n'
                        'applies a Python expression to the topic (or topic\n'
                        'field) message \'m\', and publishes the result\n'
                        'through another topic.\n\n'
                        'Usage:\n\trosrun topic_tools transform '
                        '<input> <output topic> <output type> '
                        '[<expression on m>] [--import numpy tf]\n\n'
                        'Example:\n\trosrun topic_tools transform /imu/orientation '
                        '/norm std_msgs/Float64 '
                        '\'sqrt(sum(array([m.x, m.y, m.z, m.w])))\'')
        parser.add_argument('input', help='Input topic or topic field.')
        parser.add_argument('output_topic', help='Output topic.')
        parser.add_argument('output_type', help='Output topic type.')
        parser.add_argument(
            'expression', default='m',
            help='Python expression to apply on the input message \'m\'.'
        )
        parser.add_argument(
            '-i', '--import', dest='modules', nargs='+', default=['numpy'],
            help='List of Python modules to import.'
        )
        parser.add_argument(
            '--wait-for-start', action='store_true',
            help='Wait for input messages.'
        )
        # get and strip out ros args first
        argv = rospy.myargv()
        args = parser.parse_args(argv[1:])
        self.modules = {}
        for module in args.modules:
            try:
                mod = importlib.import_module(module)
            except ImportError:
                print('Failed to import module: %s' % module, file=sys.stderr)
            else:
                self.modules[module] = mod
        self.expression = args.expression
        input_class, input_topic, self.input_fn = rostopic.get_topic_class(
            args.input, blocking=args.wait_for_start)
        if input_topic is None:
            print('ERROR: Wrong input topic (or topic field): %s' % args.input, file=sys.stderr)
            sys.exit(1)
        self.output_class = roslib.message.get_message_class(args.output_type)
        self.sub = rospy.Subscriber(input_topic, input_class, self.callback)
        self.pub = rospy.Publisher(args.output_topic, self.output_class, queue_size=1)
    def callback(self, m):
        if self.input_fn is not None:
            m = self.input_fn(m)
        try:
            res = eval("{}".format(self.expression), self.modules, {'m': m})
        except NameError as e:
            print("Expression using variables other than 'm': %s" % e.message, file=sys.stderr)
        except UnboundLocalError as e:
            print('Wrong expression:%s' % e.message, file=sys.stderr)
        except Exception:
            raise
        else:
            if not isinstance(res, (list, tuple)):
                res = [res]
            self.pub.publish(*res)
if __name__ == '__main__':
    rospy.init_node('transform', anonymous=True)
    app = TopicOp()
    rospy.spin()
 |