# File lib/memcache/event_machine.rb, line 67
#
# Opens an EventMachine connection to +host+:+port+, using the receiver as
# the connection handler, and applies +timeout+ as the pending-connect
# timeout of the new connection object.
#
# Returns whatever EM.connect returns.
def self.connect(host, port, timeout)
  EM.connect(host, port, self) { |connection| connection.pending_connect_timeout = timeout }
end
# File lib/memcache/event_machine.rb, line 73
#
# Sets up the initial state: an empty receive buffer, a read cursor at the
# start of that buffer, and the connection flagged as not yet connected.
def initialize
  @buf = ''
  @index = 0
  @connected = false
end
# File lib/memcache/event_machine.rb, line 127
#
# Reports whether at least +size+ unread units remain in the buffer, i.e.
# whether the span from the read cursor to the end of @buf covers +size+.
def can_read?(size)
  size <= @buf.size - @index
end
# File lib/memcache/event_machine.rb, line 83
#
# Marks the connection as closed and asks the underlying connection to
# shut down. The flag is cleared first, so closed? already reports true
# while the shutdown is in progress.
#
# Returns the result of close_connection.
def close
  @connected = false
  # NOTE(review): +true+ is EventMachine's "after writing" flag — confirm
  # against EventMachine::Connection#close_connection.
  after_writing = true
  close_connection(after_writing)
end
# File lib/memcache/event_machine.rb, line 79
#
# True when the connection is not currently flagged as connected
# (before post_init has run, or after close/unbind cleared the flag).
def closed?
  !@connected
end
# File lib/memcache/event_machine.rb, line 109
#
# Reads one line from the connection, blocking (via #read) until a complete
# line terminated by SEP is available.
#
# Returns the line including the trailing SEP.
#
# The accumulator lives outside the loop so that data gathered on earlier
# passes is kept when a line arrives in several chunks, and the separator
# search includes the tail of what was already consumed so that a SEP split
# across two chunks is still recognised.
def gets
  line = ''.dup
  # Number of trailing bytes of +line+ that could be the start of a SEP
  # whose remainder is still sitting in the buffer.
  overlap = SEP.size - 1

  loop do
    # Block until at least one more byte is available and consume it.
    line << read(1)
    break if line.end_with?(SEP)

    # Drop consumed data so buffer offsets below are relative to unread data.
    @buf = @buf.slice(@index..-1)
    @index = 0

    # Search the unread data, prefixed with the tail of the line so far, so
    # that a separator straddling the consumed/unread boundary is found.
    tail = line[-overlap, overlap] || line
    eol = (tail + @buf).index(SEP)
    if eol
      line << yank(eol + SEP.size - tail.size)
      break
    end

    # EOL not in the current buffer: take everything and wait for more.
    line << yank(@buf.size)
  end

  line
end
# File lib/memcache/event_machine.rb, line 144
#
# Connection callback: flags the connection as connected and then signals
# success through +succeed+.
# NOTE(review): +succeed+ is presumably EM::Deferrable#succeed — confirm
# against the enclosing class.
#
# Returns the result of +succeed+.
def post_init
  @connected = true
  succeed
end
# File lib/memcache/event_machine.rb, line 92
#
# Returns +size+ units of data from the buffer.
#
# When the buffer already holds enough data it is returned immediately.
# Otherwise the current fiber is suspended after recording the requested
# size and a callback that resumes this fiber; the value handed to that
# callback becomes the return value.
def read(size)
  return yank(size) if can_read?(size)

  waiting_fiber = Fiber.current
  @size = size
  @callback = proc { |data| waiting_fiber.resume(data) }
  # TODO Can leak fiber if the connection dies while
  # this fiber is yielded, waiting for data
  Fiber.yield
end
# EventMachine (EM) callbacks
# File lib/memcache/event_machine.rb, line 133
#
# Appends incoming +data+ to the buffer and, if a reader is waiting and its
# requested amount is now available, hands that amount to the reader.
#
# The pending callback and size are cleared before the callback runs, so
# the callback may register a new pending read.
#
# Returns nil when no reader was served, otherwise the callback's result.
def receive_data(data)
  @buf << data
  return unless @callback && can_read?(@size)

  waiter = @callback
  payload = yank(@size)
  @size = nil
  @callback = nil
  waiter.call(payload)
end
# File lib/memcache/event_machine.rb, line 149
#
# Connection callback for a connection that has gone away.
#
# If the connection was flagged as connected, the flag is simply cleared.
# If it was never flagged as connected, failure is signalled through +fail+.
# NOTE(review): +fail+ is presumably EM::Deferrable#fail rather than
# Kernel#fail — confirm against the enclosing class.
def unbind
  return fail unless @connected

  @connected = false
end
# File lib/memcache/event_machine.rb, line 88
#
# Sends +buf+ over the connection by delegating to send_data.
#
# Returns the result of send_data.
def write(buf)
  outgoing = buf
  send_data(outgoing)
end
# File lib/memcache/event_machine.rb, line 161
#
# Removes and returns up to +len+ units from the buffer, starting at the
# read cursor.
#
# The cursor advances by +len+ but never past the end of the buffer. Once
# the cursor reaches BUFFER_SIZE, the consumed prefix is discarded and the
# cursor is reset to zero so the buffer does not grow without bound.
def yank(len)
  chunk = @buf.slice(@index, len)
  @index = [@index + len, @buf.size].min

  unless @index < BUFFER_SIZE
    # Compact: keep only the unread remainder.
    @buf = @buf.slice(@index..-1)
    @index = 0
  end

  chunk
end