class Dalli::Server

Constants

CAS_HEADER
DEFAULTS
DEFAULT_PORT
DEFAULT_WEIGHT
FLAG_COMPRESSED
FLAG_SERIALIZED

www.hjp.at/zettel/m/memcached_flags.rxml Looks like most clients use bit 0 to indicate native language serialization and bit 1 to indicate gzip compression.

FORMAT
KV_HEADER
MAX_ACCEPTABLE_EXPIRATION_INTERVAL

code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol > An expiration time, in seconds. Can be up to 30 days. After 30 days, is treated as a unix timestamp of an exact date.

NORMAL_HEADER
OPCODES
OP_FORMAT
REQUEST
RESPONSE
RESPONSE_CODES

Attributes

hostname[RW]
options[RW]
port[RW]
sock[R]
socket_type[R]
weight[RW]

Public Class Methods

new(attribs, options = {}) click to toggle source
# File lib/dalli/server.rb, line 37
def initialize(attribs, options = {})
  @hostname, @port, @weight, @socket_type = parse_hostname(attribs)
  @fail_count = 0
  @down_at = nil
  @last_down_at = nil
  @options = DEFAULTS.merge(options)
  @sock = nil
  @msg = nil
  @error = nil
  @pid = nil
  @inprogress = nil
end

Public Instance Methods

alive?() click to toggle source
# File lib/dalli/server.rb, line 82
def alive?
  return true if @sock

  if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now
    time = @last_down_at + options[:down_retry_delay] - Time.now
    Dalli.logger.debug { "down_retry_delay not reached for #{name} (%.3f seconds left)" % time }
    return false
  end

  connect
  !!@sock
rescue Dalli::NetworkError
  false
end
close() click to toggle source
# File lib/dalli/server.rb, line 97
def close
  return unless @sock
  @sock.close rescue nil
  @sock = nil
  @pid = nil
  @inprogress = false
end
compressor() click to toggle source
# File lib/dalli/server.rb, line 115
def compressor
  @options[:compressor]
end
lock!() click to toggle source
# File lib/dalli/server.rb, line 105
def lock!
end
multi_response_abort() click to toggle source

Abort an earlier multi_response_start. Used to signal an external timeout. The underlying socket is disconnected, and the exception is swallowed.

Returns nothing.

# File lib/dalli/server.rb, line 191
def multi_response_abort
  @multi_buffer = nil
  @position = nil
  @inprogress = false
  failure!(RuntimeError.new('External timeout'))
rescue NetworkError
  true
end
multi_response_completed?() click to toggle source

Did the last call to multi_response_start complete successfully?

# File lib/dalli/server.rb, line 133
def multi_response_completed?
  @multi_buffer.nil?
end
multi_response_nonblock() click to toggle source

Attempt to receive and parse as many key/value pairs as possible from this server. After multi_response_start, this should be invoked repeatedly whenever this server's socket is readable until multi_response_completed?.

Returns a Hash of kv pairs received.

# File lib/dalli/server.rb, line 143
def multi_response_nonblock
  raise 'multi_response has completed' if @multi_buffer.nil?

  @multi_buffer << @sock.read_available
  buf = @multi_buffer
  pos = @position
  values = {}

  while buf.bytesize - pos >= 24
    header = buf.slice(pos, 24)
    (key_length, _, body_length, cas) = header.unpack(KV_HEADER)

    if key_length == 0
      # all done!
      @multi_buffer = nil
      @position = nil
      @inprogress = false
      break

    elsif buf.bytesize - pos >= 24 + body_length
      flags = buf.slice(pos + 24, 4).unpack('N')[0]
      key = buf.slice(pos + 24 + 4, key_length)
      value = buf.slice(pos + 24 + 4 + key_length, body_length - key_length - 4) if body_length - key_length - 4 > 0

      pos = pos + 24 + body_length

      begin
        values[key] = [deserialize(value, flags), cas]
      rescue DalliError
      end

    else
      # not enough data yet, wait for more
      break
    end
  end
  @position = pos

  values
