| Class | Jabber::Bytestreams::IBB |
| In: |
lib/xmpp4r/bytestreams/helper/ibb/base.rb
|
| Parent: | Object |
In-Band Bytestreams (JEP-0047) implementation
Don‘t use directly, use IBBInitiator and IBBTarget
In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.
Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.
| NS_IBB | = | 'http://jabber.org/protocol/ibb' |
Create a new bytestream
Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 26
26: def initialize(stream, session_id, my_jid, peer_jid)
27: @stream = stream
28: @session_id = session_id
29: @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid)
30: @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid)
31:
32: @active = false
33: @seq_send = 0
34: @seq_recv = 0
35: @queue = []
36: @queue_lock = Mutex.new
37: @pending = Mutex.new
38: @pending.lock
39: @sendbuf = ''
40: @sendbuf_lock = Mutex.new
41:
42: @block_size = 4096 # Recommended by JEP0047
43: end
Close the stream
Waits for acknowledge from peer, may throw ErrorException
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 125
125: def close
126: if active?
127: flush
128: deactivate
129:
130: iq = Iq.new(:set, @peer_jid)
131: close = iq.add REXML::Element.new('close')
132: close.add_namespace IBB::NS_IBB
133: close.attributes['sid'] = @session_id
134:
135: @stream.send_with_id(iq) { |answer|
136: answer.type == :result
137: }
138: end
139: end
Empty the send-buffer by sending remaining data
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 69
69: def flush
70: @sendbuf_lock.synchronize {
71: while @sendbuf.size > 0
72: send_data(@sendbuf[0..@block_size-1])
73: @sendbuf = @sendbuf[@block_size..-1].to_s
74: end
75: }
76: end
Receive data
Will wait until the Message with the next sequence number is in the stanza queue.
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 83
83: def read
84: if active?
85: res = nil
86:
87: while res.nil?
88: @queue_lock.synchronize {
89: @queue.each { |item|
90: # Find next data
91: if item.type == :data and item.seq == @seq_recv.to_s
92: res = item
93: break
94: # No data? Find close
95: elsif item.type == :close and res.nil?
96: res = item
97: end
98: }
99:
100: @queue.delete_if { |item| item == res }
101: }
102:
103: # No data? Wait for next to arrive...
104: @pending.lock unless res
105: end
106:
107: if res.type == :data
108: @seq_recv += 1
109: @seq_recv = 0 if @seq_recv > 65535
110: res.data
111: elsif res.type == :close
112: deactivate
113: nil # Closed
114: end
115: else
116: nil
117: end
118: end
Send data
Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.
| buf: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 56
56: def write(buf)
57: @sendbuf_lock.synchronize {
58: @sendbuf += buf
59:
60: while @sendbuf.size >= @block_size
61: send_data(@sendbuf[0..@block_size-1])
62: @sendbuf = @sendbuf[@block_size..-1].to_s
63: end
64: }
65: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 179
179: def activate
180: unless active?
181: @stream.add_message_callback(200, self) { |msg|
182: data = msg.first_element('data')
183: if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id
184: if msg.type == nil
185: @queue_lock.synchronize {
186: @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s)
187: @pending.unlock
188: }
189: elsif msg.type == :error
190: @queue_lock.synchronize {
191: @queue << IBBQueueItem.new(:close)
192: @pending.unlock
193: }
194: end
195: true
196: else
197: false
198: end
199: }
200:
201: @stream.add_iq_callback(200, self) { |iq|
202: close = iq.first_element('close')
203: if iq.type == :set and close and close.attributes['sid'] == @session_id
204: answer = iq.answer(false)
205: answer.type = :result
206: @stream.send(answer)
207:
208: @queue_lock.synchronize {
209: @queue << IBBQueueItem.new(:close)
210: @pending.unlock
211: }
212: true
213: else
214: false
215: end
216: }
217:
218: @active = true
219: end
220: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 222
222: def deactivate
223: if active?
224: @stream.delete_message_callback(self)
225: @stream.delete_iq_callback(self)
226:
227: @active = false
228: end
229: end
Send data directly
| data: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 146
146: def send_data(databuf)
147: if active?
148: msg = Message.new
149: msg.from = @my_jid
150: msg.to = @peer_jid
151:
152: data = msg.add REXML::Element.new('data')
153: data.add_namespace NS_IBB
154: data.attributes['sid'] = @session_id
155: data.attributes['seq'] = @seq_send.to_s
156: data.text = Base64::encode64 databuf
157:
158: # TODO: Implement AMP correctly
159: amp = msg.add REXML::Element.new('amp')
160: amp.add_namespace 'http://jabber.org/protocol/amp'
161: deliver_at = amp.add REXML::Element.new('rule')
162: deliver_at.attributes['condition'] = 'deliver-at'
163: deliver_at.attributes['value'] = 'stored'
164: deliver_at.attributes['action'] = 'error'
165: match_resource = amp.add REXML::Element.new('rule')
166: match_resource.attributes['condition'] = 'match-resource'
167: match_resource.attributes['value'] = 'exact'
168: match_resource.attributes['action'] = 'error'
169:
170: @stream.send(msg)
171:
172: @seq_send += 1
173: @seq_send = 0 if @seq_send > 65535
174: else
175: raise 'Attempt to send data when not activated'
176: end
177: end