This file is indexed.

/usr/share/sumo/tools/turn-defs/connections.py is in sumo-tools 0.15.0~dfsg-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#!/usr/bin/python
#-*- encoding: utf8 -*-
  
""" Operations and classes necessary to operate on SUMO connections. """
  
import logging
import xml.dom.minidom
  
LOGGER = logging.getLogger(__name__)

class UniformDestinationWeightCalculator():
    """Calculates weight for each destination from given lane,
       redistributing weights uniformly: first redistributes the total
       traffic among incoming lanes uniformly and then divides the total
       traffic for a lane among all of the destination lanes.
       
       For example, assume we  have a junction with one incoming road In1
       with 3 incoming lanes and 2 outgoing roads Out1 and Out2,
       having 2 and 3 outgoing lanes, respectively.
  
                   ___________________
            lane 0 _______     _______ lane 0
       Out2 lane 1 _______     _______ lane 1  In1
            lane 2 _______     _______ lane 2
                          | | |
                          | | |
                          | | |
                      lane 0 1
                           Out1
  
      Assume that lanes are connected as follows:
      - In1, lane 0 is connected to Out2, lane 0
      - In1, lane 1 is connected to Out2, lane 1
      - In1, lane 2 is connected to Out2, lane 2
      
      - In1, lane 1 is connected to Out1, lane 0
      - In1, lane 2 is connected to Out1, lane 1
  
       This weight calculator will redistribute weights as follows:
  
       1. Distribute the incoming traffic on In1 uniformly on lanes 0, 1 and 2:
          as a result, each lane has been assigned 33,(3)% of the traffic.
       2. Since In1, lane 0 is connected to Out2, lane 0, whole traffic
          from In1, lane 0 is redirected to Out2, lane 0, which makes 33,(3)%
          of the total incoming traffic from In1.
       3. Since In1, lane 1 is connected both to Out2, lane 1 and Out1, lane 0,
          the traffic on that lane (that is, 33,(3)% of the total traffic) is
          spread uniformly among these two lanes, resulting in 16,(6)% of the total
          traffic for each destination lane.
       4. Similarly, In2, lane 2's traffic is spread uniformly among Out2, lane 2
          and Out 1, lane 1, resulting in 16,(6)% of the total traffic for each
          destination lane.
  
       This, if we ask the calculator for weight assigned to In1, lane 0, we get
       33,(3) as a result; if we ask for weight assigned to In1, lane 1, we get
       16,(6) as a result.
  
       Note: If you want to provide your own weight calculator (based on Gaussian
       distribution, for example), simply provide a class with calculate_weight
       method. The method's signature may need to be changed in that case. """
  
    # pylint: disable=R0903

    logger = logging.getLogger(__name__)

    def __init__(self):
        pass
  
    def calculate_weight(self,
                         source_lane_no,
                         source_total_lanes,
                         destination_total_lanes):
  
        """ Calculates the weight assigned to a single destination 
            from given lane in an uniform way. See class docs
            for explanation. """
  
        lane_weight = 100.0 / source_total_lanes
        destination_weight = lane_weight / destination_total_lanes
  
        self.logger.debug("Destination weight for lane %s/%s connected to %s "
            "destinations: %f" %
                (str(source_lane_no), str(source_total_lanes),
                    str(destination_total_lanes), destination_weight))
  
        return destination_weight
  
