| Class | Bunny::Exchange |
| In: |
lib/bunny/exchange08.rb
|
| Parent: | Object |
Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined -
AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with ‘amq.’, for example -
If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.
| client | [R] | |
| key | [R] | |
| name | [R] | |
| opts | [R] | |
| type | [R] |
# File lib/bunny/exchange08.rb, line 36
36: def initialize(client, name, opts = {})
37: # check connection to server
38: raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected
39:
40: @client, @name, @opts = client, name, opts
41:
42: # set up the exchange type catering for default names
43: if name.match(/^amq\./)
44: new_type = name.sub(/amq\./, '')
45: # handle 'amq.match' default
46: new_type = 'headers' if new_type == 'match'
47: @type = new_type.to_sym
48: else
49: @type = opts[:type] || :direct
50: end
51:
52: @key = opts[:key]
53: @client.exchanges[@name] ||= self
54:
55: # ignore the :nowait option if passed, otherwise program will hang waiting for a
56: # response that will not be sent by the server
57: opts.delete(:nowait)
58:
59: unless name == "amq.#{type}" or name == ''
60: client.send_frame(
61: Qrack::Protocol::Exchange::Declare.new(
62: { :exchange => name, :type => type, :nowait => false }.merge(opts)
63: )
64: )
65:
66: method = client.next_method
67:
68: client.check_response(method, Qrack::Protocol::Exchange::DeclareOk,
69: "Error declaring exchange #{name}: type = #{type}")
70:
71: end
72: end
Requests that an exchange is deleted from broker/server. Removes reference from exchanges if successful. If an error occurs raises Bunny::ProtocolError.
:delete_ok if successful
# File lib/bunny/exchange08.rb, line 93
93: def delete(opts = {})
94: # ignore the :nowait option if passed, otherwise program will hang waiting for a
95: # response that will not be sent by the server
96: opts.delete(:nowait)
97:
98: client.send_frame(
99: Qrack::Protocol::Exchange::Delete.new({ :exchange => name, :nowait => false }.merge(opts))
100: )
101:
102: method = client.next_method
103:
104: client.check_response(method, Qrack::Protocol::Exchange::DeleteOk,
105: "Error deleting exchange #{name}")
106:
107: client.exchanges.delete(name)
108:
109: # return confirmation
110: :delete_ok
111: end
Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
nil
# File lib/bunny/exchange08.rb, line 143
143: def publish(data, opts = {})
144: opts = opts.dup
145: out = []
146:
147: # Set up options
148: routing_key = opts.delete(:key) || key
149: mandatory = opts.delete(:mandatory)
150: immediate = opts.delete(:immediate)
151: delivery_mode = opts.delete(:persistent) ? 2 : 1
152:
153: out << Qrack::Protocol::Basic::Publish.new(
154: { :exchange => name,
155: :routing_key => routing_key,
156: :mandatory => mandatory,
157: :immediate => immediate }
158: )
159: data = data.to_s
160: out << Qrack::Protocol::Header.new(
161: Qrack::Protocol::Basic,
162: data.length, {
163: :content_type => 'application/octet-stream',
164: :delivery_mode => delivery_mode,
165: :priority => 0
166: }.merge(opts)
167: )
168: out << Qrack::Transport::Body.new(data)
169:
170: client.send_frame(*out)
171: end