Spark2 session for Cassandra, SQL queries


Question

In Spark 2.0, what is the best way to create a Spark session? In both Spark 2.0 and the Cassandra connector the APIs have been reworked, essentially deprecating SqlContext (and also CassandraSqlContext). So to execute SQL, I can either create a Cassandra Session (com.datastax.driver.core.Session) and use execute(" "), or create a SparkSession (org.apache.spark.sql.SparkSession) and call its sql(String sqlText) method.

I don't know the SQL limitations of either approach - can someone explain?

Also, if I have to create the SparkSession, how do I do it? I couldn't find any suitable example. With the APIs reworked, the old examples don't work. I was going through this code sample - DataFrames - and it is not clear what SQL context is being used there (is that the right approach going forward?). (For some reason the deprecated APIs are not even compiling - I need to check my Eclipse settings.)

Thanks

Answer

You need a Cassandra Session to create or drop keyspaces and tables in the Cassandra DB. In a Spark application, to create a Cassandra Session you pass the SparkConf to CassandraConnector. In Spark 2.0 you can do it like below.

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.sql.SparkSession;

// Build a SparkSession with the Cassandra connection details in its config.
SparkSession spark = SparkSession
        .builder()
        .appName("SparkCassandraApp")
        .config("spark.cassandra.connection.host", "localhost")
        .config("spark.cassandra.connection.port", "9042")
        .master("local[2]")
        .getOrCreate();

// Reuse the same SparkConf to open a Cassandra driver Session for DDL (CQL) statements.
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");

If you have an existing Dataframe, you can also create the table in Cassandra using DataFrameFunctions.createCassandraTable(Df). See the API details here.
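
For illustration, a minimal, untested sketch of that call from Java follows. It uses the connector's Scala DataFrameFunctions class directly; the exact parameter list (keyspace, table, optional partition/clustering key columns, and the CassandraConnector) can differ between connector versions, so check the linked API docs. The table name mytable_from_df is made up, df stands for an existing Dataset<Row>, and connector is the CassandraConnector created above.

import com.datastax.spark.connector.DataFrameFunctions;
import scala.Option;

// Hypothetical sketch: create a Cassandra table whose schema mirrors the Dataframe 'df'.
// Option.empty() leaves the key choice to the connector (typically the first
// column becomes the partition key); pass Option.apply(...) with Scala Seqs of
// column names to set the keys explicitly.
new DataFrameFunctions(df).createCassandraTable(
        "mykeyspace",        // target keyspace (must already exist)
        "mytable_from_df",   // name of the new table (made up for this sketch)
        Option.empty(),      // partition key columns: connector default
        Option.empty(),      // clustering key columns: none
        connector);          // explicit connector (the Scala implicit default is not available from Java)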

You can read data from the Cassandra DB using the API provided by spark-cassandra-connector, like below.

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Tell the Cassandra data source which keyspace and table to read.
Map<String, String> options = new HashMap<>();
options.put("keyspace", "mykeyspace");
options.put("table", "mytable");

Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .options(options)
        .load();

dataset.show();

You can use the SparkSession.sql() method to run queries against a temporary view created over the Dataframe returned by the Spark Cassandra connector, like below.

// Register the Dataframe as a temporary view, then query it with Spark SQL.
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();
