# File lib/mongo/server_selector/selectable.rb, line 332 def primary(candidates) candidates.select do |server| server.primary? end end
module Mongo::ServerSelector::Selectable
Provides common behavior for filtering a list of servers by server mode or tag set.
@since 2.0.0
Attributes
@return [ Integer ] max_staleness
The maximum replication lag, in seconds, that a
secondary can suffer and still be eligible for a read.
@since 2.4.0
@return [ Hash ] options The options.
@return [ Array ] tag_sets
The tag sets used to select servers.
Public Class Methods
Initialize the server selector.
@example Initialize the selector.
Mongo::ServerSelector::Secondary.new(:tag_sets => [{'dc' => 'nyc'}])
@example Initialize the preference with no options.
Mongo::ServerSelector::Secondary.new
@param [ Hash ] options The server preference options.
@option options [ Integer ] :local_threshold The local threshold boundary for
nearest selection in seconds.
@option options [ Integer ] max_staleness
The maximum replication lag,
in seconds, that a secondary can suffer and still be eligible for a read. A value of -1 is treated identically to nil, which is to not have a maximum staleness.
@raise [ Error::InvalidServerPreference
] If tag sets are specified
but not allowed.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 44 def initialize(options = nil) options = options ? options.dup : {} if options[:max_staleness] == -1 options.delete(:max_staleness) end @options = options.freeze @tag_sets = (options[:tag_sets] || []).freeze @max_staleness = options[:max_staleness] validate! end
Public Instance Methods
Check equality of two server selector.
@example Check server selector equality.
preference == other
@param [ Object ] other The other preference.
@return [ true, false ] Whether the objects are equal.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 77 def ==(other) name == other.name && tag_sets == other.tag_sets && max_staleness == other.max_staleness end
Get the potential candidates to select from the cluster.
@example Get the server candidates.
selectable.candidates(cluster)
@param [ Cluster
] cluster The cluster.
@return [ Array<Server> ] The candidate servers.
@since 2.4.0
# File lib/mongo/server_selector/selectable.rb, line 308 def candidates(cluster) if cluster.single? cluster.servers.each { |server| validate_max_staleness_support!(server) } elsif cluster.sharded? local_threshold = local_threshold_with_cluster(cluster) near_servers(cluster.servers, local_threshold).each do |server| validate_max_staleness_support!(server) end else validate_max_staleness_value!(cluster) unless cluster.unknown? select(cluster.servers) end end
Inspect the server selector.
@example Inspect the server selector.
selector.inspect
@return [ String ] The inspection.
@since 2.2.0
# File lib/mongo/server_selector/selectable.rb, line 91 def inspect "#<#{self.class.name}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect}>" end
Get the local threshold boundary for nearest selection in seconds.
@example Get the local threshold.
selector.local_threshold
@return [ Float ] The local threshold.
@since 2.0.0
@deprecated This setting is now taken from the cluster options when a server is selected.
Will be removed in 3.0.
# File lib/mongo/server_selector/selectable.rb, line 294 def local_threshold @local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD) end
# File lib/mongo/server_selector/selectable.rb, line 279 def local_threshold_with_cluster(cluster) options[:local_threshold] || cluster.options[:local_threshold] || LOCAL_THRESHOLD end
Select a server from the specified cluster, taking into account mongos pinning for the specified session.
If the session is given and has a pinned server, this server is the only server considered for selection. If the server is of type mongos, it is returned immediately; otherwise monitoring checks on this server are initiated to update its status, and if the server becomes a mongos within the server selection timeout, it is returned.
If no session is given or the session does not have a pinned server, normal server selection process is performed among all servers in the specified cluster matching the preference of this server selector object. Monitoring
checks are initiated on servers in the cluster until a suitable server is found, up to the server selection timeout.
If a suitable server is not found within the server selection timeout, this method raises Error::NoServerAvailable
.
@param [ Mongo::Cluster
] cluster The cluster from which to select
an eligible server.
@param [ true, false ] ping Whether to ping the server before selection.
Deprecated and ignored.
@param [ Session
| nil ] session Optional session to take into account
for mongos pinning. Added in version 2.10.0.
@return [ Mongo::Server
] A server matching the server preference.
@raise [ Error::NoServerAvailable
] No server was found matching the
specified preference / pinning requirement in the server selection timeout.
@raise [ Error::LintError
] An unexpected condition was detected, and
lint mode is enabled.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 129 def select_server(cluster, ping = nil, session = nil) server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT # Special handling for zero timeout: if we have to select a server, # and the timeout is zero, fail immediately (since server selection # will take some non-zero amount of time in any case). if server_selection_timeout == 0 msg = "Failing server selection due to zero timeout. " + " Requested #{name} in cluster: #{cluster.summary}" raise Error::NoServerAvailable.new(self, cluster, msg) end deadline = Time.now + server_selection_timeout if session && session.pinned_server if Mongo::Lint.enabled? unless cluster.sharded? raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{topology}" end end if !session.in_transaction? session.unpin end if server = session.pinned_server # Here we assume that a mongos stays in the topology indefinitely. # This will no longer be the case once SRV polling is implemented. unless server.mongos? while (time_remaining = deadline - Time.now) > 0 wait_for_server_selection(cluster, time_remaining) end unless server.mongos? msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " + "(after #{server_selection_timeout} seconds)" raise Error::NoServerAvailable.new(self, cluster, msg) end end return server end end if cluster.replica_set? validate_max_staleness_value_early! end if cluster.addresses.empty? if Lint.enabled? unless cluster.servers.empty? raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}" end end msg = "Cluster has no addresses, and therefore will never have a server" raise Error::NoServerAvailable.new(self, cluster, msg) end =begin Add this check in version 3.0.0 unless cluster.connected? msg = 'Cluster is disconnected' raise Error::NoServerAvailable.new(self, cluster, msg) end =end loop do servers = candidates(cluster) if Lint.enabled? servers.each do |server| # It is possible for a server to have a nil average RTT here # because the ARTT comes from description which may be updated # by a background thread while server selection is running. # Currently lint mode is not a public feature, if/when this # changes (https://jira.mongodb.org/browse/RUBY-1576) the # requirement for ARTT to be not nil would need to be removed. if server.average_round_trip_time.nil? raise Error::LintError, "Server #{server.address} has nil average rtt" end end end if servers && !servers.compact.empty? unless cluster.topology.compatible? raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s end # This list of servers may be ordered in a specific way # by the selector (e.g. for secondary preferred, the first # server may be a secondary and the second server may be primary) # and we should take the first server here respecting the order server = servers.first if cluster.topology.single? && cluster.topology.replica_set_name && cluster.topology.replica_set_name != server.description.replica_set_name then msg = "Cluster topology specifies replica set name #{cluster.topology.replica_set_name}, but the server has replica set name #{server.description.replica_set_name || '<nil>'}" raise Error::NoServerAvailable.new(self, cluster, msg) end if session && session.starting_transaction? && cluster.sharded? session.pin(server) end return server end cluster.scan!(false) time_remaining = deadline - Time.now if time_remaining > 0 wait_for_server_selection(cluster, time_remaining) # If we wait for server selection, perform another round of # attempting to locate a suitable server. Otherwise server selection # can raise NoServerAvailable message when the diagnostics # reports an available server of the requested type. else break end end msg = "No #{name} server is available in cluster: #{cluster.summary} " + "with timeout=#{server_selection_timeout}, " + "LT=#{local_threshold_with_cluster(cluster)}" msg += server_selection_diagnostic_message(cluster) raise Error::NoServerAvailable.new(self, cluster, msg) rescue Error::NoServerAvailable => e if session && session.in_transaction? && !session.committing_transaction? e.add_label('TransientTransactionError') end if session && session.committing_transaction? e.add_label('UnknownTransactionCommitResult') end raise e end
Get the timeout for server selection.
@example Get the server selection timeout, in seconds.
selector.server_selection_timeout
@return [ Float ] The timeout.
@since 2.0.0
@deprecated This setting is now taken from the cluster options when a server is selected.
Will be removed in 3.0.
# File lib/mongo/server_selector/selectable.rb, line 274 def server_selection_timeout @server_selection_timeout ||= (options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT) end
Private Instance Methods
# File lib/mongo/server_selector/selectable.rb, line 417 def filter_stale_servers(candidates, primary = nil) return candidates unless @max_staleness # last_scan is filled out by the Monitor, and can be nil if a server # had its description manually set rather than being normally updated # via the SDAM flow. We don't handle the possibility of a nil # last_scan here. if primary candidates.select do |server| validate_max_staleness_support!(server) staleness = (server.last_scan - server.last_write_date) - (primary.last_scan - primary.last_write_date) + server.cluster.heartbeat_interval staleness <= @max_staleness end else max_write_date = candidates.collect(&:last_write_date).max candidates.select do |server| validate_max_staleness_support!(server) staleness = max_write_date - server.last_write_date + server.cluster.heartbeat_interval staleness <= @max_staleness end end end
Select the servers matching the defined tag sets.
@param [ Array ] candidates List of candidate servers from which those
matching the defined tag sets should be selected.
@return [ Array ] The servers matching the defined tag sets.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 408 def match_tag_sets(candidates) matches = [] tag_sets.find do |tag_set| matches = candidates.select { |server| server.matches_tag_set?(tag_set) } !matches.empty? end matches || [] end
Select the near servers from a list of provided candidates, taking the
local threshold into account.
@param [ Array ] candidates List of candidate servers to select the
near servers from.
@param [ Integer ] local_threshold
Local threshold. This parameter
will be required in driver version 3.0.
@return [ Array ] The near servers.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 368 def near_servers(candidates = [], local_threshold = nil) return candidates if candidates.empty? # Average RTT on any server may change at any time by the server # monitor's background thread. ARTT may also become nil if the # server is marked unknown. Take a snapshot of ARTTs for the duration # of this method. candidates = candidates.map do |server| {server: server, artt: server.average_round_trip_time} end.reject do |candidate| candidate[:artt].nil? end return candidates if candidates.empty? nearest_candidate = candidates.min_by do |candidate| candidate[:artt] end # Default for legacy signarure local_threshold ||= self.local_threshold threshold = nearest_candidate[:artt] + local_threshold candidates.select do |candidate| candidate[:artt] <= threshold end.map do |candidate| candidate[:server] end.shuffle! end
Select the primary from a list of provided candidates.
@param [ Array ] candidates List of candidate servers to select the
primary from.
@return [ Array ] The primary.
@since 2.0.0
Select the secondaries from a list of provided candidates.
@param [ Array ] candidates List of candidate servers to select the
secondaries from.
@return [ Array ] The secondary servers.
@since 2.0.0
# File lib/mongo/server_selector/selectable.rb, line 346 def secondaries(candidates) matching_servers = candidates.select(&:secondary?) matching_servers = filter_stale_servers(matching_servers, primary(candidates).first) matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty? # Per server selection spec the server selected MUST be a random # one matching staleness and latency requirements. # Selectors always pass the output of #secondaries to #nearest # which shuffles the server list, fulfilling this requirement. matching_servers end
Creates a diagnostic message when server selection fails.
The diagnostic message includes the following information, as applicable:
-
Servers having dead monitor threads
-
Cluster
is disconnected
If none of the conditions for diagnostic messages apply, an empty string is returned.
@param [ Cluster
] cluster The cluster on which server selection was
performed.
@return [ String ] The diagnostic message.
# File lib/mongo/server_selector/selectable.rb, line 526 def server_selection_diagnostic_message(cluster) msg = '' dead_monitors = [] cluster.servers_list.each do |server| thread = server.monitor.instance_variable_get('@thread') if thread.nil? || !thread.alive? dead_monitors << server end end if dead_monitors.any? msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}" end unless cluster.connected? msg += ". The cluster is disconnected (client may have been closed)" end msg end
# File lib/mongo/server_selector/selectable.rb, line 442 def validate! if !@tag_sets.all? { |set| set.empty? } && !tags_allowed? raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT) elsif @max_staleness && !max_staleness_allowed? raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT) end end
# File lib/mongo/server_selector/selectable.rb, line 450 def validate_max_staleness_support!(server) if @max_staleness && !server.features.max_staleness_enabled? raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER) end end
# File lib/mongo/server_selector/selectable.rb, line 466 def validate_max_staleness_value!(cluster) if @max_staleness heartbeat_interval = cluster.heartbeat_interval unless @max_staleness >= [ SMALLEST_MAX_STALENESS_SECONDS, min_cluster_staleness = heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS, ].max msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " + "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " + "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})" raise Error::InvalidServerPreference.new(msg) end end end
# File lib/mongo/server_selector/selectable.rb, line 456 def validate_max_staleness_value_early! if @max_staleness unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " + "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})" raise Error::InvalidServerPreference.new(msg) end end end
Waits for server state changes in the specified cluster.
If the cluster has a server selection semaphore, waits on that semaphore up to the specified remaining time. Any change in server state resulting from SDAM will immediately wake up this method and cause it to return.
If the cluster des not have a server selection semaphore, waits the smaller of 0.25 seconds and the specified remaining time. This functionality is provided for backwards compatibilty only for applications directly invoking the server selection process. If lint mode is enabled and the cluster does not have a server selection semaphore, Error::LintError
will be raised.
@param [ Cluster
] cluster The cluster to wait for. @param [ Numeric ] time_remaining Maximum time to wait, in seconds.
# File lib/mongo/server_selector/selectable.rb, line 497 def wait_for_server_selection(cluster, time_remaining) if cluster.server_selection_semaphore # Since the semaphore may have been signaled between us checking # the servers list earlier and the wait call below, we should not # wait for the full remaining time - wait for up to 1 second, then # recheck the state. cluster.server_selection_semaphore.wait([time_remaining, 1].min) else if Lint.enabled? raise Error::LintError, 'Waiting for server selection without having a server selection semaphore' end sleep [time_remaining, 0.25].min end end