rescue SystemCallError, Timeout::Error, EOFError => e
  failure!(e)
end
multi_response_start() click to toggle source

Start reading key/value pairs from this connection. This is usually called after a series of GETKQ commands. A NOOP is sent, and the server begins flushing responses for kv pairs that were found.

Returns nothing.

# File lib/dalli/server.rb, line 124
def multi_response_start
  verify_state
  write_noop
  @multi_buffer = ''
  @position = 0
  @inprogress = true
end
name() click to toggle source
# File lib/dalli/server.rb, line 50
def name
  if socket_type == :unix
    hostname
  else
    "#{hostname}:#{port}"
  end
end
request(op, *args) click to toggle source

Chokepoint method for instrumentation

# File lib/dalli/server.rb, line 59
def request(op, *args)
  verify_state
  raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive?
  begin
    send(op, *args)
  rescue Dalli::NetworkError
    raise
  rescue Dalli::MarshalError => ex
    Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}"
    Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached."
    Dalli.logger.error ex.backtrace.join("\n\t")
    false
  rescue Dalli::DalliError
    raise
  rescue Timeout::Error
    raise
  rescue => ex
    Dalli.logger.error "Unexpected exception during Dalli request: #{ex.class.name}: #{ex.message}"
    Dalli.logger.error ex.backtrace.join("\n\t")
    down!
  end
end
serializer() click to toggle source
# File lib/dalli/server.rb, line 111
def serializer
  @options[:serializer]
end
unlock!() click to toggle source
# File lib/dalli/server.rb, line 108
def unlock!
end

Private Instance Methods

add(key, value, ttl, options) click to toggle source
# File lib/dalli/server.rb, line 284
def add(key, value, ttl, options)
  (value, flags) = serialize(key, value, options)
  ttl = sanitize_ttl(ttl)

  guard_max_value(key, value) do
    req = [REQUEST, OPCODES[multi? ? :addq : :add], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, 0, flags, ttl, key, value].pack(FORMAT[:add])
    write(req)
    cas_response unless multi?
  end
end
append(key, value) click to toggle source
# File lib/dalli/server.rb, line 358
def append(key, value)
  write_append_prepend :append, key, value
end
cas(key) click to toggle source
# File lib/dalli/server.rb, line 376
def cas(key)
  req = [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:get])
  write(req)
  data_cas_response
end
cas_response() click to toggle source
# File lib/dalli/server.rb, line 500
def cas_response
  (_, _, status, count, _, cas) = read_header.unpack(CAS_HEADER)
  read(count) if count > 0  # this is potential data that we don't care about
  if status == 1
    nil
  elsif status == 2 || status == 5
    false # Not stored, normal status for add operation
  elsif status != 0
    raise Dalli::DalliError, "Response error #{status}: #{RESPONSE_CODES[status]}"
  else
    cas
  end
end
connect() click to toggle source
# File lib/dalli/server.rb, line 563
def connect
  Dalli.logger.debug { "Dalli::Server#connect #{name}" }

  begin
    @pid = Process.pid
    if socket_type == :unix
      @sock = KSocket::UNIX.open(hostname, self, options)
    else
      @sock = KSocket::TCP.open(hostname, port, self, options)
    end
    sasl_authentication if need_auth?
    @version = version # trigger actual connect
    up!
  rescue Dalli::DalliError # SASL auth failure
    raise
  rescue SystemCallError, Timeout::Error, EOFError, SocketError => e
    # SocketError = DNS resolution failure
    failure!(e)
  end
