SQL query returns empty result for Apache Ignite cache

Problem description

I am trying to perform an insert into an Ignite cache from a Spark RDD. I'm using version 2.2 of Ignite and 2.1 of Spark.

The first step I take is to create the cache in a separate Scala script, like so:

import java.util

import scala.annotation.meta.field

import org.apache.ignite.Ignition
import org.apache.ignite.cache.{CacheAtomicityMode, CacheMode}
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder

object Create_Ignite_Cache {

  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  def main(args: Array[String]): Unit = {
    // Discovery over localhost on the non-default port range used by the grid
    val spi = new TcpDiscoverySpi
    val ipFinder = new TcpDiscoveryMulticastIpFinder
    val addresses = new util.ArrayList[String]
    addresses.add("127.0.0.1:48500..48520")
    ipFinder.setAddresses(addresses)
    spi.setIpFinder(ipFinder)

    val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true)

    // Partitioned cache with SQL indexing over Custom_Class
    val cache_conf = new CacheConfiguration[String, Custom_Class]()
      .setCacheMode(CacheMode.PARTITIONED)
      .setAtomicityMode(CacheAtomicityMode.ATOMIC)
      .setBackups(1)
      .setIndexedTypes(classOf[String], classOf[Custom_Class])
      .setName("Spark_Ignite")

    val ignite = Ignition.getOrStart(cfg)
    ignite.getOrCreateCache(cache_conf)
    System.out.println("[INFO] CACHE CREATED")
    ignite.close()
  }
}

The cache is created successfully, as can be seen from the ignitevisor:

Next I ran a Spark app to insert the contents of an IgniteRDD into the cache:

import java.util

import scala.annotation.meta.field

import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.apache.spark.rdd.RDD

object Spark_Streaming_Processing {

  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  // START IGNITE CONTEXT
  // sqlContext comes from the Spark setup, which is omitted here

  val addresses = new util.ArrayList[String]()
  addresses.add("127.0.0.1:48500..48520")

  val igniteContext: IgniteContext = new IgniteContext(sqlContext.sparkContext, () =>
    new IgniteConfiguration()
      .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses)))
      .setCacheConfiguration(new CacheConfiguration[String, Custom_Class]()
        .setName("Spark_Ignite")
        .setBackups(1)
        .setIndexedTypes(classOf[String], classOf[Custom_Class])),
    true)

  println(igniteContext.ignite().cacheNames())

  val ignite_cache_rdd: IgniteRDD[String, Custom_Class] = igniteContext.fromCache[String, Custom_Class]("Spark_Ignite")

  val processed_Pair: RDD[(String, Custom_Class)] = (...) // rdd with data, which as you can see has the correct datatypes as parameters

  ignite_cache_rdd.savePairs(processed_Pair)
}

As can be seen, the classes are completely identical.

After running the app successfully, I can see in ignitevisor that the cache contains 63 records, as can be seen in the previous screenshot of the console.

However, if I try to perform an SQL query against the cache, like so:

      ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)

I get an empty table as a result.

The same thing happens if I query via an external SQL server.
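
A minimal sketch of the kind of external check I mean, assuming a plain Ignite client node configured with the same discovery settings as the scripts above (the object name Query_Ignite_Cache is just illustrative):

import java.util

import org.apache.ignite.Ignition
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object Query_Ignite_Cache {
  def main(args: Array[String]): Unit = {
    val addresses = new util.ArrayList[String]()
    addresses.add("127.0.0.1:48500..48520")

    val spi = new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses))
    val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true)

    val ignite = Ignition.getOrStart(cfg)
    // Run SQL through the cache API; the table name comes from the indexed value type.
    val rows = ignite.cache[String, AnyRef]("Spark_Ignite")
      .query(new SqlFieldsQuery("select * from Custom_Class"))
      .getAll
    println(rows.size()) // comes back as 0 here, even though ignitevisor shows 63 entries
    ignite.close()
  }
}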

Curiously, if I don't create the cache a priori and just run the Spark app, the IgniteContext creates the cache if it doesn't exist, and THEN I am able to see records in my queries!

What might be the problem here?

As far as I can tell, the data types for both the key and the values are exactly the same, so I should be able to see them when I query.

Thank you for your time.

Answer

The problem here is that you use different classes to create the cache and to insert data into it. Even though the fields of these two classes match, their fully-qualified names are different, so they are two different classes.
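
You can see this directly by listing the binary type names the cluster has registered (a sketch; it assumes an Ignite instance obtained the same way as in your scripts):

import scala.collection.JavaConverters._

import org.apache.ignite.Ignite

object Inspect_Binary_Types {
  // Prints the type names Ignite has registered for binary objects. The type name
  // is derived from the fully-qualified class name, so a Custom_Class nested in
  // Create_Ignite_Cache and one nested in Spark_Streaming_Processing are registered
  // as two different types even though their fields are identical.
  def printRegisteredTypes(ignite: Ignite): Unit =
    ignite.binary().types().asScala.foreach(t => println(t.typeName()))
}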

If you want to be able to query the data via SQL, you should use the same class both when creating the cache and when inserting the data.
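
For example (a minimal sketch, not your exact code; the package name com.example.model is just a placeholder), the case class can live in its own shared file:

// Custom_Class.scala -- compiled into both the cache-creation script and the Spark job
package com.example.model

import scala.annotation.meta.field

import org.apache.ignite.cache.query.annotations.QuerySqlField

case class Custom_Class(
  @(QuerySqlField @field)(index = true) a: String,
  @(QuerySqlField @field)(index = true) b: String,
  @(QuerySqlField @field)(index = true) c: String,
  @(QuerySqlField @field)(index = true) d: String,
  @(QuerySqlField @field)(index = true) e: String,
  @(QuerySqlField @field)(index = true) f: String,
  @(QuerySqlField @field)(index = true) g: String,
  @(QuerySqlField @field)(index = true) h: String
)

Both Create_Ignite_Cache and Spark_Streaming_Processing then drop their nested definitions and import com.example.model.Custom_Class, so setIndexedTypes(classOf[String], classOf[Custom_Class]) and the objects written by savePairs refer to the same fully-qualified class.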

The reason skipping cache creation solves the problem is that the Spark app then creates the cache itself instead of using an existing one, so the class of the objects actually being saved is the one used during cache creation.
