class Mongo::PoolManager

Attributes

arbiters[R]
connection[R]
hosts[R]
max_bson_size[R]
members[R]
nodes[R]
primary[R]
primary_pool[R]
read_pool[R]
secondaries[R]
secondary_pool[R]
secondary_pools[R]
tag_map[R]
tags_to_pools[R]

Public Class Methods

new(connection, seeds=[]) click to toggle source

Create a new set of connection pools.

The pool manager will by default use the original seed list passed to the connection objects, accessible via connection.seeds. In addition, the user may pass an additional list of seeds nodes discovered in real time. The union of these lists will be used when attempting to connect, with the newly-discovered nodes being used first.

# File lib/mongo/util/pool_manager.rb, line 15
def initialize(connection, seeds=[])
  @connection = connection
  @original_seeds = connection.seeds
  @seeds = seeds
  @previously_connected = false
end

Public Instance Methods

check_connection_health() click to toggle source

We're healthy if all members are pingable and if the view of the replica set returned by isMaster is equivalent to our view. If any of these isn't the case, set @refresh_required to true, and return.

# File lib/mongo/util/pool_manager.rb, line 44
def check_connection_health
  begin
    seed = get_valid_seed_node
  rescue ConnectionFailure
    @refresh_required = true
    return
  end

  config = seed.set_config
  if !config
    @refresh_required = true
    seed.close
    return
  end

  if config['hosts'].length != @members.length
    @refresh_required = true
    seed.close
    return
  end

  config['hosts'].each do |host|
    member = @members.detect do |m|
      m.address == host
    end

    if member && validate_existing_member(member)
      next
    else
      @refresh_required = true
      seed.close
      return false
    end
  end

  seed.close
end
close(opts={}) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 91
def close(opts={})
  begin
    if @primary_pool
      @primary_pool.close(opts)
    end

    if @secondary_pools
      @secondary_pools.each do |pool|
        pool.close(opts)
      end
    end

    if @members
      @members.each do |member|
        member.close
      end
    end

    rescue ConnectionFailure
  end
end
closed?() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 87
def closed?
  pools.all? { |pool| pool.closed? }
end
connect() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 26
def connect
  close if @previously_connected

  initialize_data
  members = connect_to_members
  initialize_pools(members)
  cache_discovered_seeds(members)
  set_read_pool
  set_tag_mappings

  @members = members
  @previously_connected = true
end
inspect() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 22
def inspect
  "<Mongo::PoolManager:0x#{self.object_id.to_s(16)} @seeds=#{@seeds}>"
end
refresh_required?() click to toggle source

The replica set connection should initiate a full refresh.

# File lib/mongo/util/pool_manager.rb, line 83
def refresh_required?
  @refresh_required
end
seeds() click to toggle source

The set of nodes that this class has discovered and successfully connected to.

# File lib/mongo/util/pool_manager.rb, line 115
def seeds
  @seeds || []
end

Private Instance Methods

assign_primary(member) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 207
def assign_primary(member)
  member.last_state = :primary
  @primary = member.host_port
  @primary_pool = Pool.new(self.connection, member.host, member.port,
                          :size => self.connection.pool_size,
                          :timeout => self.connection.pool_timeout,
                          :node => member)
  associate_tags_with_pool(member.tags, @primary_pool)
end
assign_secondary(member) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 217
def assign_secondary(member)
  member.last_state = :secondary
  @secondaries << member.host_port
  pool = Pool.new(self.connection, member.host, member.port,
                               :size => self.connection.pool_size,
                               :timeout => self.connection.pool_timeout,
                               :node => member)
  @secondary_pools << pool
  associate_tags_with_pool(member.tags, pool)
end
associate_tags_with_pool(tags, pool) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 183
def associate_tags_with_pool(tags, pool)
  tags.each_key do |key|
    @tags_to_pools[{key => tags[key]}] ||= []
    @tags_to_pools[{key => tags[key]}] << pool
  end
end
cache_discovered_seeds(members) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 300
def cache_discovered_seeds(members)
  @seeds = members.map { |n| n.host_port }
end
connect_to_members() click to toggle source

