class Mongo::BulkWrite

Constants

SINGLE_STATEMENT_OPS

Attributes

collection[R]

@return [ Mongo::Collection ] collection The collection.

options[R]

@return [ Hash, BSON::Document ] options The options.

requests[R]

@return [ Array<Hash, BSON::Document> ] requests The requests.

Public Class Methods

new(collection, requests, options = {}) click to toggle source

Create the new bulk write operation.

@api private

@example Create an ordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])

@example Create an unordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)

@example Create an ordered mixed bulk write.

Mongo::BulkWrite.new(
  collection,
  [
    { insert_one: { _id: 1 }},
    { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}},
    { delete_one: { filter: { _id: 2 }}}
  ]
)

@param [ Mongo::Collection ] collection The collection.

@param [ Array<Hash, BSON::Document> ] requests The requests.

@param [ Hash, BSON::Document ] options The options.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 115
# Store the pieces of a bulk write: the target collection, the list of
# requested operations and the options hash.
#
# @param [ Mongo::Collection ] collection The collection.
# @param [ Array<Hash, BSON::Document> ] requests The requests.
# @param [ Hash, BSON::Document ] options The options.
def initialize(collection, requests, options = {})
  # A nil (or false) options argument falls back to an empty hash so that
  # later lookups on @options never hit nil.
  @options = options || {}
  @requests = requests
  @collection = collection
end

Public Instance Methods

execute() click to toggle source

Execute the bulk write operation.

@example Execute the bulk write.

bulk_write.execute

@return [ Mongo::BulkWrite::Result ] The result.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 55
# Execute the bulk write operation.
#
# The requests are combined into operations by the op combiner and each
# operation is run inside a session obtained from the client. Operations
# whose name is listed in SINGLE_STATEMENT_OPS go through write_with_retry
# (which yields a server and a transaction number); all others go through
# nro_write_with_retry (which yields only a server).
#
# @example Execute the bulk write.
#   bulk_write.execute
#
# @return [ Mongo::BulkWrite::Result ] The result.
def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine

  client.send(:with_session, @options) do |session|
    operations.each do |operation|
      # Resolve the write concern before branching so that BOTH retry paths
      # receive it. Assigning it only inside the single-statement branch
      # left the local nil on the other branch. The local is deliberately
      # not named `write_concern` to avoid shadowing the method.
      concern = write_concern(session)

      if single_statement?(operation)
        write_with_retry(session, concern) do |server, txn_num|
          # The values array is rebuilt on every invocation of this block:
          # split_execute shifts elements out of the array it is given, so
          # a re-run of the block must start from a fresh array.
          execute_operation(
              operation.keys.first,
              operation.values.flatten,
              server,
              operation_id,
              result_combiner,
              session,
              txn_num)
        end
      else
        nro_write_with_retry(session, concern) do |server|
          execute_operation(
              operation.keys.first,
              operation.values.flatten,
              server,
              operation_id,
              result_combiner,
              session)
        end
      end
    end
  end
  result_combiner.result
end
ordered?() click to toggle source

Is the bulk write ordered?

@api private

@example Is the bulk write ordered?

bulk_write.ordered?

@return [ true, false ] If the bulk write is ordered.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 131
# Is the bulk write ordered?
#
# Reads the :ordered option, defaulting to true when the key is absent.
# Only a truthy answer is cached in @ordered; a false answer is looked up
# from the options again on every call.
#
# @return [ true, false ] If the bulk write is ordered.
def ordered?
  return @ordered if @ordered

  @ordered = options.fetch(:ordered, true)
end
write_concern(session = nil) click to toggle source

Get the write concern for the bulk write.

@api private

@example Get the write concern.

bulk_write.write_concern

