| Class | Stomp::Connection |
| In: |
lib/stomp.rb
|
| Parent: | Object |
Low level connection which maps commands and supports synchronous receives
Create a connection, requires a login and passcode. Can accept a host (default is localhost), and port (default is 61613) to connect to
# File lib/stomp.rb, line 33
33: def initialize(login, passcode, host='localhost', port=61613, reliable=false, reconnectDelay=5)
34: @host = host
35: @port = port
36: @login = login
37: @passcode = passcode
38: @transmit_semaphore = Mutex.new
39: @read_semaphore = Mutex.new
40: @socket_semaphore = Mutex.new
41: @reliable = reliable
42: @reconnectDelay = reconnectDelay
43: @closed = FALSE
44: @subscriptions = {}
45: @failure = NIL
46: socket
47: end
# File lib/stomp.rb, line 26
26: def Connection.open(login = "", passcode = "", host='localhost', port=61613, reliable=FALSE, reconnectDelay=5)
27: Connection.new login, passcode, host, port, reliable, reconnectDelay
28: end
Receive a frame, block until the frame is received
# File lib/stomp.rb, line 156
156: def __old_receive
157: # The recive my fail so we may need to retry.
158: while TRUE
159: begin
160: s = socket
161: return _receive(s)
162: rescue
163: @failure = $!;
164: raise unless @reliable
165: $stderr.print "receive failed: " + $!;
166: end
167: end
168: end
Abort a transaction by name
# File lib/stomp.rb, line 106
106: def abort name, headers={}
107: headers[:transaction] = name
108: transmit "ABORT", headers
109: end
Acknowledge a message, used then a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 94
94: def ack message_id, headers={}
95: headers['message-id'] = message_id
96: transmit "ACK", headers
97: end
Begin a transaction, requires a name for the transaction
# File lib/stomp.rb, line 85
85: def begin name, headers={}
86: headers[:transaction] = name
87: transmit "BEGIN", headers
88: end
Commit a transaction by name
# File lib/stomp.rb, line 100
100: def commit name, headers={}
101: headers[:transaction] = name
102: transmit "COMMIT", headers
103: end
Close this connection
# File lib/stomp.rb, line 142
142: def disconnect(headers = {})
143: transmit "DISCONNECT", headers
144: end
Return a pending message if one is available, otherwise return nil
# File lib/stomp.rb, line 148
148: def poll
149: @read_semaphore.synchronize do
150: return nil if @socket==NIL or !@socket.ready?
151: return receive
152: end
153: end
# File lib/stomp.rb, line 170
170: def receive
171: super_result = __old_receive()
172: if super_result.nil? && @reliable
173: $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
174: @socket = nil
175: super_result = __old_receive()
176: end
177: return super_result
178: end
Send message to destination
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 136
136: def send(destination, message, headers={})
137: headers[:destination] = destination
138: transmit "SEND", headers, message
139: end
# File lib/stomp.rb, line 49
49: def socket
50: # Need to look into why the following synchronize does not work.
51: #@read_semaphore.synchronize do
52: s = @socket;
53: while s == NIL or @failure != NIL
54: @failure = NIL
55: begin
56: s = TCPSocket.open @host, @port
57: _transmit(s, "CONNECT", {:login => @login, :passcode => @passcode})
58: @connect = _receive(s)
59: # replay any subscriptions.
60: @subscriptions.each { |k,v| _transmit(s, "SUBSCRIBE", v) }
61: rescue
62: @failure = $!;
63: s=NIL;
64: raise unless @reliable
65: $stderr.print "connect failed: " + $! +" will retry in #{@reconnectDelay}\n";
66: sleep(@reconnectDelay);
67: end
68: end
69: @socket = s
70: return s;
71: #end
72: end
Subscribe to a destination, must specify a name
# File lib/stomp.rb, line 112
112: def subscribe(name, headers = {}, subId=NIL)
113: headers[:destination] = name
114: transmit "SUBSCRIBE", headers
115:
116: # Store the sub so that we can replay if we reconnect.
117: if @reliable
118: subId = name if subId==NIL
119: @subscriptions[subId]=headers
120: end
121: end
Unsubscribe from a destination, must specify a name
# File lib/stomp.rb, line 124
124: def unsubscribe(name, headers = {}, subId=NIL)
125: headers[:destination] = name
126: transmit "UNSUBSCRIBE", headers
127: if @reliable
128: subId = name if subId==NIL
129: @subscriptions.delete(subId)
130: end
131: end
# File lib/stomp.rb, line 181
181: def _receive( s )
182: line = ' '
183: @read_semaphore.synchronize do
184: line = s.gets while line =~ /^\s*$/
185: return NIL if line == NIL
186: Message.new do |m|
187: m.command = line.chomp
188: m.headers = {}
189: until (line = s.gets.chomp) == ''
190: k = (line.strip[0, line.strip.index(':')]).strip
191: v = (line.strip[line.strip.index(':') + 1, line.strip.length]).strip
192: m.headers[k] = v
193: end
194:
195: if (m.headers['content-length'])
196: m.body = s.read m.headers['content-length'].to_i
197: c = RUBY_VERSION > '1.9' ? s.getc.ord : s.getc
198: raise "Invalid content length received" unless c == 0
199: else
200: m.body = ''
201: if RUBY_VERSION > '1.9'
202: until (c = s.getc.ord) == 0
203: m.body << c.chr
204: end
205: else
206: until (c = s.getc) == 0
207: m.body << c.chr
208: end
209: end
210: end
211: #c = s.getc
212: #raise "Invalid frame termination received" unless c == 10
213: end
214: end
215: end
# File lib/stomp.rb, line 234
234: def _transmit(s, command, headers={}, body='')
235: @transmit_semaphore.synchronize do
236: s.puts command
237: headers.each {|k,v| s.puts "#{k}:#{v}" }
238: s.puts "content-length: #{body.length}"
239: s.puts "content-type: text/plain; charset=UTF-8"
240: s.puts
241: s.write body
242: s.write "\0"
243: end
244: end
# File lib/stomp.rb, line 218
218: def transmit(command, headers={}, body='')
219: # The transmit my fail so we may need to retry.
220: while TRUE
221: begin
222: s = socket
223: _transmit(s, command, headers, body)
224: return
225: rescue
226: @failure = $!;
227: raise unless @reliable
228: $stderr.print "transmit failed: " + $!+"\n";
229: end
230: end
231: end