/usr/include/qpid/SessionState.h is in libqpidcommon2-dev 0.14-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 | #ifndef QPID_SESSIONSTATE_H
#define QPID_SESSIONSTATE_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/SessionId.h>
#include <qpid/framing/SequenceNumber.h>
#include <qpid/framing/SequenceSet.h>
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FrameHandler.h>
#include <boost/operators.hpp>
#include <boost/range/iterator_range.hpp>
#include <vector>
#include <iosfwd>
#include <qpid/CommonImportExport.h>
namespace qpid {
using framing::SequenceNumber;
using framing::SequenceSet;
/**
 * A point in the session: a command id plus an offset.
 *
 * Deriving from boost::totally_ordered1 supplies the remaining comparison
 * operators (!=, >, <=, >=) in terms of the operator< and operator==
 * declared below.
 */
struct SessionPoint : boost::totally_ordered1<SessionPoint> {
/**
 * Construct a point from a command id and an offset; both default to 0.
 * The constructor is not explicit, so a SequenceNumber converts implicitly
 * to a SessionPoint with offset 0.
 */
QPID_COMMON_EXTERN SessionPoint(SequenceNumber command = 0, uint64_t offset = 0);
SequenceNumber command; // Command id component of the point.
uint64_t offset; // Offset component; presumably a position within the command -- confirm against the implementation.
/** Advance past frame f */
QPID_COMMON_EXTERN void advance(const framing::AMQFrame& f);
/** Ordering comparison, defined out of line. */
QPID_COMMON_EXTERN bool operator<(const SessionPoint&) const;
/** Equality comparison, defined out of line. */
QPID_COMMON_EXTERN bool operator==(const SessionPoint&) const;
};
/** Stream insertion for SessionPoint. The output format is determined by the out-of-line definition. */
QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const SessionPoint&);
/**
 * Support for session idempotence barrier and resume as defined in
 * AMQP 0-10.
 *
 * We only issue/use contiguous confirmations; out-of-order confirmation
 * is ignored. Out-of-order completion is fully supported.
 *
 * Raises NotImplemented if the command point is set greater than the
 * max currently received command data, either explicitly via
 * session.command-point or implicitly via session.gap.
 *
 * Partial replay is not supported: replay always begins on a command
 * boundary, and we never confirm partial commands.
 *
 * The SessionPoint data structure does store offsets, so this class
 * could be extended to support partial replay without
 * source-incompatible API changes.
 */
