How to use kafka on tornado?


Problem description

I'm trying to build something based on the Tornado chat demo.

But I also want to use Kafka to store the messages. How can I do that?

Now, I used this to make a consumer and it is somehow working, but it only prints to the console. I need the messages to show up on the webpage, like in the Tornado chat app, except that they are stored in Kafka.

Here's my app.py code as of now:

#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid

from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
from pykafka import KafkaClient


define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")


class MessageBuffer(object):
    def __init__(self):
        self.waiters = set()
        self.cache = []
        self.cache_size = 200

    def wait_for_messages(self, cursor=None):
        # Construct a Future to return to our caller.  This allows
        # wait_for_messages to be yielded from a coroutine even though
        # it is not a coroutine itself.  We will set the result of the
        # Future when results are available.
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

    def cancel_wait(self, future):
        self.waiters.remove(future)
        # Set an empty result to unblock any coroutines waiting.
        future.set_result([])

    def new_messages(self, messages):
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
consumer = topic.get_simple_consumer()
# This loop blocks waiting for messages, so execution never reaches
# the Tornado application defined below.
for message in consumer:
    if message is not None:
        print(message.value)
# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)


class MessageNewHandler(tornado.web.RequestHandler):
    def post(self):
        message = {
            "id": str(uuid.uuid4()),
            "body": self.get_argument("body"),
        }
        # to_basestring is necessary for Python 3's json encoder,
        # which doesn't accept byte strings.
        message["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=message))
        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            self.write(message)
        global_message_buffer.new_messages([message])


class MessageUpdatesHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        cursor = self.get_argument("cursor", None)
        # Save the future returned by wait_for_messages so we can cancel
        # it in on_connection_close.
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))

    def on_connection_close(self):
        global_message_buffer.cancel_wait(self.future)


def main():
    parse_command_line()
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/a/message/new", MessageNewHandler),
            (r"/a/message/updates", MessageUpdatesHandler),
            ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
        )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()

Answer

I know this is an old question, but in case it is useful to somebody else, it is possible to use Tornado and the kafka-python module together (though @sixstone's suggestion of using kiel is also a good one).

Since kafka-python's consumer is blocking, and we also need the Tornado main loop to be running, we need to separate the threads. In the following (longish) example I create a thread for the kafka-python consumer and keep the Tornado IOLoop in the main thread.
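
Stripped down to just that hand-off, the pattern looks roughly like the sketch below. The broker address, the mytopic topic and the handle_message callback are placeholders for illustration, not part of the original answer's API:

import threading

import tornado.ioloop
from kafka import KafkaConsumer


def handle_message(value):
    # Placeholder: runs on the IOLoop thread, so it can safely touch
    # Tornado handlers, websockets, caches, etc.
    print(value)


def consume(io_loop):
    # Iterating a KafkaConsumer blocks, which is why it lives on its own thread.
    consumer = KafkaConsumer('mytopic', bootstrap_servers='localhost:9092')
    for record in consumer:
        # add_callback is the one IOLoop method that is documented as
        # safe to call from another thread.
        io_loop.add_callback(handle_message, record.value)


if __name__ == "__main__":
    io_loop = tornado.ioloop.IOLoop.current()
    consumer_thread = threading.Thread(target=consume, args=(io_loop,))
    consumer_thread.daemon = True
    consumer_thread.start()
    io_loop.start()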

The full example below is fairly lengthy because it also makes use of websockets to publish the messages as soon as they are received. Hopefully the added complexity is worth the extra few lines for those wanting to combine real-time notifications through websockets with Tornado and Kafka.

from __future__ import absolute_import, print_function

import collections
import threading

import jinja2
from kafka import KafkaConsumer
import tornado.ioloop
import tornado.web
import tornado.websocket


# A global to store some history...
message_history = collections.deque([], maxlen=100)


class KafkaWebSocket(tornado.websocket.WebSocketHandler):
    # Keep track of open sockets globally, so that we can 
    # communicate with them conveniently.
    open_sockets = set()

    @classmethod
    def write_to_all(cls, message):
        removable = set()
        for ws in cls.open_sockets:
            if not ws.ws_connection or not ws.ws_connection.stream.socket:
                removable.add(ws)
            else:
                ws.write_message(message)
        for ws in removable:
            cls.open_sockets.remove(ws)

    def open(self):
        # We don't want these sockets to be buffered.
        self.set_nodelay(True)
        type(self).open_sockets.add(self)


class MainHandler(tornado.web.RequestHandler):
    template = """
<html>
<head>
<link href="//netdna.bootstrapcdn.com/twitter-bootstrap/2.3.1/css/bootstrap-combined.no-icons.min.css" rel="stylesheet">
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js"></script>

</head>
<body>
<div class="container">
{{ messages|length }} messages in cache:
<br><br>
<div id="messages">
{% for message in messages %}
 <div>{{ message }}</div>
{% endfor %}
</div>
</div>
    <footer class="footer">
      <div class="container">
    Web socket status: <div id="message">Not connected</div>
      </div>
    </footer>

<script>

var loc = window.location, new_uri;
if (loc.protocol === "https:") {
    new_uri = "wss:";
} else {
    new_uri = "ws:";
}
new_uri += "//" + loc.host;
new_uri += loc.pathname + "ws";

var ws = new WebSocket(new_uri);
var $message = $('#message');
ws.onopen = function(){
  $message.attr("class", 'label label-success');
  $message.text('open');
};
ws.onmessage = function(ev){
  $message.attr("class", 'label label-info');
  $message.hide();
  $message.fadeIn("slow");
  $message.text('received message ' + new Date().toLocaleString());
  $('#messages').append("<div>" + ev.data + "</div>")
};
ws.onclose = function(ev){
  $message.attr("class", 'label label-important');
  $message.text('closed');
};
ws.onerror = function(ev){
  $message.attr("class", 'label label-warning');
  $message.text('error occurred');
};
</script>

</body>
</html>
"""
    def get(self):
        env = jinja2.Environment()
        template = env.from_string(self.template)
        self.write(template.render(messages=message_history))


class Consumer(threading.Thread):
    daemon = True

    def __init__(self, kafka_consumer):
        self._consumer = kafka_consumer
        super(Consumer, self).__init__()

    def run(self):
        for message in self._consumer:
            message = str(message)
            message_history.append(message)
            KafkaWebSocket.write_to_all(message)


def make_app(*args, **kwargs):
    return tornado.web.Application([
        (r"/?", MainHandler),
        (r"/ws/?", KafkaWebSocket),
    ], *args, **kwargs)


if __name__ == "__main__":
    kafka_consumer = Consumer(KafkaConsumer('mytopic'))
    # Start the kafka consumer thread.
    kafka_consumer.start()

    app = make_app()
    app.listen(8889)

    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.start()
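
To come back to the original question of storing the chat messages in Kafka in the first place, the web application could also publish each posted message to the same topic with kafka-python's KafkaProducer. The following is only a sketch; the broker address, topic name and handler are assumptions chosen to match the consumer example above:

# Sketch: a hypothetical handler that also stores each posted chat
# message in Kafka. The broker address, topic name and route are
# assumptions chosen to match the consumer above.
from kafka import KafkaProducer
import tornado.escape
import tornado.web

producer = KafkaProducer(bootstrap_servers='localhost:9092')


class MessageNewHandler(tornado.web.RequestHandler):
    def post(self):
        body = self.get_argument("body")
        # Persist the message in Kafka; the Consumer thread above will
        # receive it and broadcast it to the open websockets.
        producer.send('mytopic', tornado.escape.utf8(body))
        self.write({"body": body})

Registering such a handler in make_app (for example at r"/a/message/new", mirroring the original chat demo) would let the browser post messages that flow through Kafka and back out over the websockets, instead of only living in the in-memory MessageBuffer of the original chat demo.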
