Ignite TCP SPI discovery, and memory management in Ignite embedded


Question

Use case description: I want to use Ignite embedded to maintain an in-memory cache to speed up my Spark jobs.

1) How does TCP SPI discovery work in Ignite embedded mode? The documentation states that in Ignite embedded, the life cycle of the Ignite nodes is managed by Spark, and the nodes are started and killed from inside the Spark job itself. Since the Ignite nodes are bound to YARN containers, is it still necessary to pass the SPI configuration, or does service discovery happen automatically/dynamically?

2) Building on the first question: how do we go about launching a Spark job with, say, 4 Spark executors but fire up only 2 Ignite nodes?
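For reference, a minimal sketch of the standalone alternative mentioned in the Ignite documentation, assuming the Ignite server nodes have already been started outside of Spark (for example with ignite.sh) and that the addresses below are placeholders for those nodes. With the third IgniteContext constructor argument set to true, Spark only connects to the existing cluster, so the number of Ignite nodes is independent of the number of executors:

val standaloneIc = new IgniteContext(sc, () => {
    // Hypothetical addresses of the 2 externally started Ignite server nodes.
    val finder = new TcpDiscoveryVmIpFinder()
    finder.setAddresses(Arrays.asList("10.0.0.11:47500", "10.0.0.12:47500"))

    val spi = new TcpDiscoverySpi()
    spi.setIpFinder(finder)

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(spi)
    cfg
}, true) // standalone = true: Spark does not start or stop the Ignite nodes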

3) I am providing below a sample code that I developed; my job was killed because it exceeded the memory limit. I have already gone through the capacity planning page referenced in the official documentation. My data is around 300 MB, and I expect it to consume around 1.5 GB of memory in the worst case, with no replication and an index on one integer field.

My cluster configuration: 1 master with 24 GB memory and a 2-core CPU, and 2 slaves with 8 GB memory and a 2-core CPU each.

import java.io.Serializable
import org.apache.spark._
import org.apache.ignite.spark._
import org.apache.ignite.configuration._
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import java.util.Arrays
import org.apache.spark.sql._
import org.apache.ignite.cache.query.annotations.QuerySqlField
import scala.annotation.meta.field
import org.apache.ignite._


val ic = new IgniteContext(sc, () => {
    val discoverySpi = new TcpDiscoverySpi()
    val finder = new TcpDiscoveryVmIpFinder()
    finder.setAddresses(Arrays.asList("127.0.0.1:47500")) // and more IP addresses from my cluster
    discoverySpi.setIpFinder(finder)

    val dataStorage = new DataStorageConfiguration()
    dataStorage.getDefaultDataRegionConfiguration().setMaxSize(16L * 1024 * 1024 * 1024) // 16 GB

    val cfg = new IgniteConfiguration()
    cfg.setDiscoverySpi(discoverySpi)
    cfg.setDataStorageConfiguration(dataStorage)
    cfg
}, false) // standalone = false: Ignite nodes are embedded in the Spark executors

case class User(
    @(QuerySqlField @field)(index = true) id: Int,
    @(QuerySqlField @field) gender: String,
    @(QuerySqlField @field) marks: Int
) extends Serializable

val cacheCfg = new CacheConfiguration[Int, User]("sharedRDD")
cacheCfg.setIndexedTypes(classOf[Int], classOf[User])
cacheCfg.setCacheMode(CacheMode.PARTITIONED)

val df2 = spark.sql("select * from db_name.User") // read data from a Hive table
val df2_rdd = df2.rdd
val data_rdd = df2_rdd.map(x => User(x.getInt(0), x.getString(1), x.getInt(2)))

val tcache: IgniteRDD[Int, User] = ic.fromCache(cacheCfg)
tcache.savePairs(data_rdd.map(x => (x.id, x)))

val result = tcache.sql("select * from User u1 left join User u2 on (u1.id = u2.id)") // test query for self join

This program works fine until I do the self join. Simple queries like "select * from User limit 5" work perfectly fine.

Error log:

WARN TcpDiscoverySpi: Failed to connect to any address from IP finder (will retry to join topology every 2 secs)

WARN TcpCommunicationSpi: Connect timed out

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. Consider boosting spark.yarn.executor.memoryOverhead

I had already increased the spark.yarn.executor.memoryOverhead parameter to 2 GB and the executor memory to 6 GB. However, I am still not able to figure out what I am missing here, considering that my data size is only a mere 300 MB.
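For context, a sketch of how those two values would typically be set, assuming the standard Spark configuration keys on YARN (the values simply mirror the ones described above; normally they are passed via --conf on spark-submit or on the SparkConf before the SparkContext is created):

val sparkConf = new SparkConf()
    .set("spark.yarn.executor.memoryOverhead", "2048") // 2 GB of off-heap overhead per executor
    .set("spark.executor.memory", "6g")                // 6 GB of executor heap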

Answer

  1. Discovery SPI works in embedded mode just like for a regular Apache Ignite node, so you still need to configure the Discovery SPI (especially the IP finder) properly. More details about node discovery can be found in the documentation; pick the mechanism that fits your case best.
  2. If you don't need to instantiate Apache Ignite in the Spark job, simply don't create an IgniteContext object.
  3. I think your JVM consumes a significant part of this memory. You need to check your JVM settings (see the sketch below).
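As a hedged illustration of point 3, one way to look into the JVM side of the problem is to tune and log the executors' garbage collection through the standard Spark configuration keys; the options below are placeholders, not recommendations, and the executor heap size itself remains controlled by spark.executor.memory (Spark does not allow -Xmx in extraJavaOptions):

// Hypothetical example: enable G1 GC and GC logging on the executors to see how much
// of the container (executor memory + memoryOverhead) the JVM heap is actually using.
val tunedConf = new SparkConf()
    .set("spark.executor.memory", "6g")
    .set("spark.executor.extraJavaOptions",
         "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")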
