This file is indexed.

/usr/include/caf/io/basp_broker.hpp is in libcaf-dev 0.13.2-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
/******************************************************************************
 *                       ____    _    _____                                   *
 *                      / ___|  / \  |  ___|    C++                           *
 *                     | |     / _ \ | |_       Actor                         *
 *                     | |___ / ___ \|  _|      Framework                     *
 *                      \____/_/   \_|_|                                      *
 *                                                                            *
 * Copyright (C) 2011 - 2015                                                  *
 * Dominik Charousset <dominik.charousset (at) haw-hamburg.de>                *
 *                                                                            *
 * Distributed under the terms and conditions of the BSD 3-Clause License or  *
 * (at your option) under the terms and conditions of the Boost Software      *
 * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE.       *
 *                                                                            *
 * If you did not receive a copy of the license files, see                    *
 * http://opensource.org/licenses/BSD-3-Clause and                            *
 * http://www.boost.org/LICENSE_1_0.txt.                                      *
 ******************************************************************************/

#ifndef CAF_IO_BASP_BROKER_HPP
#define CAF_IO_BASP_BROKER_HPP

#include <map>
#include <set>
#include <string>
#include <future>
#include <vector>

#include "caf/actor_namespace.hpp"
#include "caf/binary_serializer.hpp"
#include "caf/binary_deserializer.hpp"
#include "caf/forwarding_actor_proxy.hpp"

#include "caf/io/basp.hpp"
#include "caf/io/broker.hpp"

namespace caf {
namespace io {

/**
 * A broker implementation for the Binary Actor System Protocol (BASP).
 */
class basp_broker : public broker, public actor_namespace::backend {
 public:
  basp_broker(middleman& parent_ref);

  ~basp_broker();

  behavior make_behavior() override;

  void add_published_actor(accept_handle hdl, const abstract_actor_ptr& whom,
                           uint16_t port);

  bool remove_published_actor(const abstract_actor_ptr& whom);

  bool remove_published_actor(const abstract_actor_ptr& whom, uint16_t port);

  actor_proxy_ptr make_proxy(const node_id&, actor_id) override;

  class payload_writer {
   public:
    payload_writer() = default;
    payload_writer(const payload_writer&) = default;
    payload_writer& operator=(const payload_writer&) = default;
    virtual ~payload_writer();
    virtual void write(binary_serializer&) = 0;
  };

  void dispatch(connection_handle hdl, uint32_t operation,
                const node_id& src_node, actor_id src_actor,
                const node_id& dest_node, actor_id dest_actor,
                uint64_t op_data = 0, payload_writer* writer = nullptr);

  node_id dispatch(uint32_t operation, const node_id& src_node,
                   actor_id src_actor, const node_id& dest_node,
                   actor_id dest_actor, uint64_t op_data = 0,
                   payload_writer* writer = nullptr);

  // dispatches a message from a local actor to a remote node
  node_id dispatch(const actor_addr& from, const actor_addr& to,
                   message_id mid, const message& msg);

  struct client_handshake_data {
    int64_t request_id;
    actor client;
    std::set<std::string> expected_ifs;
  };

  inline actor_namespace& get_namespace() {
    return m_namespace;
  }

 protected:
  void on_exit() override;

 private:
  void erase_proxy(const node_id& nid, actor_id aid);

  // dispatches a message from a remote node to a local actor
  void local_dispatch(const basp::header& msg, message&& payload);

  enum connection_state {
    // client just started, await handshake from server
    await_server_handshake,
    // server accepted new connection and sent handshake, await response
    await_client_handshake,
    // connection established, read series of broker messages
    await_header,
    // currently waiting for payload of a received message
    await_payload,
    // connection is going to be shut down because of an error
    close_connection
  };

