class Mongo::Grid::FSBucket::Stream::Read
A stream that reads files from the FSBucket.
@since 2.1.0
Attributes
@return [ BSON::ObjectId, Object ] file_id
The id of the file being read.
@since 2.1.0
@return [ FSBucket ] fs The fs bucket from which this stream reads.
@since 2.1.0
@return [ Hash ] options The stream options.
@since 2.1.0
Public Class Methods
Create a stream for reading files from the FSBucket.
@example Create the stream.
Stream::Read.new(fs, options)
@param [ FSBucket ] fs The GridFS bucket object.
@param [ Hash ] options The read stream options.
@option options [ BSON::Document ] :file_info_doc For internal
driver use only. A BSON document to use as file information.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 53
# Build a read stream bound to the given bucket.
#
# The options hash is duplicated before use, so the hash passed in by the
# caller is neither modified nor frozen by this method. The :file_id entry
# is removed from the copy and stored separately; the remaining options
# are frozen.
#
# @param [ FSBucket ] fs The GridFS bucket object.
# @param [ Hash ] options The read stream options.
#
# @option options [ Object ] :file_id The id stored as the stream's file id.
def initialize(fs, options)
  @fs = fs
  stream_options = options.dup
  @file_id = stream_options.delete(:file_id)
  @options = stream_options.freeze
  # A freshly created stream starts out open.
  @open = true
end
Public Instance Methods
Close the read stream.
If the stream is already closed, this method does nothing.
@example Close the stream.
stream.close
@return [ BSON::ObjectId, Object ] The file id.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 116
# Close the read stream.
#
# When the stream is still open, the query behind the chunks view is
# closed and the stream is marked closed. Calling this on an already
# closed stream does nothing besides returning the file id.
#
# @return [ BSON::ObjectId, Object ] The file id.
def close
  return file_id unless @open

  view.close_query
  @open = false
  file_id
end
Is the stream closed.
@example Is the stream closed.
stream.closed?
@return [ true, false ] Whether the stream is closed.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 132
# Report whether the stream has been closed.
#
# @return [ true, false ] true when the stream is not open.
def closed?
  @open ? false : true
end
Iterate through chunk data streamed from the FSBucket.
@example Iterate through the chunk data.
stream.each do |data| buffer << data end
@return [ Enumerator ] The enumerator.
@raise [ Error::MissingFileChunk ] If a chunk is found out of sequence.
@yieldparam [ String ] Each chunk of file data.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 75
# Iterate through the chunk data of the file being read.
#
# With a block, each document from the chunks view is wrapped in a
# Grid::File::Chunk, validated against its position and the amount of
# data read so far, and its raw data is yielded. Without a block, an
# enumerator over the chunks view itself is returned.
#
# @return [ Integer, Enumerator ] With a block, the sum of the sizes of
#   the yielded data; without a block, an enumerator over the chunks view.
#
# @raise [ Error::ClosedStream ] If the stream is closed.
# @raise [ Error::FileNotFound ] If no file information is available.
# @raise [ Error::MissingFileChunk ] If a chunk is found out of sequence.
def each
  ensure_readable!
  info = file_info
  # Rounds up, so a trailing partial chunk counts as a chunk
  # (assumes length and chunk_size are integers).
  num_chunks = (info.length + info.chunk_size - 1) / info.chunk_size
  return view.to_enum unless block_given?

  length_read = 0
  view.each_with_index do |doc, index|
    chunk = Grid::File::Chunk.new(doc)
    # Validation sees only the data read before this chunk.
    validate!(index, num_chunks, chunk, length_read)
    data = chunk.data.data
    yield data
    length_read += data.size
  end
  length_read
end
Get the files collection file information document for the file being read.
@note The file information is cached in the stream. Subsequent
calls to file_info will return the same information that the first call returned, and will not query the database again.
@return [ File::Info ] The file information object.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 158
# Get the files collection file information document for the file being
# read.
#
# The document comes from the :file_info_doc option when present;
# otherwise it is looked up in the bucket's files collection by file id.
#
# @note A successfully built File::Info is memoized. When no document is
#   found the result is nil, which is not memoized, so the next call
#   performs the lookup again.
#
# @return [ File::Info, nil ] The file information object, or nil when no
#   document is available.
def file_info
  return @file_info if @file_info

  doc = options[:file_info_doc] || fs.files_collection.find(_id: file_id).first
  @file_info =
    if doc
      attributes = Options::Mapper.transform(doc, File::Info::MAPPINGS.invert)
      File::Info.new(attributes)
    end
end
Read all file data.
@example Read the file data.
stream.read
@return [ String ] The file data.
@raise [ Error::MissingFileChunk ] If a chunk is found out of sequence.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 102
# Read all file data.
#
# Collects everything produced by to_a and joins it into one string.
# NOTE(review): to_a is presumably supplied by Enumerable on top of #each;
# the include is not visible in this listing.
#
# @return [ String ] The file data.
def read
  pieces = to_a
  pieces.join
