class Mongo::Collection::View::Aggregation
Provides behavior around an aggregation pipeline on a collection view.
@since 2.0.0
Constants
- REROUTE
The reroute message.
@since 2.1.0
@deprecated
Attributes
@return [ Array<Hash> ] pipeline The aggregation pipeline.
@return [ View ] view The collection view.
Public Class Methods
Initialize the aggregation for the provided collection view, pipeline and options.
@example Create the new aggregation view.
Aggregation.new(view, pipeline)
@param [ Collection::View ] view The collection view.
@param [ Array<Hash> ] pipeline The pipeline of operations.
@param [ Hash ] options The aggregation options.
@option options [ true, false ] :allow_disk_use Set to true if disk
usage is allowed during the aggregation.
@option options [ Integer ] :batch_size The number of documents to return
per batch.
@option options [ true, false ] :bypass_document_validation Whether or
not to skip document level validation.
@option options [ Hash ] :collation The collation to use.
@option options [ String ] :comment Associate a comment with the aggregation.
@option options [ String ] :hint The index to use for the aggregation.
@option options [ Hash ] :let Mapping of variables to use in the pipeline.
See the server documentation for details.
@option options [ Integer ] :max_time_ms The maximum amount of time in
milliseconds to allow the aggregation to run.
@option options [ true, false ] :use_cursor Indicates whether the command
will request that the server provide results using a cursor. Note that as of server version 3.6, aggregations always provide results using a cursor and this option is therefore not valid.
@option options [ Session ] :session The session to use.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 96
def initialize(view, pipeline, options = {})
  @view = view
  @pipeline = pipeline.dup
  @options = BSON::Document.new(options).freeze
end
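The constructor copies the pipeline array and freezes the options. A minimal sketch of what that implies for callers follows, using a hypothetical `SketchHolder` class and a plain `Hash` in place of `BSON::Document` (both are assumptions for illustration, not part of the driver). Note that `dup` is a shallow copy, so the individual stage hashes are still shared with the caller.

```ruby
# Hypothetical stand-in that mirrors the constructor's dup/freeze steps.
# A plain Hash is used here instead of BSON::Document (an assumption).
class SketchHolder
  attr_reader :pipeline, :options

  def initialize(pipeline, options = {})
    @pipeline = pipeline.dup          # shallow copy of the outer array
    @options = options.dup.freeze     # options become read-only
  end
end

stages = [{ '$match' => { 'qty' => { '$gt' => 5 } } }]
holder = SketchHolder.new(stages, batch_size: 10)

# Appending to the caller's array does not change the stored pipeline.
stages << { '$limit' => 1 }
puts holder.pipeline.length

# The stage hashes themselves are shared, because dup is shallow.
puts holder.pipeline.first.equal?(stages.first)

# Writing to the frozen options raises FrozenError.
begin
  holder.options[:batch_size] = 20
rescue FrozenError => e
  puts e.class
end
```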
Public Instance Methods
Set to true if disk usage is allowed during the aggregation.
@example Set disk usage flag.
aggregation.allow_disk_use(true)
@param [ true, false ] value The flag value.
@return [ true, false, Aggregation ] The aggregation if a value was
set or the value if used as a getter.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 62
def allow_disk_use(value = nil)
  configure(:allow_disk_use, value)
end
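The combined getter/setter behavior described above can be illustrated without the driver. The sketch below assumes a hypothetical `SketchAggregation` class and a hypothetical `configure` helper that treats `nil` as "read the value"; the driver's own `configure` is not shown on this page and may differ in detail.

```ruby
# Hypothetical stand-in for an aggregation object; not the driver's class.
class SketchAggregation
  attr_reader :options

  def initialize(options = {})
    @options = options.dup.freeze
  end

  # With no argument this acts as a getter; with an argument it returns a
  # new object carrying the updated option and leaves the receiver as is.
  def allow_disk_use(value = nil)
    configure(:allow_disk_use, value)
  end

  private

  # Assumed behavior: nil means "read", any other value means "write".
  def configure(field, value)
    return options[field] if value.nil?

    self.class.new(options.merge(field => value))
  end
end

base = SketchAggregation.new
with_disk = base.allow_disk_use(true)

puts with_disk.allow_disk_use.inspect  # value on the new object
puts base.allow_disk_use.inspect       # original object is unchanged
```

One consequence of using `nil` as the "no argument" marker in this sketch is that a caller cannot explicitly set the option to `nil`.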
Get the explain plan for the aggregation.
@example Get the explain plan for the aggregation.
aggregation.explain
@return [ Hash ] The explain plan.
@since 2.0.0
# File lib/mongo/collection/view/aggregation.rb, line 110
def explain
  self.class.new(view, pipeline, options.merge(explain: true)).first
end
Whether this aggregation will write its result to a database collection.
@return [ Boolean ] Whether the aggregation will write its result
to a collection.
@api private
# File lib/mongo/collection/view/aggregation.rb, line 120
def write?
  pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
end
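The same check can be tried on plain arrays of hashes without the driver. The following is a small standalone sketch; the method name `writes_to_collection?` and the constant `WRITE_STAGES` are hypothetical names chosen for this example.

```ruby
# Stage names that write results to a collection.
WRITE_STAGES = %w[$out $merge].freeze

# True when any stage is keyed by $out or $merge, given either as a
# String or as a Symbol (mirrors the four key? checks in write?).
def writes_to_collection?(pipeline)
  pipeline.any? do |stage|
    WRITE_STAGES.any? { |name| stage.key?(name) || stage.key?(name.to_sym) }
  end
end

read_only  = [{ '$match' => { 'status' => 'A' } }, { '$group' => { '_id' => '$city' } }]
with_out   = [{ '$match' => { 'status' => 'A' } }, { :$out => 'archive' }]
with_merge = [{ '$merge' => { 'into' => 'totals' } }]

puts writes_to_collection?(read_only)   # false
puts writes_to_collection?(with_out)    # true
puts writes_to_collection?(with_merge)  # true
```

Only top-level stage keys are inspected, so a `$out` nested inside another stage's document would not be detected by this sketch.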
Private Instance Methods
# File lib/mongo/collection/view/aggregation.rb, line 130
def aggregate_spec(server, session, read_preference)
  Builder::Aggregation.new(
    pipeline,
    view,
    options.merge(session: session, read_preference: read_preference)
  ).specification
end
Skip, sort, limit, projection are specified as pipeline stages rather than as options.
# File lib/mongo/collection/view/aggregation.rb, line 191
def cache_options
  {
    namespace: collection.namespace,
    selector: pipeline,
    read_concern: view.read_concern,
    read_preference: view.read_preference,
    collation: options[:collation],
    # Aggregations can read documents from more than one collection,
    # so they will be cleared on every write operation.
    multi_collection: true,
  }
end
Return effective read preference for the operation.
If the pipeline contains $merge or $out, the read preference specified by the user is secondary or secondary_preferred, and the selected server is below 5.0, then this method returns the primary read preference, because the aggregation will be routed to the primary. Otherwise it returns the original read preference.
@param [ Server ] server The server on which the operation
should be executed.
@return [ Hash | nil ] read preference hash that should be sent with
this command.
# File lib/mongo/collection/view/aggregation.rb, line 160
def effective_read_preference(server)
  return unless view.read_preference
  return view.read_preference unless write?
  return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])

  primary_read_preference = {mode: :primary}
  if server.primary?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  elsif server.mongos? && !server.features.merge_out_on_secondary_enabled?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  else
    view.read_preference
  end
end
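The branching in this method can be exercised in isolation. The sketch below reproduces the decision with hypothetical stand-ins (`SketchServer`, `SketchFeatures`, and `sketch_effective_read_preference` are names invented for this example), passes the result of `write?` in as a plain boolean, and omits the warning log.

```ruby
# Hypothetical stand-ins for the driver's server and feature objects.
SketchFeatures = Struct.new(:merge_out_on_secondary) do
  def merge_out_on_secondary_enabled?
    merge_out_on_secondary
  end
end

SketchServer = Struct.new(:kind, :features) do
  def primary?
    kind == :primary
  end

  def mongos?
    kind == :mongos
  end
end

# Same decision order as the listing above: no preference, non-writing
# pipeline, or a non-secondary mode all return the preference unchanged.
def sketch_effective_read_preference(read_preference, writes, server)
  return nil unless read_preference
  return read_preference unless writes
  return read_preference unless %i[secondary secondary_preferred].include?(read_preference[:mode])

  reroute = server.primary? ||
            (server.mongos? && !server.features.merge_out_on_secondary_enabled?)
  reroute ? { mode: :primary } : read_preference
end

secondary  = { mode: :secondary }
old_mongos = SketchServer.new(:mongos, SketchFeatures.new(false))
new_mongos = SketchServer.new(:mongos, SketchFeatures.new(true))

p sketch_effective_read_preference(secondary, true, old_mongos)   # rerouted to primary
p sketch_effective_read_preference(secondary, true, new_mongos)   # preference kept
p sketch_effective_read_preference(secondary, false, old_mongos)  # not a write, preference kept
```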
# File lib/mongo/collection/view/aggregation.rb, line 142
def initial_query_op(server, session, read_preference)
  Operation::Aggregate.new(aggregate_spec(server, session, read_preference))
end
# File lib/mongo/collection/view/aggregation.rb, line 138
def new(options)
  Aggregation.new(view, pipeline, options)
end
# File lib/mongo/collection/view/aggregation.rb, line 178
def send_initial_query(server, session)
  initial_query_op(
    server,
    session,
    effective_read_preference(server)
  ).execute(
    server,
    context: Operation::Context.new(client: client, session: session)
  )
end
# File lib/mongo/collection/view/aggregation.rb, line 126
def server_selector
  @view.send(:server_selector)
end