  struct connection_context {
    connection_state state;
    connection_handle hdl;
    node_id remote_id;
    optional<client_handshake_data> handshake_data;
    basp::header hdr;
    // keep a reference to the published actor of
    // the remote node to prevent this particular
    // proxy instance from expiring; this prevents
    // a bug where re-using an "old" connection via
    // remote_actor() could return an expired proxy
    actor published_actor;
  };

  void read(binary_deserializer& bs, basp::header& msg);

  void write(binary_serializer& bs, const basp::header& msg);

  void send_kill_proxy_instance(const node_id& nid, actor_id aid,
                                uint32_t reason);

  connection_state handle_basp_header(connection_context& ctx,
                                      const buffer_type* payload = nullptr);

  optional<skip_message_t> add_monitor(connection_context& ctx, actor_id aid);

  optional<skip_message_t> kill_proxy(connection_context& ctx, actor_id aid,
                                      std::uint32_t reason);

  void new_data(connection_context& ctx, buffer_type& buf);

  void init_handshake_as_client(connection_context& ctx);

  void init_handshake_as_server(connection_context& ctx,
                                actor_addr published_actor);

  void serialize_msg(const actor_addr& sender, message_id mid,
                     const message& msg, buffer_type& wr_buf);

  bool try_set_default_route(const node_id& nid, connection_handle hdl);

  void add_route(const node_id& nid, connection_handle hdl);

  struct connection_info {
    connection_handle hdl;
    node_id node;
    inline bool invalid() const {
      return hdl.invalid();
    }
    inline bool operator==(const connection_info& other) const {
      return hdl == other.hdl && node == other.node;
    }
    inline bool operator<(const connection_info& other) const {
      return hdl < other.hdl;
    }
  };

  connection_info get_route(const node_id& dest);

  // node_id of the target, sender of the request, and original request ID
  using pending_request = std::tuple<node_id, actor_addr, message_id>;
  using pending_requests = std::vector<pending_request>;
  using pending_request_iter = pending_requests::iterator;

  void fail_pending_requests(pending_request_iter first,
                             pending_request_iter last,
                             uint32_t reason);

  // fails all pending requests (total network failure or shutdown)
  void fail_pending_requests(uint32_t reason);

  // fails all pending requests from `addr` (disconnect from node)
  void fail_pending_requests(const node_id& addr, uint32_t reason);

  struct connection_info_less {
    inline bool operator()(const connection_info& lhs,
                           const connection_info& rhs) const {
      return lhs.hdl < rhs.hdl;
    }
    inline bool operator()(const connection_info& lhs,
                           const connection_handle& rhs) const {
      return lhs.hdl < rhs;
    }
  };

  using blacklist_entry = std::pair<node_id, connection_handle>;

  // (default route, [alternative routes])
  using routing_table_entry = std::pair<connection_info,
                                        std::set<connection_info>>;

  struct blacklist_less {
    inline bool operator()(const blacklist_entry& lhs,
                           const blacklist_entry& rhs) const {
      if (lhs.first < rhs.first) {
        return lhs.second < rhs.second;
      }
      return false;
    }
  };

  // dest => hops
  using routing_table = std::map<node_id, routing_table_entry>;

  actor_namespace m_namespace; // manages proxies
  std::map<connection_handle, connection_context> m_ctx;
  std::map<accept_handle, std::pair<abstract_actor_ptr, uint16_t>> m_acceptors;
  std::map<uint16_t, accept_handle> m_open_ports;
  routing_table m_routes; // stores non-direct routes
  std::set<blacklist_entry, blacklist_less> m_blacklist; // stores invalidated
                                                         // routes
  // a simple "bag" for our pending requests
  std::vector<pending_request> m_pending_requests;

  // needed to keep track to which node we are talking to at the moment
  connection_context* m_current_context;

  // cache some UTIs to make serialization a bit faster
  const uniform_type_info* m_meta_hdr;
  const uniform_type_info* m_meta_msg;
  const uniform_type_info* m_meta_id_type;
};

} // namespace io
} // namespace caf

#endif // CAF_IO_BASP_BROKER_HPP