python - Multiple workers consume same message from RabbitMQ queue -
I use the py-amqp module and Python 3.4 when I run more than 1 listener and message to a constructor The listener starts to publish the message and to start this process simultaniously I do not need that kind of behavior because the messages should be written only once in DB. Therefore, the fastest employees write messages to DB and all other workers say that the message already exists.
Producer:
Import Import import from import from json import import randomly. Import settings RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE def main (): conn = amqp.Connection (RMQ_HOST, RMQ_USER, RMQ_PASSWORD, SSL = False) ch = conn.channel () ch.exchange_declare (RMQ_EXCHANGE, 'fanout') req = {"Request": {"transaction_number": random.randint (100000, 9999999999)}} message = json.dumps (req) msg = amqp.message (message) ch.basic_publish (msg, RMQ_EXCHANGE) ch.close ( ) conn.close () if __name__ == '__main__': in range (100) for x: main () worker: From the pipeline import pipeline to fun control import, lent settings Import DB_CONNECTION_STRING, RMQ_EXCHANGE, RMQ_HOST, RMQ_PASSWORD, RMQ_USER import amqp DB = dal.DAL (DB_CONNECTION_STRING) message_processor = Pipeline.pplline (db) def callback (channel, msg): channel.basic_ack (msg.delivery_tag) message_processor.process msg) if msg.body == 'left': channel.basic_cancel (msg.consumer_tag) def main () : Conn = amqp.Connection (RMQ_HOST, RMQ_USER, RMQ_PASSWORD, SSL = False) ch = conn.channel () ch Exchange _ Dislayer (RMQXNNNTNAGN, FANAPANG) qname, _, _ = ch.queue_declare () ch.queue_bind (qname, rmq_EXCHANGE ch.basic_consume (qname, callback = partial (callback, ca), while ch.callbacks: ch.wait ()
also:
User @ rbitmq: ~ $ pseudo rabbitemcontell list_band Listings Binding ... Exchange AMKJen - CRTGFSLE6GWLLRV7PPUQEMMAG- CRTJFSLU6GLLVW7PQ [] ExchangeMcGen-1X3WVGF5OKN_GCNFPJKFG QIMMJen-1X3VVGF5OKN_GCNFPJKFG [] ... exchange amq.gen-yf8ieG1AK9x83Vz4GBj-ZA qi amq.gen-yf8ieG1AK9x83Vz4GBj-ZA [] Exchange AntiSpy.test line entryEttest [] Antipee Exchange Ent Piktest queue [] Ajhjhe exchange Amakkjen - Siartijefseli 6 Jivielelarwu 7 pp Q [] Ajhjhe Exchange Exchange .gen-1X3vwGF5OKn_gcnofpJKFg line [] ... Azaza exchange amq.gen-yf8ieG1AK9x83Vz4GBj-ZA Queue [] Azza exchange entryapi.test queue [] ... done.
I think you use the wrong type set up for your use case are doing. You have a publisher published in one currency and you want to read the messages and want to write them in DB. You want to write DB with many consumers doing this so that you can increase the throughput. Fanout will result in a lot of queue for repeating the exchange message and the dubbing of DB to many consumers of the same data. You need to use the 'work queue' Each exchange will exchange a default (no type, or direct exchange with all messages using the same routing key). All messages sent to the exchange will be sent in a single quote. html>
Comments
Post a Comment