Loading Cassandra Data into Dask Dataframe

Problem description

I am trying to load data from a Cassandra database into a Dask dataframe. I have tried the following query, with no success:

query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df)) 

TypeError                                 Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))

    TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'

Does anybody know an easy way to load data directly from Cassandra into Dask? The data takes too much memory to load into pandas first.

Recommended answer

Some problems with your code:


  • The line df = presumably loads the whole dataset into memory. Dask is not invoked here; it plays no part in this. Someone with knowledge of the Cassandra driver can confirm this.

  • list(df) produces a list of the column names of a dataframe and drops all the data.

  • dd.DataFrame, if you read the docs, is not constructed like this.
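The second point can be checked with a small sketch in plain pandas. The rows here are made-up stand-ins for query results (the column names id and title are assumptions, not part of the original table):

```python
import pandas as pd

# Stand-in rows; in practice these would come from the database query.
rows = [{"id": 1, "title": "a"}, {"id": 2, "title": "b"}]
pdf = pd.DataFrame(rows)

# Iterating over a pandas DataFrame yields its column labels, not its rows.
columns = list(pdf)

# Converting explicitly to records keeps the data.
records = pdf.to_dict("records")
```

So calling list() on a dataframe is not a way to pass its contents along; the rows have to be extracted explicitly.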

What you probably want to do is: a) write a function that returns one partition of the data, b) delay this function and call it with each of the partition values, and c) use dd.from_delayed to build the Dask dataframe. For example, assuming the table has a field partfield which conveniently takes the values 1..6, with a similar number of rows for each value:

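import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def part(x):
    # Placeholder: construct the Cassandra session here, inside the task
    session = ...
    q = "SELECT * FROM document_table WHERE partfield={}".format(x)
    rows = session.execute(q)
    # Each delayed call must return a pandas DataFrame (one partition)
    return pd.DataFrame(list(rows))

# One lazy task per partition value; nothing runs until the dataframe is computed
parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)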
