Spark DataFrame - 最后一个分区收集慢 [英] Spark DataFrame - Last Partition Collect Slow

查看:793
本文介绍了Spark DataFrame - 最后一个分区收集慢的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Java代码段,它从远程Oracle DB中读取记录(至少65k个记录)。基本上,我们试图将小时过滤器传递给数据帧以便在小时分区x 24上获取记录。



源视图基于具有数百万的记录。



我们面临的问题是,Spark(在YARN或作为SPARK群集)在3分钟内处理24个分区中的22个。最后2个分区正在花费超过5个小时才能完成。



有没有办法使用DataFrames加快速度?

  HashMap< String,String> options = new HashMap<>(); 
sqlContext.setConf(spark.sql.shuffle.partition,50);
options.put(dbtable,(select * from+ VIEW_NAME +where 1 = 1));
options.put(driver,oracle.jdbc.OracleDriver);
options.put(url,JDBC_URL);
options.put(partitionColumn,hrs);
options.put(lowerBound,00);
options.put(upperBound,23);
options.put(numPartitions,24);

DataFrame dk = sqlContext.load(jdbc,options).cache();
dk.registerTempTable(VIEW_NAME);
dk.printSchema();
DateTime dt = new DateTime(2015,5,8,10,0,0);
String s = SQL_DATE_FORMATTER.print(dt);
dt = dt.plusHours(24);
String t = SQL_DATE_FORMATTER.print(dt);
System.out.println(S是+ s +,t是+ t);
流<行> rows = dk.filter(DATETIME> ='+ s +'和DATETIME< ='+ t +').collectAsList()。parallelStream()
System.out.println(Collected+ rows.count());


解决方案

不确定如果这是完整的答案,作为一个工作,如果我们做以下

  dt = dt.plusHours(24).minusSeconds(1)

它更快,但仍然不如前23个分区一样快


I have a Java snippet that reads records from a remote Oracle DB (atleast 65k records). Essentially, we are trying to pass the hourly filter to the dataframe to fetch the records, on an hourly partition x 24.

The source view is based on a table with millions of records.

The problem we are facing is that, Spark (on YARN or as a SPARK cluster) processes 22 out of 24 partitions in under 3 mins. The last 2 partitions are taking more than 5 hours to complete.

Is there any way we can speed this up using DataFrames ?

HashMap<String, String> options = new HashMap<>();
sqlContext.setConf("spark.sql.shuffle.partition", "50");
options.put("dbtable", "( select * from "+VIEW_NAME+" where 1=1)");
options.put("driver", "oracle.jdbc.OracleDriver");
options.put("url", JDBC_URL);
options.put("partitionColumn", "hrs");
options.put("lowerBound", "00");
options.put("upperBound", "23");
options.put("numPartitions", "24");

DataFrame dk = sqlContext.load("jdbc", options).cache();   
dk.registerTempTable(VIEW_NAME);
dk.printSchema();
DateTime dt = new DateTime(2015, 5, 8, 10, 0, 0);
String s = SQL_DATE_FORMATTER.print(dt);
dt = dt.plusHours(24);
String t = SQL_DATE_FORMATTER.print(dt);
System.out.println("S is " + s + "and t is "+ t);
Stream<Row> rows = dk.filter("DATETIME >= '" + s + "' and DATETIME <= '" + t + "'").collectAsList().parallelStream();
    System.out.println("Collected" + rows.count());

解决方案

Not Sure if this is an answer in complete, but as a work around, if we do the following

dt = dt.plusHours(24).minusSeconds(1)

It is faster, but still not a as fast as First 23 partitions

这篇关于Spark DataFrame - 最后一个分区收集慢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