class Mongo::Server::ConnectionBase
This class encapsulates common connection functionality.
@note Although methods of this module are part of the public API,
the fact that these methods are defined on this module and not on the classes which include this module is not part of the public API.
@api semipublic
Constants
- DEFAULT_MAX_BSON_OBJECT_SIZE
The maximum allowed size in bytes that a user-supplied document may take up when serialized, if the server’s hello response does not include maxBsonObjectSize field.
The commands that are sent to the server may exceed this size by
MAX_BSON_COMMAND_OVERHEAD
.@api private
- MAX_BSON_COMMAND_OVERHEAD
The additional overhead allowed for command data (i.e. fields added to the command document by the driver, as opposed to documents provided by the user) when serializing a complete command to BSON.
@api private
- REDUCED_MAX_BSON_SIZE
@api private
Attributes
Returns the server description for this connection, derived from the hello response for the handshake performed on this connection.
@note A connection object that hasn’t yet connected (handshaken and
authenticated, if authentication is required) does not have a description. While handshaking and authenticating the driver must be using global defaults, in particular not assuming that the properties of a particular connection are the same as properties of other connections made to the same address (since the server on the other end could have been shut down and a different server version could have been launched).
@return [ Server::Description
] Server
description for this connection. @api private
@return [ Hash ] options The passed in options.
@return [ Server
] The server that this connection is for.
@api private
Public Instance Methods
# File lib/mongo/server/connection_base.rb, line 107 def app_metadata @app_metadata ||= begin same = true AppMetadata::AUTH_OPTION_KEYS.each do |key| if @server.options[key] != options[key] same = false break end end if same @server.app_metadata else AppMetadata.new(options.merge(purpose: @server.app_metadata.purpose)) end end end
Dispatch a single message to the connection. If the message requires a response, a reply will be returned.
@example Dispatch the message.
connection.dispatch([ insert ])
@note This method is named dispatch since ‘send’ is a core Ruby method on
all objects.
@note For backwards compatibility, this method accepts the messages
as an array. However, exactly one message must be given per invocation.
@param [ Array<Message> ] messages A one-element array containing
the message to dispatch.
@param [ Operation::Context
] context The operation context. @param [ Hash ] options
@option options [ Boolean ] :deserialize_as_bson Whether to deserialize
the response to this message using BSON objects in place of native Ruby types wherever possible.
@return [ Protocol::Message
| nil ] The reply if needed.
@raise [ Error::SocketError
| Error::SocketTimeoutError
] When there is a network error.
@since 2.0.0
# File lib/mongo/server/connection_base.rb, line 150 def dispatch(messages, context, options = {}) # The monitoring code does not correctly handle multiple messages, # and the driver internally does not send more than one message at # a time ever. Thus prohibit multiple message use for now. if messages.length != 1 raise ArgumentError, 'Can only dispatch one message at a time' end if description.unknown? raise Error::InternalDriverError, "Cannot dispatch a message on a connection with unknown description: #{description.inspect}" end message = messages.first deliver(message, context, options) end
Connection
pool generation from which this connection was created. May be nil.
@return [ Integer | nil ] Connection
pool generation.
# File lib/mongo/server/connection_base.rb, line 100 def generation # If the connection is to a load balancer, @generation is set # after handshake completes. If the connection is to another server # type, generation is specified during connection creation. @generation || options[:generation] end
@return [ nil | Object ] The service id, if any.
# File lib/mongo/server/connection_base.rb, line 92 def service_id description&.service_id end
Private Instance Methods
@raise [ Error::SocketError
| Error::SocketTimeoutError
] When there is a network error.
# File lib/mongo/server/connection_base.rb, line 167 def deliver(message, context, options = {}) if Lint.enabled? && !@socket raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})" end buffer = serialize(message, context) ensure_connected do |socket| operation_id = Monitoring.next_operation_id started_event = command_started(address, operation_id, message.payload, socket_object_id: socket.object_id, connection_id: id, connection_generation: generation, server_connection_id: description.server_connection_id, service_id: description.service_id, ) start = Utils.monotonic_time result = nil begin result = add_server_diagnostics do socket.write(buffer.to_s) if message.replyable? Protocol::Message.deserialize(socket, max_message_size, message.request_id, options) else nil end end rescue Exception => e total_duration = Utils.monotonic_time - start command_failed(nil, address, operation_id, message.payload, e.message, total_duration, started_event: started_event, service_id: description.service_id, ) raise else total_duration = Utils.monotonic_time - start command_completed(result, address, operation_id, message.payload, total_duration, started_event: started_event, service_id: description.service_id, ) end if result && context.decrypt? result = result.maybe_decrypt(context) end result end end
# File lib/mongo/server/connection_base.rb, line 214 def serialize(message, context, buffer = BSON::ByteBuffer.new) # Driver specifications only mandate the fixed 16MiB limit for # serialized BSON documents. However, the server returns its # active serialized BSON document size limit in the hello response, # which is +max_bson_object_size+ below. The +DEFAULT_MAX_BSON_OBJECT_SIZE+ # is the 16MiB value mandated by the specifications which we use # only as the default if the server's hello did not contain # maxBsonObjectSize. max_bson_size = max_bson_object_size || DEFAULT_MAX_BSON_OBJECT_SIZE if context.encrypt? # The client-side encryption specification requires bulk writes to # be split at a reduced maxBsonObjectSize. If this message is a bulk # write and its size exceeds the reduced size limit, the serializer # will raise an exception, which is caught by BulkWrite. BulkWrite # will split the operation into individual writes, which will # not be subject to the reduced maxBsonObjectSize. if message.bulk_write? # Make the new maximum size equal to the specified reduced size # limit plus the 16KiB overhead allowance. max_bson_size = REDUCED_MAX_BSON_SIZE end end # RUBY-2234: It is necessary to check that the message size does not # exceed the maximum bson object size before compressing and serializing # the final message. # # This is to avoid the case where the user performs a bulk write # larger than 16MiB which, when compressed, becomes smaller than 16MiB. # If the driver does not split the bulk writes prior to compression, # the entire operation will be sent to the server, which will raise an # error because the uncompressed operation exceeds the maximum bson size. # # To address this problem, we serialize the message prior to compression # and raise an exception if the serialized message exceeds the maximum # bson size. if max_message_size # Create a separate buffer that contains the un-compressed message # for the purpose of checking its size. Write any pre-existing contents # from the original buffer into the temporary one. temp_buffer = BSON::ByteBuffer.new # TODO: address the fact that this line mutates the buffer. temp_buffer.put_bytes(buffer.get_bytes(buffer.length)) message.serialize(temp_buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD) if temp_buffer.length > max_message_size raise Error::MaxMessageSize.new(max_message_size) end end # RUBY-2335: When the un-compressed message is smaller than the maximum # bson size limit, the message will be serialized twice. The operations # layer should be refactored to allow compression on an already- # serialized message. final_message = message.maybe_compress(compressor, options[:zlib_compression_level]) final_message.serialize(buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD) buffer end