| Class | Qrack::Subscription |
| In: |
lib/qrack/subscription.rb
|
| Parent: | Object |
Subscription ancestor class
| ack | [RW] | |
| client | [R] | |
| consumer_tag | [RW] | |
| delivery_tag | [RW] | |
| exclusive | [RW] | |
| message_count | [R] | |
| message_max | [RW] | |
| queue | [R] | |
| timeout | [RW] |
# File lib/qrack/subscription.rb, line 8
# Builds a subscription that ties +queue+ to +client+.
#
# client - stored as-is and exposed through the +client+ reader.
# queue  - stored as-is; its +subscription=+ writer is called with this object.
# opts   - Hash of options (default: {}). Keys read here:
#          :timeout      - stored in @timeout (nil when absent or false)
#          :message_max  - maximum number of messages to process (nil when absent)
#          :consumer_tag - if not passed in, the server will generate one
#          :ack          - whether an acknowledgement has to be provided
#          :exclusive    - whether this consumer wants exclusive use of the queue
#          :nowait       - always discarded (see below)
#
# The caller's hash is never modified: the options are copied before :nowait
# is removed, and the copy (without :nowait) is what ends up in @opts.
def initialize(client, queue, opts = {})
  @client = client
  @queue = queue

  # Work on a private copy so that dropping :nowait below neither mutates the
  # caller's hash nor leaves @opts aliased to it.
  opts = opts.dup

  # Ignore the :nowait option if passed, otherwise the program will hang
  # waiting for a response from the server, causing an error.
  opts.delete(:nowait)

  # `|| nil` / `|| false` normalise a falsy option to the documented default.
  @timeout = opts[:timeout] || nil
  @message_max = opts[:message_max] || nil
  @consumer_tag = opts[:consumer_tag] || nil
  @ack = opts[:ack] || nil
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Give the queue a reference to this subscription
  @queue.subscription = self

  # Store the (copied, :nowait-free) options
  @opts = opts
end
# File lib/qrack/subscription.rb, line 42
# Runs the subscription loop, handing each received message to +blk+.
#
# Does nothing (no consumer is set up) when +message_max+ is 0. Otherwise
# +setup_consumer+ is called once and the loop repeats these steps:
#
# 1. wait for the next method via client.next_method, passing the configured timeout;
# 2. read the content header and then body frames until the whole body is in;
# 3. call +blk+, if given, with a Hash of :header, :payload and
#    :delivery_details (the method's arguments);
# 4. acknowledge the message when acknowledgements were requested (@ack).
#
# The loop ends, after calling queue.unsubscribe, when client.next_method
# raises Qrack::ClientTimeout or when +message_count+ reaches +message_max+.
#
# Returns nil.
def start(&blk)
  # Do not process any messages if zero message_max
  return if message_max == 0

  # Notify server about new consumer
  setup_consumer

  loop do
    begin
      method = client.next_method(:timeout => timeout)
    rescue Qrack::ClientTimeout
      queue.unsubscribe
      break
    end

    @message_count += 1

    # Remember the delivery tag so the queue can acknowledge this message
    queue.delivery_tag = method.delivery_tag if @ack

    header = client.next_payload

    # If the maximum frame size is smaller than the message body, the message
    # arrives as one header followed by several body frames. Progress is
    # measured in bytes, not characters, so a multibyte-encoded payload does
    # not make the loop wait for a frame that will never come.
    # NOTE(review): assumes header.size is the body size in bytes — confirm.
    msg = ''.dup # mutable buffer; appended in place instead of reallocating per frame
    msg << client.next_payload while msg.bytesize < header.size

    # If block present, pass the message info to the block for processing
    blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if blk

    # Exit loop if message_max condition met
    if message_max && message_count == message_max
      # Stop consuming first, then acknowledge the final message.
      queue.unsubscribe
      queue.ack if @ack
      break
    end

    # The ack has to happen here, after the message_max check: with
    # Client#qos prefetch an ack releases further messages from the server,
    # so acking earlier would let extra messages through before the
    # unsubscribe takes effect.
    queue.ack if @ack
  end
end
98: end