/usr/lib/python3/dist-packages/morse/middleware/pocolibs/overlays/rflex_overlay.py is in python3-morse-simulator 1.4-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs.actuators.genpos import GenPosPoster
from morse.middleware.pocolibs_datastream import PosterNotFound, DummyPoster
class RflexModule(MorseOverlay):
    """Overlay exposing a set of services on top of an overlaid object.

    The overlaid object is expected to provide ``stop()``,
    ``set_speed(v, w)`` and an ``input_functions`` list, since those are
    the only members this class uses. Most services are no-op stubs that
    accept any arguments and do nothing.
    """

    def __init__(self, overlaid_object):
        """Wrap *overlaid_object* and initialise the overlay state."""
        # Call the constructor of the parent class
        MorseOverlay.__init__(self, overlaid_object)
        # True while an input function appended by TrackSpeedStart is
        # still pending removal from overlaid_object.input_functions.
        self._clean_track = False
        # Value stored by SetMode; initialised so that GetMode does not
        # raise AttributeError when called before any SetMode.
        self._mode = None
        # Not referenced anywhere else in this class.
        self._cntrl = DummyPoster("rflexCntrl")

    def interrupt(self):
        """Stop the overlaid object and undo a pending speed tracking.

        If TrackSpeedStart appended an input function, the last entry of
        ``input_functions`` is removed, then the parent ``interrupt`` is
        invoked.
        """
        self.overlaid_object.stop()
        if self._clean_track:
            # Reset first so that a second interrupt never pops again.
            self._clean_track = False
            inputs = self.overlaid_object.input_functions
            # The list may already have been emptied (e.g. by Stop).
            if inputs:
                inputs.pop()
        MorseOverlay.interrupt(self)

    @service
    def InitClient(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def EndClient(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetWdogRef(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetWdogRef(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetMode(self, mode):
        """Store *mode* so that GetMode can return it later."""
        self._mode = mode

    @service
    def GetMode(self):
        """Return ``(status.SUCCESS, mode)`` with the last stored mode.

        The mode is ``None`` until SetMode has been called.
        """
        return status.SUCCESS, self._mode

    @service
    def PomTagging(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetPos(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetPosFromMEPoster(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetVarparams(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @async_service
    def Stop(self, *args):
        """Stop the overlaid object and drop all its input functions."""
        self.overlaid_object.stop()
        del self.overlaid_object.input_functions[:]
        # Every input function is gone, including the tracking one, so
        # there is nothing left for interrupt() to clean up.
        self._clean_track = False
        self.completed(status.SUCCESS)

    @async_service
    def TrackEnd(self, *args):
        """Stop the overlaid object and report success."""
        self.overlaid_object.stop()
        self.completed(status.SUCCESS)

    @async_service
    def GotoSpeed(self, numRef, updatedPeriod, v, vt, w, *args):
        """Apply the speeds *v* and *w* to the overlaid object.

        Both values are converted with ``float()`` before being passed
        to ``set_speed``. *numRef*, *updatedPeriod*, *vt* and any extra
        arguments are accepted but unused.
        """
        self.overlaid_object.set_speed(float(v), float(w))
        self.completed(status.SUCCESS)

    @interruptible
    @async_service
    def TrackSpeedStart(self, poster_name):
        """Start feeding the overlaid object from the poster *poster_name*.

        A GenPosPoster is created for the poster and its ``default``
        method is appended to ``input_functions``. If the poster cannot
        be found the service completes with ``status.FAILED`` and the
        reason ``"POSTER_NOT_FOUND"``. On success ``completed`` is not
        called here, so the service is left pending.
        """
        try:
            poster = GenPosPoster(self.overlaid_object,
                                  {'poster': poster_name, 'delay': False})
        except PosterNotFound:
            return self.completed(status.FAILED, ["POSTER_NOT_FOUND"])

        # Remember that interrupt() has an input function to remove.
        self._clean_track = True
        self.overlaid_object.input_functions.append(poster.default)

    @service
    def GetGeoConfig(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetGeoConfig(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SonarOn(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SonarOff(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def BrakeOn(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def BrakeOff(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetJoystick(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def MonitorBattery(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def Gyro(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetGyro(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetOdometryMethod(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def Log(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def StopLog(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    def name(self):
        """Return the fixed name of this overlay, ``"rflex"``."""
        return "rflex"
|