Spark2 session for Cassandra, SQL queries


Problem description

In Spark-2.0, what is the best way to create a Spark session? Because in both Spark-2.0 and Cassandra- the APIs have been reworked, essentially deprecating SqlContext (and also CassandraSqlContext), so for executing SQL I can either create a Cassandra Session (com.datastax.driver.core.Session) and use its execute(String) method, or create a SparkSession (org.apache.spark.sql.SparkSession) and use its sql(String sqlText) method.

I also don't know the SQL limitations of either approach - can someone explain?

Also, if I have to create the SparkSession, how do I do it? I couldn't find any suitable example. With the APIs getting reworked, the old examples don't work. I was going through this code sample - DataFrames - and it isn't clear what SQL context is being used there (is that the right approach going forward?). (For some reason the deprecated APIs aren't even compiling - I need to check my Eclipse settings.)

Thanks

Recommended answer

You need a Cassandra Session to create/drop keyspaces and tables in the Cassandra DB. In a Spark application, to create the Cassandra Session you pass the SparkConf to CassandraConnector. In Spark 2.0 you can do it as below.

import org.apache.spark.sql.SparkSession;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

// Build the SparkSession with the Cassandra connection settings
SparkSession spark = SparkSession
        .builder()
        .appName("SparkCassandraApp")
        .config("spark.cassandra.connection.host", "localhost")
        .config("spark.cassandra.connection.port", "9042")
        .master("local[2]")
        .getOrCreate();

// Create a Cassandra Session from the same SparkConf to run DDL (keyspace/table) statements
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");

If you already have a Dataframe, then you can also create the table in Cassandra using DataFrameFunctions.createCassandraTable(Df). See the API details here.
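For illustration, a minimal sketch of how that call might look from Java, assuming you already have a Dataset&lt;Row&gt; named df and the connector instance created above. DataFrameFunctions is the Scala class behind the implicit createCassandraTable method, the table name mytable_copy is made up, and the explicit Option arguments (Scala default arguments are not visible from Java) may differ between connector versions:

import com.datastax.spark.connector.DataFrameFunctions;
import scala.Option;

// Sketch only: derive the Cassandra table schema from the Dataframe's schema.
// Passing Option.empty() leaves partition/clustering key selection to the
// connector's defaults (the first column becomes the partition key).
new DataFrameFunctions(df).createCassandraTable(
        "mykeyspace",
        "mytable_copy",
        Option.empty(),   // partition key columns
        Option.empty(),   // clustering key columns
        connector);       // the CassandraConnector created earlier

Note that this call only creates the table from the Dataframe's schema; it does not write the data, so you still save the rows separately.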

You can read data from the Cassandra DB using the API provided by spark-cassandra-connector, as below.

// Read a Cassandra table into a Dataset via the connector's DataSource
Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "mykeyspace")
        .option("table", "mytable")
        .load();

dataset.show();

You can use the SparkSession.sql() method to run queries against a temporary view created from the Dataframe returned by spark-cassandra-connector, as below.

dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();