class Connections:
    """ Represents all of the connections in the network we have read. """
  
    logger = logging.getLogger(__name__)
  
    def __init__(self,
                 calculator = UniformDestinationWeightCalculator()):
        """ Constructor. Allows providing own destination weight calculator,
            which defaults to UniformDestinationWeightCalculator if not "
            provided. """
  
        self.connections_map = { }
        self.destination_weight_calculator = calculator
  
    def add(self,
            source,
            source_lane,
            destination):
  
        """ Adds a connection. If a connection is readded, 
            a warning is issued. """
  
        self.logger.debug("Adding connection %s (%s) -> %s" %
            (str(source), str(source_lane), str(destination)))
  
        if source not in self.connections_map:
            self.logger.debug("Created new mapping for %s" %
                (str(source)))
            self.connections_map[source] = {}
  
        if source_lane not in self.connections_map[source]:
            self.logger.debug("Created new mapping for %s / %s" %
                (str(source), str(source_lane)))
            self.connections_map[source][source_lane] = set()
  
        if destination in self.connections_map[source][source_lane]:
            self.logger.warn("Destination for %s (lane %s) readded: %s"
                % (str(source), str(source_lane), str(destination)))
  
        self.connections_map[source][source_lane].add(destination)
  
    def get_sources(self):
        """ Returns all of the incoming edges that this connections collection
            contains. Incoming edges are sorted alphabetically
            in ascending order. """
  
        sources = self.connections_map.keys()
        sources.sort()
        return sources
  
    def get_lanes(self, source):
        """ Returns all lanes that have connections for the given edge.
            The connection_source must have been added before. """
  
        return self.connections_map[source].keys()
  
    def get_destinations(self, source, source_lane):
        """ Returns all possible destinations that are achievable from given
            lane on given edge. The connection_source
            and connection_source_lane must have been added before. """
  
        return self.connections_map[source][source_lane]
  
    def calculate_destination_weight(self,
                                     source,
                                     source_lane,
                                     destination):
        """ Calculates weight assigned to the given destination using
            weight calculator provided in class' constructor.
            The connection_source, connection_source_lane
            and connection_destination must have been added before."""
  
        weight = self.destination_weight_calculator.calculate_weight(
            source_lane,
            len(self.get_lanes(source)),
            len(self.get_destinations(source, source_lane)))
  
        self.logger.debug("Destination weight for connection "
            "%s (%s) -> %s: %f" %
                (str(source), str(source_lane), str(destination), weight))
  
        return weight
  
class ConnectionXML():
    """ Represents a raw, xml-defined connection. """
  
    def __init__(self, connection_xml):
        """ Constructor. The connection_xml argument must be a well-formed
            connection XML entity string. See SUMO docs for information on
            connection XML format. """
  
        self.connection_xml = connection_xml
        
    def get_source(self):
        """ Returns the connection's incoming edge. Raises AttributeError
            if the incoming edge is missing. """
  
        source = self.connection_xml.getAttribute("from")
        if source is '':
            raise AttributeError
        return source
  
    def get_source_lane(self):
        """ Returns the connection's incoming edge's lane.
            Raises AttributeError if the incoming edge's lane is missing. """
  
        source_lane = self.connection_xml.getAttribute("fromLane")
        if source_lane is '':
            raise AttributeError
        return source_lane
  
    def get_destination(self):
        """ Returns the connection's outgoing edge. Raises AttributeError
            if the outgoing edge is missing. """
  
        destination = self.connection_xml.getAttribute("to")
        if destination is '':
            raise AttributeError
        return destination
      
  
class ConnectionsXMLReader():
    """ Reads the XML connection definitions. """
    # pylint: disable=R0903

    logger = logging.getLogger("ConnectionsXMLReader")
  
    def __init__(self, connections_xml_stream):
        """ Constructor. The connections_xml_stream argument may be
            a filename or an opened file. """
  
        self.connections_xml = xml.dom.minidom.parse(connections_xml_stream)
  
    def get_connections(self):
        """ Returns connections defined in the XML stream provided in
            the constructor. """
  
        return [ ConnectionXML(connection_xml) for connection_xml in
            self.connections_xml.getElementsByTagName("connection") ]
  
  
  
def from_stream(input_connections):
    """ Constructs Connections object from connections defined in the given
        stream. The input_connections argument may be a filename or an opened
        file. """
  
    LOGGER.info("Reading connections from input stream")

    connections = Connections()
    connections_xml_reader = ConnectionsXMLReader(input_connections)
    for xml_connection in connections_xml_reader.get_connections():
        connections.add(xml_connection.get_source(),
                        xml_connection.get_source_lane(),
                        xml_connection.get_destination())
   
    return connections
  
