| Class | Jabber::Stream |
| In: |
lib/xmpp4r/stream.rb
|
| Parent: | Object |
The stream class manages a connection stream (a file descriptor through which XML messages are read and sent)
| DISCONNECTED | = | 1 |
| CONNECTED | = | 2 |
| fd | [R] | file descriptor used |
| status | [R] | connection status |
Create a new stream (just initializes)
# File lib/xmpp4r/stream.rb, line 34
# Put the stream into its initial, unconnected state: no file descriptor,
# status DISCONNECTED, one empty CallbackList per callback kind, an empty
# stanza queue with its mutex, and no exception handler. Nothing is opened
# or started here.
#
# threaded:: requested mode; a falsy value is overridden to true after a
#            warning is written to $stderr.
34: def initialize(threaded = true)
35: @fd = nil
36: @status = DISCONNECTED
37: @xmlcbs = CallbackList::new
38: @stanzacbs = CallbackList::new
39: @messagecbs = CallbackList::new
40: @iqcbs = CallbackList::new
41: @presencecbs = CallbackList::new
# Non-threaded mode is refused: warn and force threaded operation.
42: unless threaded
43: $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
44: threaded = true
45: end
46: @threaded = threaded
47: @stanzaqueue = []
48: @stanzaqueue_lock = Mutex::new
49: @exception_block = nil
50: @threadblocks = []
51: # @pollCounter = 10
52: @waiting_thread = nil
53: @wakeup_thread = nil
54: @streamid = nil
55: @features_lock = Mutex.new
56: end
Adds a callback block to process received Iqs
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 518
# Register +block+ in the Iq callback list (@iqcbs) under +ref+ with the
# given +priority+. The block may be nil when none is passed.
518: def add_iq_callback(priority = 0, ref = nil, &block)
519: @iqcbs.add(priority, ref, block)
520: end
Adds a callback block to process received Messages
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 464
# Register +block+ in the Message callback list (@messagecbs) under +ref+
# with the given +priority+. The block may be nil when none is passed.
464: def add_message_callback(priority = 0, ref = nil, &block)
465: @messagecbs.add(priority, ref, block)
466: end
Adds a callback block to process received Presences
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 500
# Register +block+ in the Presence callback list (@presencecbs) under +ref+
# with the given +priority+. The block may be nil when none is passed.
500: def add_presence_callback(priority = 0, ref = nil, &block)
501: @presencecbs.add(priority, ref, block)
502: end
Adds a callback block to process received Stanzas
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 482
# Register +block+ in the generic stanza callback list (@stanzacbs) under
# +ref+ with the given +priority+. The block may be nil when none is passed.
482: def add_stanza_callback(priority = 0, ref = nil, &block)
483: @stanzacbs.add(priority, ref, block)
484: end
Adds a callback block to process received XML messages
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 446
# Register +block+ in the raw XML callback list (@xmlcbs) under +ref+ with
# the given +priority+. The block may be nil when none is passed.
446: def add_xml_callback(priority = 0, ref = nil, &block)
447: @xmlcbs.add(priority, ref, block)
448: end
# File lib/xmpp4r/stream.rb, line 536
# Tear the stream down immediately: kill the parser thread if one exists,
# close @fd if it is set and not already closed, then mark the stream
# DISCONNECTED.
536: def close!
537: @parserThread.kill if @parserThread
538: # @pollThread.kill
539: @fd.close if @fd and !@fd.closed?
540: @status = DISCONNECTED
541: end
Delete a Stanza callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 490
# Remove the callback registered under +ref+ from the generic stanza
# callback list (@stanzacbs).
490: def delete_stanza_callback(ref)
491: @stanzacbs.delete(ref)
492: end
Delete an XML-messages callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 454
# Remove the callback registered under +ref+ from the raw XML callback
# list (@xmlcbs).
454: def delete_xml_callback(ref)
455: @xmlcbs.delete(ref)
456: end
Returns if this connection is connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 150
# True only while @status equals CONNECTED.
150: def is_connected?
151: return @status == CONNECTED
152: end
Returns if this connection is NOT connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 158
# True only while @status equals DISCONNECTED.
158: def is_disconnected?
159: return @status == DISCONNECTED
160: end
Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.
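The block has to take three arguments: the exception (nil when the stream was closed by the peer), the stream itself, and a symbol naming where the failure happened (:start, :parser, :sending or :close).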
# File lib/xmpp4r/stream.rb, line 108
# Store +block+ as the stream's exception handler, replacing any handler
# stored earlier. Calling this without a block clears the handler.
108: def on_exception(&block)
109: @exception_block = block
110: end
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 114
# Called by the parser when a failure occurs.
#
# The exception is logged through Jabber::debuglog and the stream is closed.
# When a handler was registered with on_exception, the close and the handler
# call (e, self, :parser) both happen in a new thread, and that thread is
# returned. Without a handler the failure is printed to stdout and +e+ is
# re-raised to the caller.
#
# e:: [Exception] the failure reported by the parser
def parse_failure(e)
  # An exception that was constructed but never raised has a nil backtrace.
  backtrace = Array(e.backtrace)
  Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{backtrace.join("\n")}")

  if @exception_block
    # Run in a new thread: per the original note, close is expected to kill
    # the calling (parser) thread, which would prevent the handler call.
    Thread.new {
      close
      @exception_block.call(e, self, :parser)
    }
  else
    # Interpolate instead of concatenating: String#+ rejects an Exception.
    puts "Stream#parse_failure was called by XML parser. Dumping " +
      "backtrace...\n#{e}\n"
    puts backtrace
    close
    # Raise the reported exception itself; a bare raise would depend on $!.
    raise e
  end
end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 136
# Close the stream. When an exception handler is registered, the close and
# the handler call (nil, self, :close) are both performed in a new thread;
# otherwise close is called directly in the calling thread.
136: def parser_end
137: if @exception_block
138: Thread.new {
139: close
140: @exception_block.call(nil, self, :close)
141: }
142: else
143: close
144: end
145: end
Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.
Currently not working!
# File lib/xmpp4r/stream.rb, line 424
# Sleeps 10 seconds, then loops forever sleeping 2 seconds per iteration.
# The keep-alive send is commented out, so nothing is ever transmitted and
# the method never returns.
424: def poll
425: sleep 10
426: while true
427: sleep 2
428: # @pollCounter = @pollCounter - 1
429: # if @pollCounter < 0
430: # begin
431: # send(" \t ")
432: # rescue
433: # Thread.new {@exception_block.call if @exception_block}
434: # break
435: # end
436: # end
437: end
438: end
Process |max| XML stanzas and call listeners for all of them.
| max: | [Integer] the number of stanzas to process (nil means process all available) |
# File lib/xmpp4r/stream.rb, line 281
# Drain up to +max+ stanzas from @stanzaqueue, passing each to process_one.
# Returns the number of stanzas processed.
281: def process(max = nil)
282: n = 0
283: @stanzaqueue_lock.lock
284: while @stanzaqueue.size > 0 and (max == nil or n < max)
285: e = @stanzaqueue.shift
# The queue lock is released while process_one runs and re-acquired
# before the loop condition inspects the queue again.
286: @stanzaqueue_lock.unlock
287: process_one(e)
288: n += 1
289: @stanzaqueue_lock.lock
290: end
291: @stanzaqueue_lock.unlock
292: n
293: end
Processes a received REXML::Element and executes registered thread blocks and filters against it.
In both threaded and non-threaded mode the element is passed directly to receive_nonthreaded in the calling thread; no new thread is spawned, so that the order of stanzas is preserved.
| element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 169
# Hand +element+ to receive_nonthreaded. Both branches make the same
# synchronous call in the calling thread; no thread is created here.
169: def receive(element)
170: if @threaded
171: # Don't spawn a new thread here. An implicit feature
172: # of XMPP is constant order of stanzas.
173: receive_nonthreaded(element)
174: else
175: receive_nonthreaded(element)
176: end
177: end
Sends XML data to the socket and (optionally) waits to process received data.
| xml: | [String] The xml data to send |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 357
# Write +xml+ to the stream's file descriptor and flush it.
#
# When a block is given it is wrapped in a ThreadBlock and put at the front
# of @threadblocks, and the calling thread is stopped after the write unless
# it is the parser thread.
#
# A write failure is logged. With a handler registered via on_exception,
# close! and the handler call (e, self, :sending) run in a new thread and
# this method continues; without one, the stream is closed with close! and
# the exception is re-raised.
#
# xml::    [String] the XML data to send (converted with to_s)
# &block:: [Block] optional block, stored as a ThreadBlock
def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  @threadblocks.unshift(ThreadBlock.new(block)) if block

  # Thread.critical only exists on Ruby 1.8; skip it where it is unavailable.
  use_critical = Thread.respond_to?(:critical=)
  Thread.critical = true if use_critical # we don't want to be interrupted before we stop!
  begin
    @fd << xml.to_s
    @fd.flush
  rescue Exception => e # deliberately broad, as in the original: any failure closes the stream
    Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{Array(e.backtrace).join("\n")}")

    if @exception_block
      Thread.new { close!; @exception_block.call(e, self, :sending) }
    else
      puts "Exception caught while sending!"
      close!
      raise
    end
  ensure
    # Always leave the critical section, including on the re-raise path;
    # otherwise every other thread would stay blocked.
    Thread.critical = false if use_critical
  end

  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  Thread.stop if block and Thread.current != @parserThread
  @pollCounter = 10
end
Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!
Be aware that if a stanza with type='error' is received the function does not yield but raises an ErrorException with the corresponding error element.
Please read the note about nesting at Stream#send
| xml: | [XMLStanza] |
# File lib/xmpp4r/stream.rb, line 395
# Send +xml+ and hand the stanza carrying the same id to the caller's block.
# An id is generated through Jabber::IdGenerator when +xml+ has none.
# Raises ErrorException when the matching stanza has type :error.
395: def send_with_id(xml, &block)
396: if xml.id.nil?
397: xml.id = Jabber::IdGenerator.instance.generate_id
398: end
399:
# Stays nil unless an error reply with a matching id arrives.
400: error = nil
401: send(xml) do |received|
402: if received.kind_of? XMLStanza and received.id == xml.id
403: if received.type == :error
# Use the reply's error element, or a fresh Error when it has none;
# returning true marks the reply as handled.
404: error = (received.error ? received.error : Error.new)
405: true
406: else
# The caller's block decides, by its return value, whether this
# stanza is the answer.
407: yield(received)
408: end
409: else
# Not an XMLStanza, or a different id: not the answer.
410: false
411: end
412: end
413:
# The error is raised only after send has returned.
414: unless error.nil?
415: raise ErrorException.new(error)
416: end
417: end
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 60
# Attach the stream to +fd+, reset the collected stream mechanisms and
# features, start a StreamParser on +fd+ in a new thread and mark the
# stream CONNECTED.
60: def start(fd)
61: @stream_mechanisms = []
62: @stream_features = {}
63:
64: @fd = fd
65: @parser = StreamParser.new(@fd, self)
# Any exception escaping @parser.parse is logged. With an exception handler
# registered, close and the handler call (e, self, :start) run in yet
# another thread; otherwise the stream is closed and the exception is
# re-raised inside the parser thread.
66: @parserThread = Thread.new do
67: begin
68: @parser.parse
69: rescue Exception => e
70: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
71:
72: if @exception_block
73: Thread.new { close; @exception_block.call(e, self, :start) }
74: else
75: puts "Exception caught in Parser thread!"
76: close
77: raise
78: end
79: end
80: end
81: # @pollThread = Thread.new do
82: # begin
83: # poll
84: # rescue
85: # puts "Exception caught in Poll thread, dumping backtrace and" +
86: # " exiting...\n" + $!.exception + "\n"
87: # puts $!.backtrace
88: # exit
89: # end
90: # end
# The status is set right after the parser thread has been created.
91: @status = CONNECTED
92: end
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.
| time: | [Integer] time to wait in seconds. If nil, wait infinitely. |
# File lib/xmpp4r/stream.rb, line 301
# Process one queued stanza, waiting for one to arrive if the queue is empty.
#
# With +time+ equal to 0 this is process(1). Otherwise a queued stanza is
# processed at once; if there is none, the calling thread registers itself
# in @waiting_thread and sleeps until it is woken (receive_nonthreaded wakes
# @waiting_thread after queueing a stanza) or until +time+ seconds have
# passed, then checks the queue once more.
#
# time:: [Integer] seconds to wait; nil waits until woken
# Returns the number of stanzas processed (0 or 1).
def wait_and_process(time = nil)
  return process(1) if time == 0

  # Take at most one stanza. synchronize releases the lock on every path,
  # so the queue mutex is never still held when we sleep or return.
  take_one = lambda { @stanzaqueue_lock.synchronize { @stanzaqueue.shift(1) } }

  pending = take_one.call
  if pending.empty?
    @waiting_thread = Thread.current
    begin
      # A timed sleep can be cut short by Thread#wakeup, so no helper
      # thread is needed to enforce the timeout.
      time.nil? ? sleep : sleep(time)
    ensure
      @waiting_thread = nil
    end
    pending = take_one.call
  end
  return 0 if pending.empty?

  process_one(pending.first)
  1
end
Process |stanza| by offering it to the registered callbacks until one of them consumes it. Returns true if a callback consumed the stanza. | stanza: | The stanza to process |
# File lib/xmpp4r/stream.rb, line 261
# Offer +stanza+ to the callback lists in a fixed order: raw XML callbacks,
# generic stanza callbacks, then the list matching the stanza's class
# (Message, Iq or Presence). Returns true as soon as one list's process
# call returns a truthy value; returns nil when none does.
261: def process_one(stanza)
262: Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
263: return true if @xmlcbs.process(stanza)
264: return true if @stanzacbs.process(stanza)
265: case stanza
266: when Message
267: return true if @messagecbs.process(stanza)
268: when Iq
269: return true if @iqcbs.process(stanza)
270: when Presence
271: return true if @presencecbs.process(stanza)
272: end
273: end
# File lib/xmpp4r/stream.rb, line 179
# Turn a received element into a stanza, offer it to the blocks of senders
# waiting for an answer, and, if none of them claims it, dispatch it to the
# callbacks (threaded mode) or queue it for a later process call.
#
# element:: the received element; its prefix, name and attributes are read
179: def receive_nonthreaded(element)
180: Jabber::debuglog("RECEIVED:\n#{element.to_s}")
# Elements with the 'stream' prefix are kept as they are; others are
# converted by name into Message, Iq or Presence where applicable.
181: case element.prefix
182: when 'stream'
183: case element.name
184: when 'stream'
# Stream opening: remember the id attribute as the stream id.
185: stanza = element
186: @streamid = element.attributes['id']
187: unless element.attributes['version'] # isn't XMPP compliant, so
188: Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
189: @features_lock.unlock # don't wait for <stream:features/>
190: end
191: when 'features'
# Collect SASL mechanism names into @stream_mechanisms and record every
# other child as name => namespace in @stream_features, then release
# @features_lock.
192: stanza = element
193: element.each { |e|
194: if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
195: e.each_element('mechanism') { |mech|
196: @stream_mechanisms.push(mech.text)
197: }
198: else
199: @stream_features[e.name] = e.namespace
200: end
201: }
202: Jabber::debuglog("FEATURES: received")
203: @features_lock.unlock
204: else
205: stanza = element
206: end
207: else
208: case element.name
209: when 'message'
210: stanza = Message::import(element)
211: when 'iq'
212: stanza = Iq::import(element)
213: when 'presence'
214: stanza = Presence::import(element)
215: else
216: stanza = element
217: end
218: end
219:
220: # Iterate through blocked threads (= waiting for an answer)
221: #
222: # We're dup'ping the @threadblocks here, so that we won't end up in an
223: # endless loop if Stream#send is being nested. That means, the nested
224: # threadblock won't receive the stanza currently processed, but the next
225: # one.
226: threadblocks = @threadblocks.dup
227: threadblocks.each { |threadblock|
228: exception = nil
229: r = false
230: begin
231: r = threadblock.call(stanza)
232: rescue Exception => e
233: exception = e
234: end
235:
# Only an exact true counts as "answered": the threadblock is removed and
# woken, and handling of this stanza ends here, so callbacks are skipped.
236: if r == true
237: @threadblocks.delete(threadblock)
238: threadblock.wakeup
239: return
# A threadblock that raised is removed and the exception is handed to
# threadblock.raise; iteration continues with the remaining threadblocks.
240: elsif exception
241: @threadblocks.delete(threadblock)
242: threadblock.raise(exception)
243: end
244: }
245:
# No waiting sender claimed the stanza.
246: if @threaded
247: process_one(stanza)
248: else
249: # stanzaqueue will be read when the user call process
250: @stanzaqueue_lock.lock
251: @stanzaqueue.push(stanza)
252: @stanzaqueue_lock.unlock
253: @waiting_thread.wakeup if @waiting_thread
254: end
255: end