| Class | Bunny::Exchange09 |
| In: |
lib/bunny/exchange09.rb
|
| Parent: | Object |
Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined -
AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with ‘amq.’, for example -
If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.
| client | [R] | |
| key | [R] | |
| name | [R] | |
| opts | [R] | |
| type | [R] |
# File lib/bunny/exchange09.rb, line 36
36: def initialize(client, name, opts = {})
37: # check connection to server
38: raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected
39:
40: @client, @name, @opts = client, name, opts
41:
42: # set up the exchange type catering for default names
43: if name.match(/^amq\./)
44: new_type = name.sub(/amq\./, '')
45: # handle 'amq.match' default
46: new_type = 'headers' if new_type == 'match'
47: @type = new_type.to_sym
48: else
49: @type = opts[:type] || :direct
50: end
51:
52: @key = opts[:key]
53: @client.exchanges[@name] ||= self
54:
55: # ignore the :nowait option if passed, otherwise program will hang waiting for a
56: # response that will not be sent by the server
57: opts.delete(:nowait)
58:
59: unless name == "amq.#{type}" or name == ''
60: client.send_frame(
61: Qrack::Protocol09::Exchange::Declare.new(
62: { :exchange => name, :type => type, :nowait => false,
63: :deprecated_ticket => 0, :deprecated_auto_delete => false, :deprecated_internal => false }.merge(opts)
64: )
65: )
66:
67: method = client.next_method
68:
69: client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk,
70: "Error declaring exchange #{name}: type = #{type}")
71:
72: end
73: end
Requests that an exchange is deleted from broker/server. Removes reference from exchanges if successful. If an error occurs raises Bunny::ProtocolError.
:delete_ok if successful
# File lib/bunny/exchange09.rb, line 94
94: def delete(opts = {})
95: # ignore the :nowait option if passed, otherwise program will hang waiting for a
96: # response that will not be sent by the server
97: opts.delete(:nowait)
98:
99: client.send_frame(
100: Qrack::Protocol09::Exchange::Delete.new({ :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
101: )
102:
103: method = client.next_method
104:
105: client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk,
106: "Error deleting exchange #{name}")
107:
108: client.exchanges.delete(name)
109:
110: # return confirmation
111: :delete_ok
112: end
Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
nil
# File lib/bunny/exchange09.rb, line 144
144: def publish(data, opts = {})
145: opts = opts.dup
146: out = []
147:
148: # Set up options
149: routing_key = opts.delete(:key) || key
150: mandatory = opts.delete(:mandatory)
151: immediate = opts.delete(:immediate)
152: delivery_mode = opts.delete(:persistent) ? 2 : 1
153:
154: out << Qrack::Protocol09::Basic::Publish.new(
155: { :exchange => name,
156: :routing_key => routing_key,
157: :mandatory => mandatory,
158: :immediate => immediate,
159: :deprecated_ticket => 0 }
160: )
161: data = data.to_s
162: out << Qrack::Protocol09::Header.new(
163: Qrack::Protocol09::Basic,
164: data.length, {
165: :content_type => 'application/octet-stream',
166: :delivery_mode => delivery_mode,
167: :priority => 0
168: }.merge(opts)
169: )
170: out << Qrack::Transport09::Body.new(data)
171:
172: client.send_frame(*out)
173: end