@return [ WriteConcern ] The write concern.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 145
# Get the write concern for the bulk write.
#
# An explicit :write_concern option wins and is converted through
# WriteConcern.get; otherwise the collection is asked for its write concern
# given the session. The answer is memoized in @write_concern, so the
# session argument only matters on the first call.
#
# @param [ Object ] session The session passed on to the collection
#   (defaults to nil).
#
# @return [ WriteConcern ] The write concern.
def write_concern(session = nil)
  return @write_concern if @write_concern

  @write_concern =
    if options[:write_concern]
      WriteConcern.get(options[:write_concern])
    else
      collection.write_concern_with_session(session)
    end
end

Private Instance Methods

base_spec(operation_id, session) click to toggle source
# File lib/mongo/bulk_write.rb, line 161
# Build the spec hash shared by every operation of this bulk write.
#
# @param [ Object ] operation_id The operation id stored under :operation_id.
# @param [ Object ] session The session; stored under :session and also
#   used to resolve the write concern.
#
# @return [ Hash ] The common part of an operation spec.
def base_spec(operation_id, session)
  {
    db_name: database.name,
    coll_name: collection.name,
    write_concern: write_concern(session),
    ordered: ordered?,
    operation_id: operation_id,
    # Coerced to a strict boolean regardless of what the option holds.
    bypass_document_validation: options[:bypass_document_validation] ? true : false,
    options: options,
    id_generator: client.options[:id_generator],
    session: session
  }
