| Class | Stomp::Client |
| In: |
lib/stomp.rb
|
| Parent: | Object |
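Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613) and a reliable flag (default false). The first argument may instead be a URL of the form stomp://host:port or stomp://login:passcode@host:port, in which case the connection settings are taken from the URL.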
# File lib/stomp.rb, line 269
# Build a client: resolve the connection parameters, open the connection
# and start the background thread that dispatches incoming frames.
#
# +user+ is either a login name or a broker URL in one of two forms:
#
#   stomp://host:port
#   stomp://login:passcode@host:port
#
# When a URL is given, +pass+, +host+, +port+ and +reliable+ are ignored:
# the values come from the URL (login and passcode default to "") and
# +reliable+ is forced to false.
#
# user::     login name, or a stomp:// URL (default "")
# pass::     passcode (default "")
# host::     broker host (default "localhost")
# port::     broker port (default 61613); URL ports are converted to Integer
# reliable:: handed through to Connection.open (default false)
def initialize user="", pass="", host="localhost", port=61613, reliable=false
  case user
  when %r{\Astomp://(\w+):(\w+)@([\w.-]+):(\d+)(?:/|\z)}
    # Credential form is tested first so that an all-digit passcode
    # ("stomp://bob:1234@host:61613") is not mistaken for "host:port".
    user, pass, host, port = $1, $2, $3, $4.to_i
    reliable = false
  when %r{\Astomp://([\w.-]+):(\d+)(?:/|\z)}
    # Host names may contain dots and hyphens ("mq.example.com").
    user, pass, host, port = "", "", $1, $2.to_i
    reliable = false
  end

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.open user, pass, host, port, reliable
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = Hash.new
  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      # receive handed back nothing: leave the loop and let the thread end.
      # (nil? replaces a comparison with the deprecated NIL constant.)
      break if message.nil?

      case message.command
      when 'MESSAGE'
        # Route by destination to the block registered through subscribe.
        listener = @listeners[message.headers['destination']]
        listener.call(message) if listener
      when 'RECEIPT'
        # Route by receipt id to the matching receipt listener.
        listener = @receipt_listeners[message.headers['receipt-id']]
        listener.call(message) if listener
      end
    end
  end
end
Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613) and a reliable flag (default false); all of them are passed on to Client.new
# File lib/stomp.rb, line 318
# Convenience constructor: hands every argument, unchanged and in order,
# to Client.new and returns the resulting client.
def self.open user="", pass="", host="localhost", port=61613, reliable=false
  Client.new(user, pass, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp.rb, line 328
# Abort the transaction called +name+, then hand every message that was
# acknowledged inside it back to the listener registered for the message's
# destination.
#
# name::    transaction name
# headers:: extra headers passed through to the connection's abort
def abort name, headers={}
  @connection.abort name, headers

  # Remove the replay list while fetching it: once the transaction has been
  # aborted the list is finished with, and leaving it in place would both
  # leak memory and replay the same messages again if the name is reused.
  replay_list = @replay_messages_by_txn.delete(name)
  if replay_list
    replay_list.each do |message|
      # Messages whose destination no longer has a listener are skipped.
      listener = @listeners[message.headers['destination']]
      listener.call(message) if listener
    end
  end
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 369
# Acknowledge +message+ by sending its 'message-id' header to the
# connection's ack.
#
# When headers[:transaction] is set, the message is also remembered under
# that transaction id so it can be handed back to its listener if the
# transaction is aborted.
#
# When a block is given, a receipt listener that yields to the block is
# registered and the value returned by register_receipt_listener is sent as
# the 'receipt' header.
#
# message:: the received message to acknowledge
# headers:: extra headers; :transaction is honoured as described above
def acknowledge message, headers={}
  txn_id = headers[:transaction]
  if txn_id
    # Keep messages ack'd in this transaction around in case it is aborted.
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end
  if block_given?
    receipt_id = register_receipt_listener lambda { |r| yield r }
    # Build a new hash instead of writing into the caller's: a headers hash
    # reused for a later call must not carry this receipt id along.
    headers = headers.merge('receipt' => receipt_id)
  end
  @connection.ack message.headers['message-id'], headers
end
Begin a transaction by name
# File lib/stomp.rb, line 323
# Start the transaction called +name+ by delegating to the connection;
# +headers+ are passed through untouched.
def begin name, headers={}
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp.rb, line 405
# Stop the client: mark it as no longer running, then disconnect.
#
# The flag is cleared before the disconnect so that the listener loop's
# `while @running` test already sees the shutdown while the connection is
# being torn down, and so that the client is still marked as stopped if
# disconnect raises.
def close
  @running = false
  @connection.disconnect
end
Commit a transaction by name
# File lib/stomp.rb, line 343
# Commit the transaction called +name+.
#
# Messages acknowledged inside the transaction are kept for replay in case
# of an abort; once the transaction commits they are no longer needed, so
# the replay list is dropped. The list is keyed by the transaction name
# (the same key abort reads), not by headers[:transaction], which callers
# of commit normally do not supply.
#
# name::    transaction name
# headers:: extra headers passed through to the connection's commit
def commit name, headers={}
  @replay_messages_by_txn.delete(name)
  @connection.commit name, headers
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp.rb, line 312
# Block the caller until the listener thread has finished; returns whatever
# Thread#join returns for that thread.
def join
  listener = @listener_thread
  listener.join
end
Send message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 392
# Send +message+ to +destination+ through the connection.
#
# When a block is given, a receipt listener that yields to the block is
# registered and the value returned by register_receipt_listener is sent as
# the 'receipt' header.
#
# destination:: destination name
# message::     message body
# headers::     extra headers passed through to the connection's send
def send destination, message, headers = {}
  if block_given?
    receipt_id = register_receipt_listener lambda { |r| yield r }
    # Build a new hash instead of writing into the caller's: a headers hash
    # reused for a later call must not carry this receipt id along.
    headers = headers.merge('receipt' => receipt_id)
  end
  @connection.send destination, message, headers
end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 353
# Register the given block as the listener for +destination+ and subscribe
# to it on the connection. A listener already stored for the same
# destination is replaced.
#
# destination:: destination name, also used as the listener key
# headers::     extra headers passed through to the connection's subscribe
#
# Raises a RuntimeError when called without a block.
def subscribe destination, headers={}
  unless block_given?
    raise "No listener given"
  end

  callback = lambda { |msg| yield msg }
  @listeners[destination] = callback
  @connection.subscribe(destination, headers)
end
Unsubscribe from a channel
# File lib/stomp.rb, line 360
# Unsubscribe from +name+ on the connection and forget its listener.
#
# The listener entry is deleted rather than overwritten with nil, so that
# destinations which have been unsubscribed do not pile up as dead keys in
# the listener table. Lookups for the destination return nil either way.
#
# name::    destination name used when subscribing
# headers:: extra headers passed through to the connection's unsubscribe
def unsubscribe name, headers={}
  @connection.unsubscribe name, headers
  @listeners.delete(name)
end