class SessionState {
// Container of frames kept for replay; see SendState::replayList.
typedef std::vector<framing::AMQFrame> ReplayList;
public:
/** Iterator range over the replay list, as returned by senderExpected(). */
typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
/** Size limits applied to the replay list. */
struct Configuration {
/**
 * Defaults: flush = 1024*1024, hard = 0.
 * Defined out of line; presumably flush/hard initialise
 * replayFlushLimit/replayHardLimit respectively -- confirm.
 */
QPID_COMMON_EXTERN Configuration(size_t flush=1024*1024, size_t hard=0);
size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables.
size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables.
};
/**
 * Construct session state for the given id and configuration.
 * Both arguments are defaulted, so this also serves as the default
 * constructor.
 */
QPID_COMMON_EXTERN SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
QPID_COMMON_EXTERN virtual ~SessionState();
/** Defined out of line. NOTE(review): presumably reports the `stateful` member -- confirm. */
bool hasState() const;
/** @return the id of this session. */
const SessionId& getId() const { return id; }
/** Get the session timeout; the unit is seconds judging by setTimeout()'s parameter name. */
QPID_COMMON_EXTERN virtual uint32_t getTimeout() const;
/** Set the session timeout, in seconds. */
QPID_COMMON_EXTERN virtual void setTimeout(uint32_t seconds);
/** True if this session's id equals other. */
bool operator==(const SessionId& other) const { return id == other; }
/** Sessions compare equal when their ids are equal; no other state is compared. */
bool operator==(const SessionState& other) const { return id == other.id; }
// ==== Functions for sender state.
/** Record frame f for replay. Should not be called during replay. */
QPID_COMMON_EXTERN virtual void senderRecord(const framing::AMQFrame& f);
/** @return true if we should send flush for confirmed and completed commands. */
QPID_COMMON_EXTERN virtual bool senderNeedFlush() const;
/** Called when flush for confirmed and completed commands is sent to peer. */
QPID_COMMON_EXTERN virtual void senderRecordFlush();
/** True if we should reply to the next incoming completed command */
QPID_COMMON_EXTERN virtual bool senderNeedKnownCompleted() const;
/** Called when knownCompleted is sent to peer. */
QPID_COMMON_EXTERN virtual void senderRecordKnownCompleted();
/** Called when the peer confirms up to confirmed. */
QPID_COMMON_EXTERN virtual void senderConfirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
QPID_COMMON_EXTERN virtual void senderCompleted(const SequenceSet& commands);
/**
 * Point from which the next new (not replayed) data will be sent.
 * NOTE(review): not const, unlike the neighbouring getters; the reason is
 * not visible in this header.
 */
QPID_COMMON_EXTERN virtual SessionPoint senderGetCommandPoint();
/** Set of outstanding incomplete commands */
QPID_COMMON_EXTERN virtual SequenceSet senderGetIncomplete() const;
/** Point from which we can replay. */
QPID_COMMON_EXTERN virtual SessionPoint senderGetReplayPoint() const;
/**
 * Peer expecting commands from this point.
 * @return Range of frames to be replayed.
 */
QPID_COMMON_EXTERN virtual ReplayRange senderExpected(const SessionPoint& expected);
// ==== Functions for receiver state
/** Set the command point. */
QPID_COMMON_EXTERN virtual void receiverSetCommandPoint(const SessionPoint& point);
/** Returns true if frame should be processed, false if it is a duplicate. */
QPID_COMMON_EXTERN virtual bool receiverRecord(const framing::AMQFrame& f);
/**
 * Command completed locally.
 * NOTE: default arguments of virtual functions are selected by the static
 * type of the call, so overrides should repeat cumulative=false.
 */
QPID_COMMON_EXTERN virtual void receiverCompleted(SequenceNumber command, bool cumulative=false);
/** Peer has indicated commands are known completed */
QPID_COMMON_EXTERN virtual void receiverKnownCompleted(const SequenceSet& commands);
/**
 * True if the next completed control should set the timely-reply argument
 * to request a known-completed response.
 */
QPID_COMMON_EXTERN virtual bool receiverNeedKnownCompleted() const;
/** Get the incoming command point */
QPID_COMMON_EXTERN virtual const SessionPoint& receiverGetExpected() const;
/** Get the received high-water-mark, may be > getExpected() during replay */
QPID_COMMON_EXTERN virtual const SessionPoint& receiverGetReceived() const;
/** Completed received commands that the peer may not know about. */
QPID_COMMON_EXTERN virtual const SequenceSet& receiverGetUnknownComplete() const;
/** Incomplete received commands. */
QPID_COMMON_EXTERN virtual const SequenceSet& receiverGetIncomplete() const;
/** ID of the command currently being handled. */
QPID_COMMON_EXTERN virtual SequenceNumber receiverGetCurrent() const;
/**
 * Set the state variables, used to create a session that will resume
 * from some previously established point.
 *
 * Judging by the parameter names (confirm against the implementation),
 * the first three arguments describe the sender side (replay start,
 * send command point, sent-but-incomplete commands) and the last four
 * the receiver side (expected point, received point, unknown-completed
 * commands, received-but-incomplete commands).
 */
QPID_COMMON_EXTERN virtual void setState(
const SequenceNumber& replayStart,
const SequenceNumber& sendCommandPoint,
const SequenceSet& sentIncomplete,
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete
);
/**
 * So-called 'push' bridges work by faking a subscribe request
 * (and the accompanying flows etc) to the local broker to initiate
 * the outflow of messages for the bridge.
 *
 * As the peer doesn't send these it cannot include them in its
 * session state. To keep the session state on either side of the
 * bridge in sync, this hack allows the tracking of state for
 * received messages to be disabled for the faked commands and
 * subsequently re-enabled.
 */
QPID_COMMON_EXTERN void disableReceiverTracking();
QPID_COMMON_EXTERN void enableReceiverTracking();
private:
// State for the sending direction of the session.
struct SendState {
SendState();
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
SessionPoint flushPoint; // Point of last flush
SessionPoint sendPoint; // Send from this point
ReplayList replayList; // Starts from replayPoint.
size_t unflushedSize; // Un-flushed bytes in replay list.
size_t replaySize; // Total bytes in replay list.
SequenceSet incomplete; // Commands sent and not yet completed.
size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted.
} sender;
// State for the receiving direction of the session.
struct ReceiveState {
ReceiveState();
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not be known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
// NOTE(review): the wording below matches SendState's comment verbatim; for
// the receiver this presumably counts bytes received -- confirm.
size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted.
} receiver;
SessionId id; // Identity of this session; the basis of both operator== overloads.
uint32_t timeout; // Session timeout; see getTimeout()/setTimeout().
Configuration config; // Replay-list limits for this session.
bool stateful; // NOTE(review): presumably what hasState() reports -- confirm.
bool receiverTrackingDisabled;//very nasty hack for 'push' bridges
};
/**
 * Symmetric counterpart of SessionState::operator==(const SessionId&),
 * allowing the SessionId to appear on the left-hand side of the comparison.
 */
inline bool operator==(const SessionId& id, const SessionState& s)
{
    return s.operator==(id);
}
} // namespace qpid
#endif /*!QPID_SESSIONSTATE_H*/
|