Cassandra asynchronous execution in multiple processes blocking synchronous requests

Question

I have an application that reads a series of XML files containing logs of vehicle passages on a road. The application processes each record, transforms some of the information to match the database columns, and inserts it into a Cassandra database (a single node running on a remote server; it's on an internal network, so the connection isn't really an issue). After inserting the data, the process for each file goes on to read that data back and produce summary tables, which leave the information ready for a drill-down analysis done in an unrelated part of the application.

I'm using multiprocessing to process many XML files in parallel, and the trouble I'm having is with communicating with the Cassandra server. Schematically, each process goes as follows:


  1. Read a record from the XML file
  2. Process the record's data
  3. Insert the processed data into the database (using .execute_async(query))
  4. Repeat steps 1 to 3 until the end of the XML file
  5. Wait for the responses to all the insert queries
  6. Read data from the database
  7. Process the data that was read
  8. Insert the processed data into the summary tables
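Steps 3 to 5 above can be sketched roughly as follows. This is only an illustration of the pattern described, not the actual application code: the function name `insert_all_then_wait` is hypothetical, and `session` is assumed to be any object whose `execute_async(statement)` returns a future with a blocking `result()` method, as the Cassandra Python driver's session does.

```python
def insert_all_then_wait(session, statements):
    """Submit every insert without blocking, then wait for all responses.

    Returns a list of (statement, exception) pairs for inserts that failed.
    """
    statements = list(statements)

    # Step 3 (repeated by step 4): execute_async returns immediately,
    # so every insert for the file is in flight at roughly the same time.
    futures = [session.execute_async(stmt) for stmt in statements]

    # Step 5: block on each future in turn until it succeeds or fails.
    failures = []
    for stmt, future in zip(statements, futures):
        try:
            future.result()
        except Exception as exc:  # e.g. a write timeout reported by the server
            failures.append((stmt, exc))
    return failures
```

Note that nothing in this pattern limits how many inserts are outstanding at once: with several processes each doing this, the node can receive all of a file's writes in a single burst.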

This runs smoothly in multiple parallel processes until one process reaches step 6. Its request (made with .execute(query), meaning it waits for the response) always times out. The error I receive is:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}

I have changed the timeout on the server to absurdly large values (500000000 ms, for instance), and I have also tried setting the timeout on the client with .execute(query, timeout=3000), but still with no success.

Once more processes have hit the same problem and the intense writing from steps 1-3 in multiple processes has stopped, the last processes to reach step 6 complete the procedure successfully. That makes me think the problem is that Cassandra is giving priority to the tens of thousands of insert requests I'm sending per second, and is either ignoring my read request or putting it way back in the queue.

One way to solve this, in my opinion, would be to ask Cassandra to give priority to my read request so that I can keep processing, even if that means slowing down the other processes.

As a side note, you might think my process modelling is not optimal, and I'd love to hear opinions on that, but for the reality of this application this is, in our view, the best way to proceed. We have thought extensively about optimising the process, and (if the Cassandra server can handle it) this is optimal for our situation.

So, TL;DR: Is there a way of giving priority to a query when executing tens of thousands of asynchronous queries? If not, is there a way of executing tens of thousands of insert queries and read queries per second without the requests timing out? Additionally, what would you suggest I do to solve the problem? Running fewer processes in parallel is obviously a solution, but one I'm trying to avoid. I would love to hear everyone's thoughts.

Keeping the data in memory while inserting, so that I don't need to read it again for the summary, is not a possibility because the XML files are huge and memory is an issue.

Recommended answer

I don't know of a way to give priority to read queries. I believe internally Cassandra has separate thread pools for read and write operations, so those are running in parallel. Without seeing the schema and queries you're doing, it's hard to say if you are doing a very expensive read operation or if the system is just so swamped with writes that it can't keep up with the reads.

You might want to try monitoring what's going on in Cassandra while your application is running. There are several tools you can use for this. For example, if you ssh to your Cassandra node and run:

watch -n 1 nodetool tpstats

This will show you the thread pool stats (updated once per second). You'll be able to see if the queues are filling up or operations are getting blocked. If any of the "Dropped" counters increase, that's a sign you don't have enough capacity for what you're trying to do. If that's the case, then add capacity by adding more nodes, or change your schema and approach so that the node has less work to do.

Other useful things to monitor (on Linux, use watch -n 1 to monitor continuously):

nodetool compactionstats
nodetool netstats
nodetool cfstats <keyspace.table name>
nodetool cfhistograms <keyspace> <table name>

It is also good to monitor the node with Linux commands like top and iostat to check CPU and disk utilization.

My impression from what you say is that your single node doesn't have enough capacity to do all the work you're giving it, so either you need to process less data per unit of time, or add more Cassandra nodes to spread out the workload.
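One way to "process less data per unit of time" without reducing the number of processes is to cap how many inserts each process has in flight. The following is a minimal sketch under stated assumptions, not a tested recipe for this workload: `insert_with_bounded_window` and `max_in_flight` are hypothetical names, and `session` is assumed to expose `execute_async()` returning a future with a blocking `result()`, as the Cassandra Python driver's session does.

```python
from collections import deque


def insert_with_bounded_window(session, statements, max_in_flight=100):
    """Submit inserts asynchronously, keeping at most max_in_flight pending.

    `statements` may be a generator, so the whole file never has to be
    held in memory. Returns the number of statements submitted.
    """
    pending = deque()
    submitted = 0
    for stmt in statements:
        if len(pending) >= max_in_flight:
            # Wait for the oldest outstanding insert before sending another.
            # result() re-raises any error reported for that insert.
            pending.popleft().result()
        pending.append(session.execute_async(stmt))
        submitted += 1

    # Drain whatever is still outstanding at the end of the file.
    while pending:
        pending.popleft().result()
    return submitted
```

The value of `max_in_flight` is something to tune while watching `nodetool tpstats`: lower it if pending or dropped counts keep growing. The Python driver also ships `cassandra.concurrent.execute_concurrent_with_args`, whose `concurrency` argument applies a similar cap.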

I'm currently facing my own timeout error due to partitions having too many rows, so I may have to add cardinality to my partition key to make the contents of each partition smaller.
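As an illustration of adding cardinality to a partition key, one common approach is to add a time bucket to the key so that rows for one entity are spread over many smaller partitions. The sketch below is hypothetical throughout: the function name, the `equipment_id` and `passage_time` fields, and the choice of bucket size are assumptions made for the example, not taken from either schema discussed here.

```python
from datetime import datetime


def bucketed_partition_key(equipment_id, passage_time, hours_per_bucket=1):
    """Build a composite partition key (equipment, day, hour bucket).

    Partitioning by equipment alone would put every row for a device in one
    partition; adding the day and an hour bucket bounds partition size.
    """
    # Round the hour down to the start of its bucket, e.g. 14 -> 12 for 6h buckets.
    bucket_hour = (passage_time.hour // hours_per_bucket) * hours_per_bucket
    return (equipment_id, passage_time.date().isoformat(), bucket_hour)


# Example with made-up values: a 14:30 passage falls in the 12:00 six-hour bucket.
key = bucketed_partition_key("eq-1", datetime(2016, 3, 1, 14, 30), hours_per_bucket=6)
```

The trade-off is that a read spanning several buckets needs one query per bucket, so the bucket size should match the ranges that are usually read together.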