Connect to each member of the replica set as reported by the given seed node, and return as a list of Mongo::Node objects.

# File lib/mongo/util/pool_manager.rb, line 163
def connect_to_members
  members = []

  seed = get_valid_seed_node

  seed.node_list.each do |host|
    node = Mongo::Node.new(self.connection, host)
    if node.connect && node.set_config
      members << node
    end
  end
  seed.close

  if members.empty?
    raise ConnectionFailure, "Failed to connect to any given member."
  end

  members
end
get_valid_seed_node() click to toggle source

Iterate through the list of provided seed nodes until we've gotten a response from the replica set we're trying to connect to.

If we don't get a response, raise an exception.

# File lib/mongo/util/pool_manager.rb, line 280
def get_valid_seed_node
  seed_list.each do |seed|
    node = Mongo::Node.new(self.connection, seed)
    if !node.connect
      next
    elsif node.set_config
      return node
    else
      node.close
    end
  end

  raise ConnectionFailure, "Cannot connect to a replica set using seeds " +
    "#{seed_list.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
end
initialize_data() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 145
def initialize_data
  @primary = nil
  @primary_pool = nil
  @read_pool = nil
  @arbiters = []
  @secondaries = []
  @secondary_pool = nil
  @secondary_pools = []
  @hosts = Set.new
  @members = Set.new
  @tags_to_pools = {}
  @tag_map = {}
  @refresh_required = false
end
initialize_pools(members) click to toggle source

Initialize the connection pools for the primary and secondary nodes.

# File lib/mongo/util/pool_manager.rb, line 191
def initialize_pools(members)
  members.each do |member|
    @hosts << member.host_string

    if member.primary?
      assign_primary(member)
    elsif member.secondary? && !@secondaries.include?(member.host_port)
      assign_secondary(member)
    end
  end

  @max_bson_size = members.first.config['maxBsonObjectSize'] ||
    Mongo::DEFAULT_MAX_BSON_SIZE
  @arbiters = members.first.arbiters
end
nearby_pool_from_set(pool_set) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 255
def nearby_pool_from_set(pool_set)
  ping_ranges = Array.new(3) { |i| Array.new }
    pool_set.each do |pool|
      case pool.ping_time
        when 0..150
          ping_ranges[0] << pool
        when 150..1000
          ping_ranges[1] << pool
        else
          ping_ranges[2] << pool
      end
    end

    for list in ping_ranges do
      break if !list.empty?
    end

  list[rand(list.length)]
end
pools() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 121
def pools
  [@primary_pool, *@secondary_pools]
end
seed_list() click to toggle source
# File lib/mongo/util/pool_manager.rb, line 296
def seed_list
  @seeds | @original_seeds
end
set_read_pool() click to toggle source

Pick a node from the set of possible secondaries. If more than one node is available, use the ping time to figure out which nodes to choose from.

# File lib/mongo/util/pool_manager.rb, line 243
def set_read_pool
  if @secondary_pools.empty?
    @read_pool = @primary_pool
  elsif @secondary_pools.size == 1
    @read_pool = @secondary_pools[0]
    @secondary_pool = @read_pool
  else
    @read_pool = nearby_pool_from_set(@secondary_pools)
    @secondary_pool = @read_pool
  end
end
set_tag_mappings() click to toggle source

If there's more than one pool associated with a given tag, choose a close one using the bucket method.

# File lib/mongo/util/pool_manager.rb, line 230
def set_tag_mappings
  @tags_to_pools.each do |key, pool_list|
    if pool_list.length == 1
      @tag_map[key] = pool_list.first
    else
      @tag_map[key] = nearby_pool_from_set(pool_list)
    end
  end
end
validate_existing_member(member) click to toggle source
# File lib/mongo/util/pool_manager.rb, line 125
def validate_existing_member(member)
  config = member.set_config
  if !config
    return false
  else
    if member.primary?
      if member.last_state == :primary
        return true
      else # This node is now primary, but didn't used to be.
        return false
      end
    elsif member.last_state == :secondary &&
      member.secondary?
      return true
    else # This node isn't what it used to be.
      return false
    end
  end
end