| Class | Bunny::Subscription09 |
| In: |
lib/bunny/subscription09.rb
|
| Parent: | Qrack::Subscription |
Asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them with an unsubscribe. Every time a message reaches the queue it is passed to the blk for processing. If error occurs, Bunny::ProtocolError is raised.
Passes a hash of message information to the block, if one has been supplied. The hash contains :header, :payload and :delivery_details. The structure of the data is as follows -
:header has instance variables -
@klass
@size
@weight
@properties is a hash containing -
:content_type
:delivery_mode
:priority
:payload contains the message contents
:delivery details is a hash containing -
:consumer_tag :delivery_tag :redelivered :exchange :routing_key
If the :timeout option is specified then Qrack::ClientTimeout is raised if method times out waiting to receive the next message from the queue.
my_queue.subscribe(:timeout => 5) {|msg| puts msg[:payload]}
my_queue.subscribe(:message_max => 10, :ack => true) {|msg| puts msg[:payload]}
# File lib/bunny/subscription09.rb, line 64
64: def setup_consumer
65: client.send_frame(
66: Qrack::Protocol09::Basic::Consume.new({ :deprecated_ticket => 0,
67: :queue => queue.name,
68: :consumer_tag => consumer_tag,
69: :no_ack => !ack,
70: :exclusive => exclusive,
71: :nowait => false}.merge(@opts))
72: )
73:
74: method = client.next_method
75:
76: client.check_response(method, Qrack::Protocol09::Basic::ConsumeOk,
77: "Error subscribing to queue #{queue.name}")
78:
79: @consumer_tag = method.consumer_tag
80:
81: end