Spark is executing every single action two times

Problem description

I have created a simple Java application that uses Apache Spark to retrieve data from Cassandra, perform some transformations on it, and save it in another Cassandra table.

I am using Apache Spark 1.4.1 configured in standalone cluster mode, with a single master and a single slave, both located on my machine.
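For context, here is a minimal sketch of the setup the snippets below assume: a JavaSparkContext pointed at the standalone master, and the spark-cassandra-connector's CassandraSQLContext, which provides the cassandraSql method used below. The app name, master URL, and connection host are illustrative assumptions, not values from the question:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

SparkConf conf = new SparkConf()
    .setAppName("DidNotBuyProductReport")                  // assumed app name
    .setMaster("spark://localhost:7077")                   // assumed standalone master URL
    .set("spark.cassandra.connection.host", "127.0.0.1");  // assumed Cassandra host
JavaSparkContext sparkContext = new JavaSparkContext(conf);
CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext.sc());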

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
    "WHERE CAST(store_id as string) = '" + storeId + "'");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
    "WHERE CAST(store_id as string) = '" + storeId + "' AND product_id = " + productId + "");

// We need only the customers who did not order the product
// We cache the DataFrame because we use it twice.
DataFrame customersWhoHaventOrderedTheProduct = customers
    .join(customersWhoOrderedTheProduct
    .select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter")
    .where(customersWhoOrderedTheProduct.col("email").isNull())
    .drop(customersWhoOrderedTheProduct.col("email"))
    .cache();

int numberOfCustomers = (int) customersWhoHaventOrderedTheProduct.count();

Date reportTime = new Date();

// Prepare the Broadcast values. They are used in the map below.
Broadcast<String> bStoreId = sparkContext.broadcast(storeId, classTag(String.class));
Broadcast<String> bReportName = sparkContext.broadcast(MessageBrokerQueue.report_did_not_buy_product.toString(), classTag(String.class));
Broadcast<java.sql.Timestamp> bReportTime = sparkContext.broadcast(new java.sql.Timestamp(reportTime.getTime()), classTag(java.sql.Timestamp.class));
Broadcast<Integer> bNumberOfCustomers = sparkContext.broadcast(numberOfCustomers, classTag(Integer.class));

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(),
        row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();
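Note that createDataFrame(rdd, StoreCustomerReport.class) relies on JavaBean introspection, so StoreCustomerReport needs getters for every field. The class itself is not shown in the question; a plausible sketch, with the field list inferred from the constructor call above:

public class StoreCustomerReport implements java.io.Serializable {
    private String storeId;
    private String reportName;
    private java.sql.Timestamp reportTime;
    private Integer numberOfCustomers;
    private String email;
    private String firstName;
    private String lastName;

    public StoreCustomerReport(String storeId, String reportName, java.sql.Timestamp reportTime,
                               Integer numberOfCustomers, String email, String firstName, String lastName) {
        this.storeId = storeId;
        this.reportName = reportName;
        this.reportTime = reportTime;
        this.numberOfCustomers = numberOfCustomers;
        this.email = email;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // Getters are what the bean-based schema inference reads.
    public String getStoreId() { return storeId; }
    public String getReportName() { return reportName; }
    public java.sql.Timestamp getReportTime() { return reportTime; }
    public Integer getNumberOfCustomers() { return numberOfCustomers; }
    public String getEmail() { return email; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
}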

As you can see, I cache the customersWhoHaventOrderedTheProduct DataFrame; after that I execute a count and call toJavaRDD.
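For reference, cache() itself is lazy: nothing is materialized until the first action runs, and only actions after that can be served from the cache. A generic illustration (not the report code):

DataFrame df = sqlContext.cassandraSql("SELECT email FROM customer");
df.cache();               // lazy: only marks df for caching, computes nothing
long first = df.count();  // first action: evaluates the plan and populates the cache
long second = df.count(); // subsequent actions read the cached data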

By my calculations these actions should be executed only once. But when I look at the Spark UI for the current job, the stages show every action being executed twice.

Am I doing something wrong? Is there any setting that I've missed?

Any ideas are greatly appreciated.

I called System.out.println(storeCustomerReport.toJavaRDD().toDebugString()); and this is the debug string:

(200) MapPartitionsRDD[43] at toJavaRDD at DidNotBuyProductReport.java:93 []
  |   MapPartitionsRDD[42] at createDataFrame at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[41] at map at DidNotBuyProductReport.java:90 []
  |   MapPartitionsRDD[40] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[39] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[38] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ZippedPartitionsRDD2[37] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[31] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[30] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[29] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[28] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[27] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[3] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
  |   MapPartitionsRDD[36] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[35] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[34] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[33] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[32] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[5] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:15 []


So after some research combined with trial and error, I managed to optimize the job.

I created an RDD from customersWhoHaventOrderedTheProduct and cached it before calling the count() action. (I moved the cache from the DataFrame to the RDD.)

After that I use this RDD to create the storeCustomerReport DataFrame.

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd = customersWhoHaventOrderedTheProduct.javaRDD().cache();
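Spelled out, the reworked flow presumably looks like this (a sketch reconstructed from the description above; only the cache/count wiring differs from the original code):

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd =
    customersWhoHaventOrderedTheProduct.javaRDD().cache();

// count() now materializes the cached partitions once.
int numberOfCustomers = (int) customersWhoHaventOrderedTheProductRdd.count();

// ... the broadcast variables are created here, exactly as before ...

// createDataFrame then reuses the cached partitions instead of recomputing
// the Cassandra scans.
DataFrame storeCustomerReport = sqlContext.createDataFrame(
    customersWhoHaventOrderedTheProductRdd.map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(),
        row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);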

Now the duplicated count and cache stages are gone from the Spark UI, but there are still two 'javaRDD' stages. I have no idea where they are coming from, as I call toJavaRDD only once in my code.

Recommended answer

It looks like you are applying two actions in the code segment below:

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(),
        row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);

// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();

One is at sqlContext.createDataFrame() and the other at storeCustomerReport.write(), and both of these require customersWhoHaventOrderedTheProduct.toJavaRDD().

Persisting the RDD produced by toJavaRDD() should solve this issue.

JavaRDD<Row> cachedRdd = customersWhoHaventOrderedTheProduct.toJavaRDD()
    .persist(StorageLevel.MEMORY_AND_DISK()); // or any other storage level
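With cachedRdd in place, both downstream actions read the persisted partitions instead of re-running the Cassandra scans. A sketch of the wiring (the unpersist call at the end is my addition, not part of the answer):

DataFrame storeCustomerReport = sqlContext.createDataFrame(
    cachedRdd.map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(),
        row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);

storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();

cachedRdd.unpersist(); // optional: release the cached partitions when the job is done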
