/usr/lib/python3/dist-packages/pymongo/read_preferences.py is in python3-pymongo 2.7.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2012-2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for choosing which member of a replica set to read from."""
import random
from pymongo.errors import ConfigurationError
class ReadPreference:
    """An enum that defines the read preference modes supported by PyMongo.

    Used in three cases:

    :class:`~pymongo.mongo_client.MongoClient` connected to a single host:

    * `PRIMARY`: Queries are allowed if the host is standalone or the
      replica set primary.
    * All other modes allow queries to standalone servers, to the primary,
      or to secondaries.

    :class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
    sharded cluster of replica sets:

    * `PRIMARY`: Queries are sent to the primary of a shard.
    * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
      otherwise a secondary.
    * `SECONDARY`: Queries are distributed among shard secondaries. An
      error is raised if no secondaries are available.
    * `SECONDARY_PREFERRED`: Queries are distributed among shard
      secondaries, or the primary if no secondary is available.
    * `NEAREST`: Queries are distributed among all members of a shard.

    :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`:

    * `PRIMARY`: Queries are sent to the primary of the replica set.
    * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
      otherwise a secondary.
    * `SECONDARY`: Queries are distributed among secondaries. An error
      is raised if no secondaries are available.
    * `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
      or the primary if no secondary is available.
    * `NEAREST`: Queries are distributed among all members.
    """

    # The five modes are numbered consecutively from zero.
    (PRIMARY,
     PRIMARY_PREFERRED,
     SECONDARY,
     SECONDARY_PREFERRED,
     NEAREST) = range(5)

    # Second name for SECONDARY; both share the same value.
    SECONDARY_ONLY = SECONDARY
# Lookup from a read preference mode's numeric value to its name, for
# formatting error messages.
modes = {
    getattr(ReadPreference, mode_name): mode_name
    for mode_name in (
        'PRIMARY',
        'PRIMARY_PREFERRED',
        'SECONDARY',
        'SECONDARY_PREFERRED',
        'NEAREST',
    )
}
# mongos spellings of the read preference modes. Each name's position in
# the list is the value of the matching ReadPreference constant, so the
# order must not change.
_mongos_modes = [
    'primary', 'primaryPreferred',
    'secondary', 'secondaryPreferred',
    'nearest',
]
def mongos_mode(mode):
    """Return the mongos name for the numeric read preference `mode`.

    `mode` is used directly as an index into ``_mongos_modes``.
    """
    mongos_name = _mongos_modes[mode]
    return mongos_name
def mongos_enum(enum):
    """Return the numeric read preference mode for the mongos name `enum`.

    The result is the position of `enum` in ``_mongos_modes``; a name that
    is not in the list makes ``list.index`` raise ValueError.
    """
    position = _mongos_modes.index(enum)
    return position
def select_primary(members):
    """Return the first member whose ``is_primary`` is true, or None."""
    primaries = (node for node in members if node.is_primary)
    return next(primaries, None)
def select_member_with_tags(members, tags, secondary_only, latency):
    """Pick, at random, one of the nearest members that match `tags`.

    :Parameters:
      - `members`: iterable of member objects providing ``is_primary``,
        ``is_secondary``, ``matches_tags(tags)`` and
        ``get_avg_ping_time()``.
      - `tags`: tag set handed to each member's ``matches_tags``.
      - `secondary_only`: if true, the primary is never selected.
      - `latency`: size of the acceptable latency window in milliseconds,
        measured from the fastest matching member.

    Returns the chosen member, or None if no member is eligible.
    """
    eligible = [
        member for member in members
        # Skip the primary when only secondaries are wanted.
        if not (secondary_only and member.is_primary)
        # Skip members in RECOVERING or a similar state.
        and (member.is_primary or member.is_secondary)
        and member.matches_tags(tags)
    ]
    if not eligible:
        return None

    # Sample every ping time exactly once so the minimum and the window
    # test below are computed from the same values.
    # ping_time is in seconds, `latency` in milliseconds.
    timed = [(member.get_avg_ping_time(), member) for member in eligible]
    fastest = min(ping for ping, _ in timed)
    window = latency / 1000.

    nearby = [member for ping, member in timed if ping - fastest < window]
    if not nearby:
        # With a zero or negative `latency` the strict comparison above
        # rejects even the fastest member. Fall back to the fastest
        # member(s) rather than letting random.choice() raise IndexError
        # on an empty list.
        nearby = [member for ping, member in timed if ping == fastest]

    return random.choice(nearby)
def select_member(members, mode=ReadPreference.PRIMARY, tag_sets=None,
                  latency=15):
    """Return a Member or None.

    Chooses one of `members` according to the read preference `mode`:

    - PRIMARY: the primary; combining it with tag sets is an error.
    - PRIMARY_PREFERRED: the primary if one is found, otherwise whatever
      SECONDARY mode selects using `tag_sets`.
    - SECONDARY: a non-primary member selected with the first tag set in
      `tag_sets` that yields a match.
    - SECONDARY_PREFERRED: whatever SECONDARY mode selects, otherwise the
      primary.
    - NEAREST: like SECONDARY, except that the primary is eligible too.

    `tag_sets` defaults to ``[{}]``; `latency` is passed through to
    select_member_with_tags(). Raises ConfigurationError when PRIMARY is
    combined with tag sets other than ``[{}]`` and when `mode` is not one
    of the modes above.
    """
    if tag_sets is None:
        tag_sets = [{}]

    def first_tagged_match(secondary_only):
        # Try the tag sets in order; the first one that selects a member
        # decides the result.
        for tag_set in tag_sets:
            chosen = select_member_with_tags(
                members, tag_set, secondary_only, latency)
            if chosen:
                return chosen
        # Ran out of tags.
        return None

    if mode == ReadPreference.PRIMARY:
        if tag_sets != [{}]:
            raise ConfigurationError("PRIMARY cannot be combined with tags")
        return select_primary(members)

    if mode == ReadPreference.PRIMARY_PREFERRED:
        # Recurse: primary first, then fall back to the secondaries.
        return (
            select_member(members, ReadPreference.PRIMARY, [{}], latency)
            or select_member(
                members, ReadPreference.SECONDARY, tag_sets, latency)
        )

    if mode == ReadPreference.SECONDARY:
        return first_tagged_match(True)

    if mode == ReadPreference.SECONDARY_PREFERRED:
        # Recurse: secondaries first, then fall back to the primary.
        return (
            select_member(
                members, ReadPreference.SECONDARY, tag_sets, latency)
            or select_member(members, ReadPreference.PRIMARY, [{}], latency)
        )

    if mode == ReadPreference.NEAREST:
        return first_tagged_match(False)

    raise ConfigurationError("Invalid mode %s" % repr(mode))
"""Commands that may be sent to replica-set secondaries, depending on
ReadPreference and tags. All other commands are always run on the primary.
"""
# Stored as a frozenset: the collection is immutable and is meant for
# membership tests on command names.
secondary_ok_commands = frozenset((
    "group",
    "aggregate",
    "collstats",
    "dbstats",
    "count",
    "distinct",
    "geonear",
    "geosearch",
    "geowalk",
    "mapreduce",
    "getnonce",
    "authenticate",
    "text",
    "parallelcollectionscan",
))
class MovingAverage(object):
    """Immutable structure to track a 5-sample moving average."""

    def __init__(self, samples):
        """Keep the last five of `samples` and compute their mean.

        `samples` may be any non-empty iterable of numbers (list, tuple,
        generator, ...); it is copied, never stored directly.
        """
        # Normalise to a list before slicing: slicing a tuple would keep a
        # tuple and break the list concatenation in clone_with(), and a
        # generator cannot be sliced at all.
        self.samples = list(samples)[-5:]
        # An average over zero samples is undefined.
        assert self.samples
        self.average = sum(self.samples) / float(len(self.samples))

    def clone_with(self, sample):
        """Get a copy of this instance plus a new sample"""
        return MovingAverage(self.samples + [sample])

    def get(self):
        """Return the mean of the retained samples."""
        return self.average
|