import collectinghandler
import unittest
  
  
class UniformDestinationWeightCalculatorTestCase(unittest.TestCase):
    # pylint: disable=C,W,R,E,F 

    def setUp(self):
      self.calculator = UniformDestinationWeightCalculator()
  
    def tearDown(self):
      self.calculator = None
  
    def test_uniform_weight_spread_across_lanes(self):
      # 1 lane case 
      self.assertAlmostEqual(100.0, self.calculator.calculate_weight(0, 1, 1))
  
      # 2 lane case 
      self.assertAlmostEqual(50.0, self.calculator.calculate_weight(0, 2, 1))
      self.assertAlmostEqual(50.0, self.calculator.calculate_weight(1, 2, 1))
  
      # 2 lane case 
      self.assertAlmostEqual(100.0/3, self.calculator.calculate_weight(0, 3, 1))
      self.assertAlmostEqual(100.0/3, self.calculator.calculate_weight(1, 3, 1))
      self.assertAlmostEqual(100.0/3, self.calculator.calculate_weight(2, 3, 1))
  
    def test_uniform_weight_spread_across_destinations(self):
      # 1 destination
      self.assertAlmostEqual(100.0, self.calculator.calculate_weight(0, 1, 1))
  
      # 2 destinations
      self.assertAlmostEqual(100.0/2, self.calculator.calculate_weight(0, 1, 2))
  
      # 3 destinations
      self.assertAlmostEqual(100.0/3, self.calculator.calculate_weight(0, 1, 3))
  
  
class ConnectionsTestCase(unittest.TestCase):
    # pylint: disable=C,W,R,E,F 

    def setUp(self):
      self.connections = Connections()
  
    def tearDown(self):
      self.connections = None
  
    def assert_contains_source(self, source):
      self.assertTrue(source in
          self.connections.get_sources())
  
    def assert_contains_lane(self, source, lane):
      self.assertTrue(lane in
          self.connections.get_lanes(source))
  
    def assert_contains_destination(self, source, lane, destination):
      self.assertTrue(destination in
          self.connections.get_destinations(source, lane))
    
    def assert_sources_no(self, sources_no):
      self.assertEqual(sources_no,
                       len(self.connections.get_sources()))
  
    def assert_lanes_no(self, source, lanes_no):
      self.assertEqual(lanes_no,
                       len(self.connections.get_lanes(source)))
  
    def assert_destinations_no(self, source, lane, destinations_no):
      self.assertEqual(destinations_no,
                       len(self.connections.get_destinations(source, lane)))
    
    def test_map_empty_on_init(self):
      self.assert_sources_no(0)
  
    def test_add(self):
      self.connections.add("source", "source lane", "destination")
      
      self.assert_sources_no(1)
      self.assert_contains_source("source")
      
      self.assert_lanes_no("source", 1)
      self.assert_contains_lane("source", "source lane")
  
      self.assert_destinations_no("source", "source lane", 1)
      self.assert_contains_destination("source", "source lane", "destination") 
  
    def test_add_different_lanes(self):
      self.connections.add("source", "source lane 1", "destination")
      self.connections.add("source", "source lane 2", "destination")
  
      self.assert_sources_no(1)
      self.assert_contains_source("source")
      
      self.assert_lanes_no("source", 2)
      self.assert_contains_lane("source", "source lane 1")
      self.assert_contains_lane("source", "source lane 2")
  
      self.assert_destinations_no("source", "source lane 1", 1)
      self.assert_contains_destination("source", "source lane 1", "destination")
  
      self.assert_destinations_no("source", "source lane 2", 1)
      self.assert_contains_destination("source", "source lane 2", "destination")
   
    def test_add_different_destinations(self):
      self.connections.add("source", "source lane", "destination 1")
      self.connections.add("source", "source lane", "destination 2")
  
      self.assert_sources_no(1)
      self.assert_contains_source("source")
      
      self.assert_lanes_no("source", 1)
      self.assert_contains_lane("source", "source lane")
  
      self.assert_destinations_no("source", "source lane", 2)
      self.assert_contains_destination("source", "source lane", "destination 1")
      self.assert_contains_destination("source", "source lane", "destination 2")
   
    def test_readd_raises_warning(self):
      collecting_handler = collectinghandler.CollectingHandler()
      self.connections.logger.addHandler(collecting_handler)
  
      self.connections.add("source", "source lane", "destination")    
      self.connections.add("source", "source lane", "destination")
      
      self.assert_sources_no(1)
      self.assert_contains_source("source")
      
      self.assert_lanes_no("source", 1)
      self.assert_contains_lane("source", "source lane")
  
      self.assert_destinations_no("source", "source lane", 1)
      self.assert_contains_destination("source", "source lane", "destination")
  
      self.assertTrue(
          "Destination for source (lane source lane) readded: destination"
              in [record.getMessage() for record in collecting_handler.log_records])
  
  