end
data_cas_response() click to toggle source
# File lib/dalli/server.rb, line 442
def data_cas_response
  (extras, _, status, count, _, cas) = read_header.unpack(CAS_HEADER)
  data = read(count) if count > 0
  if status == 1
    nil
  elsif status != 0
    raise Dalli::DalliError, "Response error #{status}: #{RESPONSE_CODES[status]}"
  elsif data
    flags = data[0...extras].unpack('N')[0]
    value = data[extras..-1]
    data = deserialize(value, flags)
  end
  [data, cas]
end
decr(key, count, ttl, default) click to toggle source
# File lib/dalli/server.rb, line 329
def decr(key, count, ttl, default)
  decr_incr :decr, key, count, ttl, default
end
decr_incr(opcode, key, count, ttl, default) click to toggle source
# File lib/dalli/server.rb, line 318
def decr_incr(opcode, key, count, ttl, default)
  expiry = default ? sanitize_ttl(ttl) : 0xFFFFFFFF
  default ||= 0
  (h, l) = split(count)
  (dh, dl) = split(default)
  req = [REQUEST, OPCODES[opcode], key.bytesize, 20, 0, 0, key.bytesize + 20, 0, 0, h, l, dh, dl, expiry, key].pack(FORMAT[opcode])
  write(req)
  body = generic_response
  body ? body.unpack('Q>').first : body
end
delete(key, cas) click to toggle source
# File lib/dalli/server.rb, line 306
def delete(key, cas)
  req = [REQUEST, OPCODES[multi? ? :deleteq : :delete], key.bytesize, 0, 0, 0, key.bytesize, 0, cas, key].pack(FORMAT[:delete])
  write(req)
  generic_response unless multi?
end
deserialize(value, flags) click to toggle source
# File lib/dalli/server.rb, line 428
def deserialize(value, flags)
  value = self.compressor.decompress(value) if (flags & FLAG_COMPRESSED) != 0
  value = self.serializer.load(value) if (flags & FLAG_SERIALIZED) != 0
  value
