Syntax error in CQL query when trying to write to Cassandra from Python


Question

I am building an application in Python that takes data from Twitter and then saves it to Cassandra. My current problem lies in a script that reads data from Kafka and tries to write it to Cassandra, as follows:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


class Consumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])

        while not self.stop_event.is_set():
            for message in consumer:
                # session.execute(
                #     """
                #     INSERT INTO mensaje_73 (tweet)
                #     VALUES (message)
                #     """
                # )
                print(message)
                cluster = Cluster()
                session = cluster.connect('twitter')
                session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

                # if self.stop_event.is_set():
                #     break

        consumer.close()


def main():
    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

Inserting a test message into the table twitter.mensaje_73 works perfectly, as shown here:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

Any help would be deeply appreciated :)

Recommended answer

The problem here is that the word message inside your query string is never replaced by the Python variable of the same name. Cassandra receives the bare token message where it expects a value, and a string value in CQL must be a literal wrapped in single quotes. Hence the syntax error.
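A minimal sketch of why the original statement fails, using only plain Python string handling (no Cassandra connection is needed, and the sample value is made up for illustration). A name that appears inside an ordinary string literal is just text, so the query reaches Cassandra with the word message in it:

```python
# Hypothetical sample value standing in for a tweet read from Kafka.
message = "hello from kafka"

# The query text exactly as written in the question.
query = """
    INSERT INTO mensaje_73 (tweet)
    VALUES (message)
"""

# Python does not substitute variables inside an ordinary string literal,
# so the query still contains the bare word "message" ...
print("VALUES (message)" in query)  # True
# ... and the actual value of the variable never makes it into the query.
print(message in query)             # False
```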

To fix this, I would use a prepared statement and then bind message to it:

session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
        """
        INSERT INTO mensaje_73 (tweet)
        VALUES (?)
        """
    )
session.execute(preparedTweetInsert,[message])

Give that a try, and see if it helps.
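As a rough sketch of how this could fit into the consumer loop from the question: the statement is prepared once and reused for every record, rather than reconnecting per message. The helper names insert_tweets and run_consumer are hypothetical, and the sketch assumes each Kafka record carries the tweet as non-null UTF-8 bytes in its value attribute (the kafka-python default when no deserializer is configured), which may not match your producer.

```python
def insert_tweets(session, messages):
    """Insert the payload of each record into mensaje_73.

    session:  a connected Cassandra session (anything with prepare/execute).
    messages: an iterable of records exposing a bytes `.value`
              (assumption: UTF-8 encoded tweet text, never None).
    Returns the number of rows written.
    """
    # Prepare once; Cassandra parses the statement a single time.
    insert = session.prepare("INSERT INTO mensaje_73 (tweet) VALUES (?)")
    count = 0
    for record in messages:
        tweet = record.value.decode("utf-8")  # bytes -> str for a text column
        session.execute(insert, [tweet])      # the value is bound, not inlined
        count += 1
    return count


def run_consumer():
    """Wire the helper to a real cluster and topic (hosts and names as in the question)."""
    # Local imports so insert_tweets can be tried without the drivers installed.
    from cassandra.cluster import Cluster
    from kafka import KafkaConsumer

    cluster = Cluster()
    session = cluster.connect('twitter')
    consumer = KafkaConsumer('twitter',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=1000)
    try:
        insert_tweets(session, consumer)
    finally:
        consumer.close()
        cluster.shutdown()
```

Calling run_consumer() from Consumer.run would replace the per-message Cluster() and connect() calls in the original loop.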

Also, this seems like a simple data model, but ask yourself how you are going to query this data. The INSERT as written only works if tweet is the table's sole PRIMARY KEY column, which also means that the only way to look up an individual tweet is by the exact text of the message. It is something to think about; partitioning by day might be a better option, since it should distribute well and give you a much better query model.
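One possible shape for a day-partitioned model, offered as a sketch rather than a recommendation for your workload. The table name tweets_by_day, its columns, and the helper day_partition_params are all hypothetical, and the sketch assumes a millisecond epoch timestamp is available for each tweet (kafka-python records expose one as message.timestamp).

```python
from datetime import datetime, timezone

# Hypothetical table: one partition per calendar day, newest rows first.
CREATE_TWEETS_BY_DAY = """
CREATE TABLE IF NOT EXISTS tweets_by_day (
    day        date,
    created_at timestamp,
    tweet      text,
    PRIMARY KEY ((day), created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
"""

INSERT_TWEET_BY_DAY = (
    "INSERT INTO tweets_by_day (day, created_at, tweet) VALUES (?, ?, ?)"
)


def day_partition_params(timestamp_ms, tweet):
    """Build the bind values [day, created_at, tweet] for INSERT_TWEET_BY_DAY.

    timestamp_ms is milliseconds since the Unix epoch; the day bucket is
    computed in UTC so that every writer agrees on partition boundaries.
    """
    created_at = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return [created_at.date(), created_at, tweet]
```

With this layout, a query such as SELECT tweet FROM tweets_by_day WHERE day = ? reads a single partition. Two tweets with the same millisecond timestamp would overwrite each other, so a timeuuid clustering column may be a safer choice if that can happen.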