class ConnectionXMLTestCase(unittest.TestCase):
    # pylint: disable=C,W,R,E,F 

    SOURCE_ATTRIBUTE = "from"
    SOURCE_LANE_ATTRIBUTE= "fromLane"
    DESTINATION_ATTRIBUTE = "to"
    DESTINATION_LANE_ATTRIBUTE = "toLane"
   
    def setUp(self):
      # Shared resources.
      self.from_value = "source"
      self.to_value = "destination"
      self.from_lane_value = "source lane"
      self.to_lane_value = "destination lane"
      
    def set_up_new_connection_element(self):
      connection_element = xml.dom.minidom.Element("connection")
      connection_element.setAttribute(self.SOURCE_ATTRIBUTE,
          self.from_value)
      connection_element.setAttribute(self.SOURCE_LANE_ATTRIBUTE,
          self.from_lane_value)
      connection_element.setAttribute(self.DESTINATION_ATTRIBUTE,
          self.to_value)
      connection_element.setAttribute(self.DESTINATION_LANE_ATTRIBUTE,
          self.to_lane_value)
      return connection_element
  
    def set_up_connection_xml_without(self, attribute):
      connection_element = self.set_up_new_connection_element()
      connection_element.removeAttribute(attribute)
      self.set_up_connection_xml(connection_element)
  
    def set_up_connection_xml(self, connection_element):
      self.connection_xml = ConnectionXML(connection_element)
  
    def tearDown(self):
      self.connection_xml = None
  
    def test_get_source(self):
      self.set_up_connection_xml(self.set_up_new_connection_element())
      self.assertEqual(self.from_value, self.connection_xml.get_source())
  
    def test_get_source_with_missing_source(self):
      self.set_up_connection_xml_without(self.SOURCE_ATTRIBUTE)
      self.assertRaises(AttributeError, self.connection_xml.get_source)
  
    def test_get_source_lane(self):
      self.set_up_connection_xml(self.set_up_new_connection_element())
      self.assertEqual(self.from_lane_value, self.connection_xml.get_source_lane())
  
    def test_get_source_lane_with_missing_source_lane(self):
      self.set_up_connection_xml_without(self.SOURCE_LANE_ATTRIBUTE)
      self.assertRaises(AttributeError, self.connection_xml.get_source_lane)
  
    def test_get_destination(self):
      self.set_up_connection_xml(self.set_up_new_connection_element())
      self.assertEqual(self.to_value, self.connection_xml.get_destination())
  
    def test_get_destination_with_missing_destination(self):
      self.set_up_connection_xml_without(self.DESTINATION_ATTRIBUTE)
      self.assertRaises(AttributeError, self.connection_xml.get_destination)
  
if __name__ == "__main__":
    unittest.main()