How to use kafka on tornado?

Problem description

I'm trying to make a simple chat app using tornado based on this

But also I want to use kafka to store the messages. How can I do that?

Right now I've used this to make a consumer, and it sort of works, but it only prints the messages to the console. I need the messages to show up on the webpage, like in the Tornado chat app, except that they are stored in Kafka.

Here's my app.py code as of now:

#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid

from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
from pykafka import KafkaClient


define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")


class MessageBuffer(object):
    def __init__(self):
        self.waiters = set()
        self.cache = []
        self.cache_size = 200

    def wait_for_messages(self, cursor=None):
        # Construct a Future to return to our caller.  This allows
        # wait_for_messages to be yielded from a coroutine even though
        # it is not a coroutine itself.  We will set the result of the
        # Future when results are available.
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

    def cancel_wait(self, future):
        self.waiters.remove(future)
        # Set an empty result to unblock any coroutines waiting.
        future.set_result([])

    def new_messages(self, messages):
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]

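client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
consumer = topic.get_simple_consumer()
# With default settings this loop blocks waiting for new messages, so
# execution never gets past it and main() below is never reached.
for message in consumer:
    if message is not None:
        print(message.value)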
# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)


class MessageNewHandler(tornado.web.RequestHandler):
    def post(self):
        message = {
            "id": str(uuid.uuid4()),
            "body": self.get_argument("body"),
        }
        # to_basestring is necessary for Python 3's json encoder,
        # which doesn't accept byte strings.
        message["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=message))
        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            self.write(message)
        global_message_buffer.new_messages([message])


class MessageUpdatesHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        cursor = self.get_argument("cursor", None)
        # Save the future returned by wait_for_messages so we can cancel
        # it in on_connection_close
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))

    def on_connection_close(self):
        global_message_buffer.cancel_wait(self.future)


def main():
    parse_command_line()
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/a/message/new", MessageNewHandler),
            (r"/a/message/updates", MessageUpdatesHandler),
            ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
        )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()

Solution

I know this is an old question, but in case it is useful to somebody else: it is possible to use Tornado and the kafka-python module together (though @sixstone's suggestion of using kiel is also a good one).

Since kafka-python is blocking, and we also need the Tornado main loop to keep running, we need to run them in separate threads. In the following (longish) example I create a thread for the kafka-python consumer and keep the Tornado IOLoop in the main thread.
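Before the full example, here is the core hand-off in isolation, as a sketch that needs only the standard library. Tornado 5 and later run on top of asyncio, and `IOLoop.add_callback` is the thread-safe way to schedule work on the loop; this sketch calls asyncio's `loop.call_soon_threadsafe` directly instead. A plain list stands in for the blocking `KafkaConsumer` iterator, and `consume_in_thread` is a hypothetical helper name, not part of Tornado or kafka-python.

```python
import asyncio
import threading


def consume_in_thread(records, loop, on_message):
    """Iterate a blocking source in a worker thread and hand each item
    to `on_message`, which then runs on the event loop's thread."""
    def run():
        # Stands in for `for message in KafkaConsumer('mytopic'):`
        for record in records:
            # Thread-safe scheduling onto the loop; never call loop-owned
            # code (handlers, websockets) directly from this thread.
            loop.call_soon_threadsafe(on_message, record)

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker


async def run_demo():
    loop = asyncio.get_running_loop()
    fake_records = ["a", "b", "c"]  # hypothetical stand-in for Kafka records
    received = []
    done = asyncio.Event()

    def on_message(record):
        # Runs on the loop thread, so shared state needs no lock here.
        on_main = threading.current_thread() is threading.main_thread()
        received.append((record, on_main))
        if len(received) == len(fake_records):
            done.set()

    consume_in_thread(fake_records, loop, on_message)
    await done.wait()
    return received


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Every callback runs on the loop's thread, so `on_message` can touch shared state safely; in the Tornado version that is where you would append to `message_history` and call `KafkaWebSocket.write_to_all`.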

The full example is fairly lengthy because it also uses websockets to publish the messages as soon as they are received. Hopefully the added complexity is worth the extra lines for those wanting to combine real-time websocket notifications with Tornado and Kafka.
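Stripped of Tornado, the publishing side is a small piece of bookkeeping: append to a bounded history, write to every connected socket, and forget sockets that have gone away. A minimal sketch of that logic, where `FakeSocket` and `Broadcaster` are hypothetical stand-ins for `KafkaWebSocket` and its class-level `open_sockets` set:

```python
import collections


class FakeSocket:
    """Hypothetical stand-in for a tornado WebSocketHandler."""

    def __init__(self, name, connected=True):
        self.name = name
        self.connected = connected
        self.sent = []

    def write_message(self, message):
        self.sent.append(message)


class Broadcaster:
    def __init__(self, history_size=100):
        # deque(maxlen=N) silently drops the oldest entry once it is full.
        self.history = collections.deque(maxlen=history_size)
        self.sockets = set()

    def publish(self, message):
        self.history.append(message)
        closed = {ws for ws in self.sockets if not ws.connected}
        for ws in self.sockets - closed:
            ws.write_message(message)
        # Prune after iterating: resizing a set while looping over it raises.
        self.sockets -= closed
```

`deque(maxlen=...)` is also why `message_history` in the example never holds more than 100 entries.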

from __future__ import absolute_import, print_function

import collections
import threading

import jinja2
from kafka import KafkaConsumer
import tornado.ioloop
import tornado.web
import tornado.websocket


# A global to store some history...
message_history = collections.deque([], maxlen=100)


class KafkaWebSocket(tornado.websocket.WebSocketHandler):
    # Keep track of open sockets globally, so that we can 
    # communicate with them conveniently.
    open_sockets = set()

    @classmethod
    def write_to_all(cls, message):
        removable = set()
        for ws in cls.open_sockets:
            if not ws.ws_connection or not ws.ws_connection.stream.socket:
                removable.add(ws)
            else:
                ws.write_message(message)
        for ws in removable:
            cls.open_sockets.remove(ws)

    def open(self):
        # Disable Nagle's algorithm so small messages are sent without delay.
        self.set_nodelay(True)
        type(self).open_sockets.add(self)


class MainHandler(tornado.web.RequestHandler):
    template = """
<html>
<head>
<link href="//netdna.bootstrapcdn.com/twitter-bootstrap/2.3.1/css/bootstrap-combined.no-icons.min.css" rel="stylesheet">
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js"></script>

</head>
<body>
<div class="container">
{{ messages|length }} messages in cache:
<br><br>
<div id="messages">
{% for message in messages %}
 <div>{{ message }}</div>
{% endfor %}
</div>
</div>
    <footer class="footer">
      <div class="container">
    Web socket status: <div id="message">Not connected</div>
      </div>
    </footer>

<script>

var loc = window.location, new_uri;
if (loc.protocol === "https:") {
    new_uri = "wss:";
} else {
    new_uri = "ws:";
}
new_uri += "//" + loc.host;
new_uri += loc.pathname + "ws";

var ws = new WebSocket(new_uri);
var $message = $('#message');
ws.onopen = function(){
  $message.attr("class", 'label label-success');
  $message.text('open');
};
ws.onmessage = function(ev){
  $message.attr("class", 'label label-info');
  $message.hide();
  $message.fadeIn("slow");
  $message.text('received message ' + new Date().toLocaleString());
  $('#messages').append("<div>" + ev.data + "</div>");
};
ws.onclose = function(ev){
  $message.attr("class", 'label label-important');
  $message.text('closed');
};
ws.onerror = function(ev){
  $message.attr("class", 'label label-warning');
  $message.text('error occurred');
};
</script>

</body>
</html>
"""
    def get(self):
        env = jinja2.Environment()
        template = env.from_string(self.template)
        self.write(template.render(messages=message_history))


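class Consumer(threading.Thread):
    daemon = True

    def __init__(self, kafka_consumer, io_loop):
        self._consumer = kafka_consumer
        # The IOLoop running in the main thread. Tornado is not thread-safe,
        # so this thread hands messages over via io_loop.add_callback().
        self._io_loop = io_loop
        super(Consumer, self).__init__()

    def run(self):
        # Iterating a KafkaConsumer blocks while waiting for new records,
        # which is why this loop lives in its own thread.
        for message in self._consumer:
            # str() of a ConsumerRecord is its full repr; use message.value
            # if you only want the payload bytes.
            message = str(message)
            # add_callback is safe to call from any thread; publish() then
            # runs on the IOLoop thread.
            self._io_loop.add_callback(publish, message)


def publish(message):
    # Runs on the IOLoop thread: record the message and push it to clients.
    message_history.append(message)
    KafkaWebSocket.write_to_all(message)


def make_app(*args, **kwargs):
    return tornado.web.Application([
        (r"/?", MainHandler),
        (r"/ws/?", KafkaWebSocket),
    ], *args, **kwargs)


if __name__ == "__main__":
    io_loop = tornado.ioloop.IOLoop.current()

    kafka_consumer = Consumer(KafkaConsumer('mytopic'), io_loop)
    # Start the kafka consumer thread.
    kafka_consumer.start()

    app = make_app()
    app.listen(8889)

    io_loop.start()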
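If you would rather keep the long-polling chat app from the question instead of switching to websockets, the same rule applies: the Kafka thread must not call `global_message_buffer.new_messages` itself, but should hand it to the loop, e.g. `io_loop.add_callback(global_message_buffer.new_messages, [message])`. The sketch below shows that wiring with plain asyncio in place of Tornado, using a trimmed-down copy of the question's `MessageBuffer`; `kafka_thread` and `long_poll_demo` are hypothetical names, and a list of strings stands in for the Kafka records. The question's front end also expects an `html` field on each message (rendered from `message.html`), which this sketch leaves out.

```python
import asyncio
import threading
import uuid


class MessageBuffer:
    """Trimmed-down, asyncio-only version of the question's MessageBuffer."""

    def __init__(self, cache_size=200):
        self.waiters = set()
        self.cache = []
        self.cache_size = cache_size

    def wait_for_messages(self):
        # Long-poll: the future resolves on the next new_messages() call.
        future = asyncio.get_running_loop().create_future()
        self.waiters.add(future)
        return future

    def new_messages(self, messages):
        # Must run on the event loop thread: it resolves loop-owned futures.
        for future in self.waiters:
            if not future.done():
                future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        self.cache = self.cache[-self.cache_size:]


def kafka_thread(records, loop, buffer):
    """Hypothetical consumer thread; `records` stands in for a KafkaConsumer."""
    for value in records:
        message = {"id": str(uuid.uuid4()), "body": value}
        # Never call buffer.new_messages() directly from this thread.
        loop.call_soon_threadsafe(buffer.new_messages, [message])


async def long_poll_demo():
    loop = asyncio.get_running_loop()
    buffer = MessageBuffer()
    # What a request to /a/message/updates does: wait for the next batch.
    pending = buffer.wait_for_messages()
    threading.Thread(target=kafka_thread, args=(["hello"], loop, buffer),
                     daemon=True).start()
    messages = await pending
    return [m["body"] for m in messages], len(buffer.cache)
```

Because `new_messages` only ever runs on the loop thread, the waiters set and the cache are never touched concurrently, so no lock is needed.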
