Rails:如何侦听/从服务或队列中提取消息? [英] Rails: How to listen to / pull from service or queue?
问题描述
大多数Rails应用程序的工作方式是等待客户端发出的请求,然后再发挥作用. 但是,如果我想将Rails应用程序用作微服务架构的一部分(例如)并进行一些异步通信(Serivce A将事件发送到Kafka或RabbitMQ队列中,而Service B-我的Rails应用程序-应该侦听此队列),如何调整/启动Rails应用以立即侦听队列并从那里触发事件? (意味着初始触发不是来自客户端,而是来自应用程序本身.)
Most Rails applications work in a way that they are waiting for requests comming from a client and then do their magic. But if I want to use a Rails application as part of a microservice architecture (for example) with some asychonious communication (Serivce A sends an event into a Kafka or RabbitMQ queue and Service B - my Rails app - is supposed to listen to this queue), how can I tune/start the Rails app to immediately listen to a queue and being triggered by event from there? (Meaning the initial trigger is not comming from a client, but from the App itself.)
感谢您的建议!
推荐答案
我刚刚在应用程序中设置RabbitMQ消息传递,并将在第二天左右为分离的(多个分布式)应用程序实现.我发现这篇文章非常有帮助(而 RabbitMQ教程).以下所有代码均适用于RabbitMQ,并假定您已在本地计算机上启动并运行RabbitMQ服务器.
I just set up RabbitMQ messaging within my application and will be implementing for decoupled (multiple, distributed) applications in the next day or so. I found this article very helpful (and the RabbitMQ tutorials, too). All the code below is for RabbitMQ and assumes you have a RabbitMQ server up and running on your local machine.
到目前为止,这是我的工作-对我有用:
Here's what I have so far - that's working for me:
#Gemfile
gem 'bunny'
gem 'sneakers'
我有一个Publisher
发送到队列:
# app/agents/messaging/publisher.rb
module Messaging
class Publisher
class << self
def publish(args)
connection = Bunny.new
connection.start
channel = connection.create_channel
queue_name = "#{args.keys.first.to_s.pluralize}_queue"
queue = channel.queue(queue_name, durable: true)
channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
puts "in #{self}.#{__method__}, [x] Sent #{args}!"
connection.close
end
end
end
end
我这样使用:
Messaging::Publisher.publish(event: {... event details...})
然后我有我的听众":
# app/agents/messaging/events_queue_receiver.rb
require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"
module Messaging
class EventsQueueReceiver
include Sneakers::Worker
from_queue :events_queue, env: nil
def work(msg)
logger.info msg
response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
ack! if response[:success]
end
end
end
侦听器"将消息发送到Messaging::EventsAgent.distribute
,如下所示:
The 'listener' sends the message to Messaging::EventsAgent.distribute
, which is like this:
# app/agents/messaging/events_agent.rb
require_dependency #{Rails.root.join('app','agents','fsm','state_assignment_agent')}"
module Messaging
class EventsAgent
EVENT_HANDLERS = {
enroll_in_program: ["FSM::StateAssignmentAgent"]
}
class << self
def publish(event)
Messaging::Publisher.publish(event: event)
end
def distribute(event)
puts "in #{self}.#{__method__}, message"
if event[:handler]
puts "in #{self}.#{__method__}, event[:handler: #{event[:handler}"
event[:handler].constantize.handle_event(event)
else
event_name = event[:event_name].to_sym
EVENT_HANDLERS[event_name].each do |handler|
event[:handler] = handler
publish(event)
end
end
return {success: true}
end
end
end
end
按照Codetunes上的说明进行操作,
Following the instructions on Codetunes, I have:
# Rakefile
# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
require File.expand_path('../config/application', __FILE__)
require 'sneakers/tasks'
Rails.application.load_tasks
并且:
# app/config/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
我打开两个控制台窗口.首先,我说(让我的监听器运行):
I open two console windows. In the first, I say (to get my listener running):
$ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
... a bunch of start up info
2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2
第二,我说:
$ rails s --sandbox
2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}})
in Messaging::Publisher.publish, [x] Sent {:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}}!
=> :closed
然后,在我的第一个窗口中,我看到:
Then, back in my first window, I see:
2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
in Messaging::EventsAgent.distribute, message
in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent
在我的RabbitMQ服务器中,我看到:
And in my RabbitMQ server, I see:
这是一个非常小的设置,我相信未来几天我会学到更多的东西.
It's a pretty minimal setup and I'm sure I'll be learning a lot more in coming days.
祝你好运!
这篇关于Rails:如何侦听/从服务或队列中提取消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!