This file is indexed.

/usr/lib/python3/dist-packages/networkx/algorithms/flow/tests/test_maxflow_large_graph.py is in python3-networkx 1.11-1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
"""Maximum flow algorithms test suite on large graphs.
"""

__author__ = """Loïc Séguin-C. <loicseguin@gmail.com>"""
# Copyright (C) 2010 Loïc Séguin-C. <loicseguin@gmail.com>
# All rights reserved.
# BSD license.
import os
from nose.tools import *

import networkx as nx
from networkx.algorithms.flow import build_flow_dict, build_residual_network
from networkx.algorithms.flow import (edmonds_karp, preflow_push, shortest_augmenting_path)

flow_funcs = [edmonds_karp, preflow_push, shortest_augmenting_path]

msg = "Assertion failed in function: {0}"

def gen_pyramid(N):
        # This graph admits a flow of value 1 for which every arc is at
        # capacity (except the arcs incident to the sink which have
        # infinite capacity).
        G = nx.DiGraph()

        for i in range(N - 1):
            cap = 1. / (i + 2)
            for j in range(i + 1):
                G.add_edge((i, j), (i + 1, j),
                           capacity = cap)
                cap = 1. / (i + 1) - cap
                G.add_edge((i, j), (i + 1, j + 1),
                        capacity = cap)
                cap = 1. / (i + 2) - cap

        for j in range(N):
            G.add_edge((N - 1, j), 't')

        return G


def read_graph(name):
    dirname = os.path.dirname(__file__)
    path = os.path.join(dirname, name + '.gpickle.bz2')
    return nx.read_gpickle(path)


def validate_flows(G, s, t, soln_value, R, flow_func):
    flow_value = R.graph['flow_value']
    flow_dict = build_flow_dict(G, R)
    assert_equal(soln_value, flow_value, msg=msg.format(flow_func.__name__))
    assert_equal(set(G), set(flow_dict), msg=msg.format(flow_func.__name__))
    for u in G:
        assert_equal(set(G[u]), set(flow_dict[u]),
                     msg=msg.format(flow_func.__name__))
    excess = dict((u, 0) for u in flow_dict)
    for u in flow_dict:
        for v, flow in flow_dict[u].items():
            ok_(flow <= G[u][v].get('capacity', float('inf')),
                msg=msg.format(flow_func.__name__))
            ok_(flow >= 0, msg=msg.format(flow_func.__name__))
            excess[u] -= flow
            excess[v] += flow
    for u, exc in excess.items():
        if u == s:
            assert_equal(exc, -soln_value, msg=msg.format(flow_func.__name__))
        elif u == t:
            assert_equal(exc, soln_value, msg=msg.format(flow_func.__name__))
        else:
            assert_equal(exc, 0, msg=msg.format(flow_func.__name__))


class TestMaxflowLargeGraph:

    def test_complete_graph(self):
        N = 50
        G = nx.complete_graph(N)
        nx.set_edge_attributes(G, 'capacity', 5)
        R = build_residual_network(G, 'capacity')
        kwargs = dict(residual=R)

        for flow_func in flow_funcs:
            kwargs['flow_func'] = flow_func
            flow_value = nx.maximum_flow_value(G, 1, 2, **kwargs)
            assert_equal(flow_value, 5 * (N - 1),
                         msg=msg.format(flow_func.__name__))

    def test_pyramid(self):
        N = 10
        #N = 100 # this gives a graph with 5051 nodes
        G = gen_pyramid(N)
        R = build_residual_network(G, 'capacity')
        kwargs = dict(residual=R)

        for flow_func in flow_funcs:
            kwargs['flow_func'] = flow_func
            flow_value = nx.maximum_flow_value(G, (0, 0), 't', **kwargs)
            assert_almost_equal(flow_value, 1.,
                                msg=msg.format(flow_func.__name__))

    def test_gl1(self):
        G = read_graph('gl1')
        s = 1
        t = len(G)
        R = build_residual_network(G, 'capacity')
        kwargs = dict(residual=R)

        for flow_func in flow_funcs:
            validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs),
                           flow_func)

    def test_gw1(self):
        G = read_graph('gw1')
        s = 1
        t = len(G)
        R = build_residual_network(G, 'capacity')
        kwargs = dict(residual=R)

        for flow_func in flow_funcs:
            validate_flows(G, s, t, 1202018, flow_func(G, s, t, **kwargs),
                           flow_func)

    def test_wlm3(self):
        G = read_graph('wlm3')
        s = 1
        t = len(G)
        R = build_residual_network(G, 'capacity')
        kwargs = dict(residual=R)

        for flow_func in flow_funcs:
            validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs),
                           flow_func)

    def test_preflow_push_global_relabel(self):
        G = read_graph('gw1')
        R = preflow_push(G, 1, len(G), global_relabel_freq=50)
        assert_equal(R.graph['flow_value'], 1202018)