end
Get the read preference used when streaming.
@example Get the read preference.
stream.read_preference
@return [ Mongo::ServerSelector ] The read preference.
@since 2.1.0
# File lib/mongo/grid/stream/read.rb, line 144
# Get the read preference used when streaming.
#
# The stream's own :read option takes precedence; otherwise the bucket's
# read preference is used. A truthy result is memoized.
#
# @return [ Mongo::ServerSelector ] The read preference.
def read_preference
  return @read_preference if @read_preference

  @read_preference = options[:read] || fs.read_preference
end
Private Instance Methods
# File lib/mongo/grid/stream/read.rb, line 175
# Require that file information is available for this stream's file id.
#
# @raise [ Error::FileNotFound ] If file_info returns nothing.
def ensure_file_info!
  return if file_info

  raise Error::FileNotFound.new(file_id, :id)
end
# File lib/mongo/grid/stream/read.rb, line 171
# Require that the stream has not been closed.
#
# @raise [ Error::ClosedStream ] If the stream is closed.
def ensure_open!
  return unless closed?

  raise Error::ClosedStream.new
end
# File lib/mongo/grid/stream/read.rb, line 179
# Run the preconditions for reading: the stream must be open and file
# information must be available. The open check runs first.
#
# @raise [ Error::ClosedStream ] If the stream is closed.
# @raise [ Error::FileNotFound ] If no file information is available.
def ensure_readable!
  ensure_open!
  ensure_file_info!
end
# File lib/mongo/grid/stream/read.rb, line 201
# Close the stream, then raise an unexpected chunk length error built from
# the file's chunk size and the offending chunk.
#
# @param [ Grid::File::Chunk ] chunk The chunk with the unexpected length.
#
# @raise [ Error::UnexpectedChunkLength ] Always.
def raise_unexpected_chunk_length!(chunk)
  # Close before raising so the stream is not left open after the failure.
  close
  expected_size = file_info.chunk_size
  raise Error::UnexpectedChunkLength.new(expected_size, chunk)
end
# File lib/mongo/grid/stream/read.rb, line 196
# Validate a chunk: first its sequence number, then its length.
#
# @param [ Integer ] index The position of the chunk in the iteration.
# @param [ Integer ] num_chunks The number of chunks expected for the file.
# @param [ Grid::File::Chunk ] chunk The chunk being validated.
# @param [ Integer ] length_read The amount of data read before this chunk.
def validate!(index, num_chunks, chunk, length_read)
  validate_n!(index, chunk)
  validate_length!(index, num_chunks, chunk, length_read)
end
# File lib/mongo/grid/stream/read.rb, line 206
# Validate the amount of data carried by a chunk.
#
# Nothing is checked when no chunks are expected or when the chunk carries
# no data. Otherwise:
# - a chunk at or beyond the expected chunk count is an extra chunk;
# - the last expected chunk must bring the total read to the file length;
# - every other chunk must be exactly the file's chunk size.
#
# The stream is closed before any error is raised, including the extra
# chunk error, so every validation failure leaves the stream closed.
#
# @param [ Integer ] index The position of the chunk in the iteration.
# @param [ Integer ] num_chunks The number of chunks expected for the file.
# @param [ Grid::File::Chunk ] chunk The chunk being validated.
# @param [ Integer ] length_read The amount of data read before this chunk.
#
# @raise [ Error::ExtraFileChunk ] If the chunk lies beyond the expected
#   number of chunks.
# @raise [ Error::UnexpectedChunkLength ] If the chunk has the wrong size.
def validate_length!(index, num_chunks, chunk, length_read)
  return unless num_chunks > 0

  chunk_length = chunk.data.data.size
  return unless chunk_length > 0

  unless index < num_chunks
    # Close first, matching the other validation failures.
    close
    raise Error::ExtraFileChunk.new
  end

  expected_length =
    if index == num_chunks - 1
      # The final chunk holds whatever remains of the file.
      file_info.length - length_read
    else
      file_info.chunk_size
    end
  raise_unexpected_chunk_length!(chunk) unless chunk_length == expected_length
end
# File lib/mongo/grid/stream/read.rb, line 219
# Validate that a chunk's sequence number matches its position in the
# iteration. On a mismatch the stream is closed before the error is raised.
#
# @param [ Integer ] index The position of the chunk in the iteration.
# @param [ Grid::File::Chunk ] chunk The chunk being validated.
#
# @raise [ Error::MissingFileChunk ] If the chunk's n differs from index.
def validate_n!(index, chunk)
  return if index == chunk.n

  close
  raise Error::MissingFileChunk.new(index, chunk)
end
# File lib/mongo/grid/stream/read.rb, line 184
# Get the memoized view over this file's chunks.
#
# The view finds the chunks whose files_id matches the stream's file id,
# sorted by n ascending. When a read preference is available it is merged
# into the stream options passed to the query; otherwise the stream
# options are passed unchanged.
#
# @return [ Object ] The sorted chunks view.
def view
  return @view if @view

  preference = read_preference
  query_options = preference ? options.merge(read: preference) : options
  chunks = fs.chunks_collection.find({ :files_id => file_id }, query_options)
  @view = chunks.sort(:n => 1)
end