rescue TypeError
  raise if $!.message !~ /needs to have method `_load'|exception class\/object expected|instance of IO needed|incompatible marshal file format/
  raise UnmarshalError, "Unable to unmarshal value: #{$!.message}"
rescue ArgumentError
  raise if $!.message !~ /undefined class|marshal data too short/
  raise UnmarshalError, "Unable to unmarshal value: #{$!.message}"
rescue Zlib::Error
  raise UnmarshalError, "Unable to uncompress value: #{$!.message}"
end
down!() click to toggle source
# File lib/dalli/server.rb, line 223
def down!
  close

  @last_down_at = Time.now

  if @down_at
    time = Time.now - @down_at
    Dalli.logger.debug { "#{name} is still down (for %.3f seconds now)" % time }
  else
    @down_at = @last_down_at
    Dalli.logger.warn { "#{name} is down" }
  end

  @error = $! && $!.class.name
  @msg = @msg || ($! && $!.message && !$!.message.empty? && $!.message)
  raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}"
end
failure!(exception) click to toggle source
# File lib/dalli/server.rb, line 209
def failure!(exception)
  message = "#{name} failed (count: #{@fail_count}) #{exception.class}: #{exception.message}"
  Dalli.logger.info { message }

  @fail_count += 1
  if @fail_count >= options[:socket_max_failures]
    down!
  else
    close
    sleep(options[:socket_failure_delay]) if options[:socket_failure_delay]
    raise Dalli::NetworkError, "Socket operation failed, retrying..."
  end
end
flush(ttl) click to toggle source
# File lib/dalli/server.rb, line 312
def flush(ttl)
  req = [REQUEST, OPCODES[:flush], 0, 4, 0, 0, 4, 0, 0, 0].pack(FORMAT[:flush])
  write(req)
  generic_response
end
generic_response(unpack=false) click to toggle source
# File lib/dalli/server.rb, line 482
def generic_response(unpack=false)
  (extras, _, status, count) = read_header.unpack(NORMAL_HEADER)
  data = read(count) if count > 0
  if status == 1
    nil
  elsif status == 2 || status == 5
    false # Not stored, normal status for add operation
  elsif status != 0
    raise Dalli::DalliError, "Response error #{status}: #{RESPONSE_CODES[status]}"
  elsif data
    flags = data[0...extras].unpack('N')[0]
    value = data[extras..-1]
    unpack ? deserialize(value, flags) : value
  else
    true
  end
end
get(key) click to toggle source
# File lib/dalli/server.rb, line 258
def get(key)
  req = [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:get])
  write(req)
  generic_response(true)
end
guard_max_value(key, value) { || ... } click to toggle source
# File lib/dalli/server.rb, line 461
def guard_max_value(key, value)
  if value.bytesize <= @options[:value_max_bytes]
    yield
  else
    Dalli.logger.warn "Value for #{key} over max size: #{@options[:value_max_bytes]} <= #{value.bytesize}"
    false
  end
end
incr(key, count, ttl, default) click to toggle source
# File lib/dalli/server.rb, line 333
def incr(key, count, ttl, default)
  decr_incr :incr, key, count, ttl, default
end
keyvalue_response() click to toggle source
# File lib/dalli/server.rb, line 514
def keyvalue_response
  hash = {}
  loop do
    (key_length, _, body_length, _) = read_header.unpack(KV_HEADER)
    return hash if key_length == 0
    key = read(key_length)
    value = read(body_length - key_length) if body_length - key_length > 0
    hash[key] = value
  end
end
multi?() click to toggle source
# File lib/dalli/server.rb, line 254
def multi?
  Thread.current[:dalli_multi]
end
multi_response() click to toggle source
# File lib/dalli/server.rb, line 525
def multi_response
  hash = {}
  loop do
    (key_length, _, body_length, _) = read_header.unpack(KV_HEADER)
    return hash if key_length == 0
    flags = read(4).unpack('N')[0]
    key = read(key_length)
    value = read(body_length - key_length - 4) if body_length - key_length - 4 > 0
    hash[key] = deserialize(value, flags)
  end
end
need_auth?() click to toggle source

SASL authentication support for NorthScale

# File lib/dalli/server.rb, line 658
def need_auth?
  @options[:username] || ENV['MEMCACHE_USERNAME']
end
noop() click to toggle source

Noop is a keepalive operation but also used to demarcate the end of a set of pipelined commands. We need to read all the responses at once.

# File lib/dalli/server.rb, line 353
def noop
  write_noop
  multi_response
end
parse_hostname(str) click to toggle source
# File lib/dalli/server.rb, line 701
def parse_hostname(str)
  res = str.match(/\A(\[([\h:]+)\]|[^:]+)(?::(\d+))?(?::(\d+))?\z/)
  raise Dalli::DalliError, "Could not parse hostname #{str}" if res.nil? || res[1] == '[]'
  hostnam = res[2] || res[1]
  if hostnam =~ /\A\//
    socket_type = :unix
    # in case of unix socket, allow only setting of weight, not port
    raise Dalli::DalliError, "Could not parse hostname #{str}" if res[4]
    weigh = res[3]
  else
    socket_type = :tcp
    por = res[3] || DEFAULT_PORT
    por = Integer(por)
    weigh = res[4]
  end
  weigh ||= DEFAULT_WEIGHT
  weigh = Integer(weigh)
  return hostnam, por, weigh, socket_type
end
password() click to toggle source
# File lib/dalli/server.rb, line 666
def password
  @options[:password] || ENV['MEMCACHE_PASSWORD']
end
prepend(key, value) click to toggle source
# File lib/dalli/server.rb, line 362
def prepend(key, value)
  write_append_prepend :prepend, key, value
end
read(count) click to toggle source
# File lib/dalli/server.rb, line 548
def read(count)
  begin
    @inprogress = true
    data = @sock.readfull(count)
    @inprogress = false
    data
  rescue SystemCallError, Timeout::Error, EOFError => e
    failure!(e)
  end
end
read_header() click to toggle source
# File lib/dalli/server.rb, line 559
def read_header
  read(24) || raise(Dalli::NetworkError, 'No response')
end
replace(key, value, ttl, cas, options) click to toggle source
# File lib/dalli/server.rb, line 295
def replace(key, value, ttl, cas, options)
  (value, flags) = serialize(key, value, options)
  ttl = sanitize_ttl(ttl)

  guard_max_value(key, value) do
    req = [REQUEST, OPCODES[multi? ? :replaceq : :replace], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:replace])
    write(req)
    cas_response unless multi?
  end
end
reset_stats() click to toggle source
# File lib/dalli/server.rb, line 372
def reset_stats
  write_generic [REQUEST, OPCODES[:stat], 'reset'.bytesize, 0, 0, 0, 'reset'.bytesize, 0, 0, 'reset'].pack(FORMAT[:stat])
end
sanitize_ttl(ttl) click to toggle source
# File lib/dalli/server.rb, line 473
def sanitize_ttl(ttl)
  if ttl > MAX_ACCEPTABLE_EXPIRATION_INTERVAL
    Dalli.logger.debug "Expiration interval too long for Memcached, converting to an expiration timestamp"
    Time.now.to_i + ttl.to_i
  else
    ttl.to_i
  end
end
sasl_authentication() click to toggle source
# File lib/dalli/server.rb, line 670
def sasl_authentication
  Dalli.logger.info { "Dalli/SASL authenticating as #{username}" }

  # negotiate
  req = [REQUEST, OPCODES[:auth_negotiation], 0, 0, 0, 0, 0, 0, 0].pack(FORMAT[:noop])
  write(req)

  (extras, type, status, count) = read_header.unpack(NORMAL_HEADER)
  raise Dalli::NetworkError, "Unexpected message format: #{extras} #{count}" unless extras == 0 && count > 0
  content = read(count).gsub(/\u0000/, ' ')
  return (Dalli.logger.debug("Authentication not required/supported by server")) if status == 0x81
  mechanisms = content.split(' ')
  raise NotImplementedError, "Dalli only supports the PLAIN authentication mechanism" if !mechanisms.include?('PLAIN')

  # request
  mechanism = 'PLAIN'
  msg = "\x0#{username}\x0#{password}"
  req = [REQUEST, OPCODES[:auth_request], mechanism.bytesize, 0, 0, 0, mechanism.bytesize + msg.bytesize, 0, 0, mechanism, msg].pack(FORMAT[:auth_request])
  write(req)

  (extras, type, status, count) = read_header.unpack(NORMAL_HEADER)
  raise Dalli::NetworkError, "Unexpected message format: #{extras} #{count}" unless extras == 0 && count > 0
  content = read(count)
  return Dalli.logger.info("Dalli/SASL: #{content}") if status == 0

  raise Dalli::DalliError, "Error authenticating: #{status}" unless status == 0x21
  raise NotImplementedError, "No two-step authentication mechanisms supported"
  # (step, msg) = sasl.receive('challenge', content)
  # raise Dalli::NetworkError, "Authentication failed" if sasl.failed? || step != 'response'
end
send_multiget(keys) click to toggle source
# File lib/dalli/server.rb, line 264
def send_multiget(keys)
  req = ""
  keys.each do |key|
    req << [REQUEST, OPCODES[:getkq], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:getkq])
  end
  # Could send noop here instead of in multi_response_start
  write(req)
end
serialize(key, value, options=nil) click to toggle source
# File lib/dalli/server.rb, line 397
def serialize(key, value, options=nil)
  marshalled = false
  value = unless options && options[:raw]
    marshalled = true
    begin
      self.serializer.dump(value)
    rescue TimeoutError => e
      raise e
    rescue => ex
      # Marshalling can throw several different types of generic Ruby exceptions.
      # Convert to a specific exception so we can special case it higher up the stack.
      exc = Dalli::MarshalError.new(ex.message)
      exc.set_backtrace ex.backtrace
      raise exc
    end
  else
    value.to_s
  end
  compressed = false
  if @options[:compress] && value.bytesize >= @options[:compression_min_size] &&
    (!@options[:compression_max_size] || value.bytesize <= @options[:compression_max_size])
    value = self.compressor.compress(value)
    compressed = true
  end

  flags = 0
  flags |= FLAG_COMPRESSED if compressed
  flags |= FLAG_SERIALIZED if marshalled
  [value, flags]
end
set(key, value, ttl, cas, options) click to toggle source
# File lib/dalli/server.rb, line 273
def set(key, value, ttl, cas, options)
  (value, flags) = serialize(key, value, options)
  ttl = sanitize_ttl(ttl)

  guard_max_value(key, value) do
    req = [REQUEST, OPCODES[multi? ? :setq : :set], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:set])
    write(req)
    cas_response unless multi?
  end
end
split(n) click to toggle source
# File lib/dalli/server.rb, line 584
def split(n)
  [n >> 32, 0xFFFFFFFF & n]
end
stats(info='') click to toggle source
# File lib/dalli/server.rb, line 366
def stats(info='')
  req = [REQUEST, OPCODES[:stat], info.bytesize, 0, 0, 0, info.bytesize, 0, 0, info].pack(FORMAT[:stat])
  write(req)
  keyvalue_response
end
touch(key, ttl) click to toggle source
# File lib/dalli/server.rb, line 386
def touch(key, ttl)
  ttl = sanitize_ttl(ttl)
  write_generic [REQUEST, OPCODES[:touch], key.bytesize, 4, 0, 0, key.bytesize + 4, 0, 0, ttl, key].pack(FORMAT[:touch])
end
up!() click to toggle source
# File lib/dalli/server.rb, line 241
def up!
  if @down_at
    time = Time.now - @down_at
    Dalli.logger.warn { "#{name} is back (downtime was %.3f seconds)" % time }
  end

  @fail_count = 0
  @down_at = nil
  @last_down_at = nil
  @msg = nil
  @error = nil
end
username() click to toggle source
# File lib/dalli/server.rb, line 662
def username
  @options[:username] || ENV['MEMCACHE_USERNAME']
end
verify_state() click to toggle source

NOTE: Additional public methods should be overridden in Dalli::Threadsafe

# File lib/dalli/server.rb, line 204
def verify_state
  failure!(RuntimeError.new('Already writing to socket')) if @inprogress
  failure!(RuntimeError.new('Cannot share client between multiple processes')) if @pid && @pid != Process.pid
end
version() click to toggle source
# File lib/dalli/server.rb, line 382
def version
  write_generic [REQUEST, OPCODES[:version], 0, 0, 0, 0, 0, 0, 0].pack(FORMAT[:noop])
end
write(bytes) click to toggle source
# File lib/dalli/server.rb, line 537
def write(bytes)
  begin
    @inprogress = true
    result = @sock.write(bytes)
    @inprogress = false
    result
  rescue SystemCallError, Timeout::Error => e
    failure!(e)
  end
end
write_append_prepend(opcode, key, value) click to toggle source
# File lib/dalli/server.rb, line 337
def write_append_prepend(opcode, key, value)
  write_generic [REQUEST, OPCODES[opcode], key.bytesize, 0, 0, 0, value.bytesize + key.bytesize, 0, 0, key, value].pack(FORMAT[opcode])
end
write_generic(bytes) click to toggle source
# File lib/dalli/server.rb, line 341
def write_generic(bytes)
  write(bytes)
  generic_response
end
write_noop() click to toggle source
# File lib/dalli/server.rb, line 346
def write_noop
  req = [REQUEST, OPCODES[:noop], 0, 0, 0, 0, 0, 0, 0].pack(FORMAT[:noop])
  write(req)
end