end
delete_many(documents, server, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 209
# Run a batch of delete_many statements as one bulk delete on the server.
#
# The spec is the base spec plus the documents under :deletes. txn_num is
# accepted so all operation methods share one signature, but it is not put
# into the spec here.
#
# @return [ Object ] Whatever Operation::Delete#bulk_execute returns.
def delete_many(documents, server, operation_id, session, txn_num)
  delete_spec = base_spec(operation_id, session).merge(deletes: documents)
  operation = Operation::Delete.new(delete_spec)
  operation.bulk_execute(server)
end
delete_one(documents, server, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 204
# Run a batch of delete_one statements as one bulk delete on the server.
#
# The spec is the base spec plus the documents under :deletes and the
# transaction number under :txn_num.
#
# @return [ Object ] Whatever Operation::Delete#bulk_execute returns.
def delete_one(documents, server, operation_id, session, txn_num)
  extras = { deletes: documents, txn_num: txn_num }
  delete_spec = base_spec(operation_id, session).merge(extras)
  Operation::Delete.new(delete_spec).bulk_execute(server)
end
execute_operation(name, values, server, operation_id, result_combiner, session, txn_num = nil) click to toggle source
# File lib/mongo/bulk_write.rb, line 175
# Execute one combined operation against the given server and fold its
# result into the result combiner.
#
# @param [ Symbol ] name The operation name; it is used as the name of the
#   method on this object that is invoked via send.
# @param [ Array ] values The statements/documents for the operation.
# @param [ Object ] server The server; consulted for its features and its
#   max_write_batch_size.
# @param [ Object ] operation_id The operation id, passed through.
# @param [ Object ] result_combiner Receives each result via combine!.
# @param [ Object ] session The session, passed through.
# @param [ Object ] txn_num The transaction number, passed through
#   (defaults to nil).
#
# @raise [ Error::UnsupportedCollation ] If the combined operations use a
#   collation and the server's features report it as not enabled.
# @raise [ Error::UnsupportedArrayFilters ] If the combined operations use
#   array filters and the server's features report them as not enabled.
# @raise [ Error::MaxBSONSize, Error::MaxMessageSize ] Re-raised when the
#   failing batch has at most one value and so cannot be split further.
def execute_operation(name, values, server, operation_id, result_combiner, session, txn_num = nil)
  raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled?
  raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled?
  # NOTE(review): unpin_maybe is defined outside this view; presumably it
  # handles session pinning around the yielded work -- confirm.
  unpin_maybe(session) do
    # A batch larger than the server's limit is halved up front.
    if values.size > server.max_write_batch_size
      split_execute(name, values, server, operation_id, result_combiner, session, txn_num)
    else
      # Dispatch to the method named after the operation (e.g. insert_one).
      result = send(name, values, server, operation_id, session, txn_num)
      result_combiner.combine!(result, values.size)
    end
  end
# This rescue covers the whole method body. A size error on a batch with
# more than one value is handled by splitting the batch and trying again.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1
  unpin_maybe(session) do
    split_execute(name, values, server, operation_id, result_combiner, session, txn_num)
  end
end
insert_one(documents, server, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 214
# Run a batch of insert_one documents as one bulk insert on the server.
#
# The spec is the base spec plus the documents under :documents and the
# transaction number under :txn_num.
#
# @return [ Object ] Whatever Operation::Insert#bulk_execute returns.
def insert_one(documents, server, operation_id, session, txn_num)
  extras = { documents: documents, txn_num: txn_num }
  insert_spec = base_spec(operation_id, session).merge(extras)
  Operation::Insert.new(insert_spec).bulk_execute(server)
end
op_combiner() click to toggle source
# File lib/mongo/bulk_write.rb, line 193
# The combiner that groups the requests into operations: an OrderedCombiner
# for ordered bulk writes, an UnorderedCombiner otherwise. The instance is
# built once from the requests and memoized in @op_combiner.
#
# @return [ OrderedCombiner, UnorderedCombiner ] The combiner.
def op_combiner
  @op_combiner ||= begin
    combiner_class = ordered? ? OrderedCombiner : UnorderedCombiner
    combiner_class.new(requests)
  end
end
replace_one(documents, server, operation_id, session, txn_num)
Alias for: update_one
single_statement?(operation) click to toggle source
# File lib/mongo/bulk_write.rb, line 157
# Whether the operation's name (the first key of the operation hash) is one
# of the SINGLE_STATEMENT_OPS.
#
# @param [ Hash ] operation A combined operation keyed by operation name.
#
# @return [ true, false ] If the operation name is in SINGLE_STATEMENT_OPS.
def single_statement?(operation)
  operation_name = operation.keys.first
  SINGLE_STATEMENT_OPS.include?(operation_name)
end
split_execute(name, values, server, operation_id, result_combiner, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 197
# Execute the values in two halves, each through execute_operation.
#
# The first half (values.size / 2 elements, rounded down) is removed from
# the front of the given array IN PLACE; the same array, now holding only
# the remainder, is then executed as the second half. When a transaction
# number was supplied, the second half gets a fresh one from the session,
# requested only after the first half has been executed.
#
# @return [ Object ] The return value of the second execute_operation call.
def split_execute(name, values, server, operation_id, result_combiner, session, txn_num)
  first_half = values.shift(values.size / 2)
  execute_operation(name, first_half, server, operation_id, result_combiner, session, txn_num)

  second_txn_num = txn_num ? session.next_txn_num : txn_num
  execute_operation(name, values, server, operation_id, result_combiner, session, second_txn_num)
end
update_many(documents, server, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 225
# Run a batch of update_many statements as one bulk update on the server.
#
# The spec is the base spec plus the documents under :updates. txn_num is
# accepted so all operation methods share one signature, but it is not put
# into the spec here.
#
# @return [ Object ] Whatever Operation::Update#bulk_execute returns.
def update_many(documents, server, operation_id, session, txn_num)
  update_spec = base_spec(operation_id, session).merge(updates: documents)
  operation = Operation::Update.new(update_spec)
  operation.bulk_execute(server)
end
update_one(documents, server, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 219
# Run a batch of update_one statements as one bulk update on the server.
#
# The spec is the base spec plus the documents under :updates and the
# transaction number under :txn_num.
#
# @return [ Object ] Whatever Operation::Update#bulk_execute returns.
def update_one(documents, server, operation_id, session, txn_num)
  extras = { updates: documents, txn_num: txn_num }
  update_spec = base_spec(operation_id, session).merge(extras)
  Operation::Update.new(update_spec).bulk_execute(server)
end
Also aliased as: replace_one