Spark - Can a Multimap be converted to a DataFrame in Java?
Question
I'm trying to convert a Multimap of billions of data values to a Spark DataFrame, run calculations on it, and then write the results to a Cassandra table.
I generate the multimap from the following Cassandra query and loop. I'd be happy to take suggestions if there is a better way to get this data into a DataFrame than the loop I'm using.
Code updated per the answer:
        // Build ResultSet from Cassandra query for data manipulation.
        Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        //Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
        stmt.setFetchSize(1000);
        ResultSet results = session.execute(stmt);

        // Get the variables from each row of Cassandra data
        Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results) {
            // Column names in Cassandra (case sensitive)
            start_frequency = row.getDouble("Start_Frequency");
            power = row.getFloat("Power");
            bandwidth = row.getDouble("Bandwidth");

            // Create channel power buckets: record the power for every channel covered by this row.
            for (channel = 1.6000E8; channel <= channel_end; ) {
                if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
                    data.put(channel, power);
                } // end if
                channel += increment;
            } // end for
        } // end "row" for

        // Create Spark List for DataFrame
        List<Value> values = data.asMap().entrySet()
                .stream()
                .flatMap(x -> x.getValue()
                        .stream()
                        .map(y -> new Value(x.getKey(), y)))
                .collect(Collectors.toList());

        // Create DataFrame and calculate results
        sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
                .agg(min("power"), max("power"), avg("power"))
                .write().mode(SaveMode.Append)
                .option("table", "results")
                .option("keyspace", "model")
                .format("org.apache.spark.sql.cassandra").save();
    } // end session
} // End Compute
public class Value implements Serializable {
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    Double channel;
    Float power;

    public void setChannel(Double channel) {
        this.channel = channel;
    }

    public void setPower(Float power) {
        this.power = power;
    }

    public Double getChannel() {
        return channel;
    }

    public Float getPower() {
        return power;
    }

    @Override
    public String toString() {
        return "[" + channel + "," + power + "]";
    }
}
The sample multimap has the type {Double}=[Float], where each Double key may have multiple Float values.
Example:
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]}
I need to use Spark to get the min, max, and average for each of these keys. For example, the first key, 1.50E8, would give min 10, max 20, avg 15.
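To make the expected result concrete, here is a minimal plain-Java sketch (no Spark or Guava) that computes the same min, max, and average per key on the sample data. The class name ChannelStatsSketch and the method summarize are hypothetical names used only for illustration, and a Map of lists stands in for the multimap.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the original code.
class ChannelStatsSketch {

    // Computes min, max and average of the Float values stored under each key.
    static Map<Double, DoubleSummaryStatistics> summarize(Map<Double, List<Float>> data) {
        Map<Double, DoubleSummaryStatistics> result = new LinkedHashMap<>();
        for (Map.Entry<Double, List<Float>> entry : data.entrySet()) {
            DoubleSummaryStatistics stats = entry.getValue().stream()
                    .mapToDouble(Float::doubleValue)
                    .summaryStatistics();
            result.put(entry.getKey(), stats);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data from the question, with a Map of lists standing in for the multimap.
        Map<Double, List<Float>> data = new LinkedHashMap<>();
        data.put(1.50E8, Arrays.asList(10f, 20f));
        data.put(1.51E8, Arrays.asList(-10f, -13f, -14f, -15f));
        data.put(1.52E8, Arrays.asList(-10f, -11f));

        summarize(data).forEach((channel, s) ->
                System.out.println(channel + " min=" + s.getMin()
                        + " max=" + s.getMax() + " avg=" + s.getAverage()));
    }
}
```

For the first key this gives min 10, max 20, and avg 15, which is what the groupBy/agg call on the DataFrame should reproduce per channel.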
I already have the code I can use once the data is in a temp table and can be operated on as a DataFrame:
queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
I would be grateful for some tips on how to convert the multimap into a DataFrame using Java. I haven't been able to find any documentation about using multimaps with Spark.
I'm currently using a solution that runs the initial query and, in the for loop, writes the raw data to a new table that I can then map directly to a temp table / DataFrame. That takes too much time, because I have to write billions of rows to Cassandra before calculating. I'd like to use a multimap or something similar and convert it directly to Spark for the calculation.
Recommended answer
Alas, the Java parallelize method takes a list of T, and parallelizePairs takes a list of Tuple2<K, V>, so you will need to convert. Meanwhile, createDataFrame only works on RDDs and Scala Seqs, and needs a schema (either a bean or a StructType).
To make it even more fun, com.google.common.collect.ImmutableEntry is not serializable, so you need to do the conversion in Java. A Java-fied version of @Pankaj Arora's solution would not work unless you moved the conversion logic into Java, i.e.:
public class Value implements Serializable {
    public Value(Double a, Float b) {
        this.a = a;
        this.b = b;
    }

    Double a;
    Float b;

    public void setA(Double a) {
        this.a = a;
    }

    public void setB(Float b) {
        this.b = b;
    }

    public Double getA() {
        return a;
    }

    public Float getB() {
        return b;
    }

    public String toString() {
        return "[" + a + "," + b + "]";
    }
}

Multimap<Double, Float> data = LinkedListMultimap.create();
data.put(1d, 1f);
data.put(1d, 2f);
data.put(2d, 3f);

List<Value> values = data.asMap().entrySet()
        .stream()
        .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
        .collect(Collectors.toList());

sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();
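The serializability point can be checked without a cluster. The following is a rough sketch, assuming Spark's default Java serialization is in use: it pushes a bean through ObjectOutputStream and back, which fails with a NotSerializableException for types that do not implement Serializable. The names SerializationCheckSketch, Reading, and roundTrip are hypothetical, and Reading is only a stand-in for the Value bean above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration class; not part of the original answer.
class SerializationCheckSketch {

    // Stand-in for the Value bean: a serializable (channel, power) record.
    static class Reading implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Double channel;
        private final Float power;

        Reading(Double channel, Float power) {
            this.channel = channel;
            this.power = power;
        }

        public Double getChannel() { return channel; }
        public Float getPower() { return power; }
    }

    // Serializes the object to bytes and reads it back with plain Java serialization.
    // Wraps checked exceptions so callers see one unchecked failure type.
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Object is not Java-serializable", e);
        }
    }

    public static void main(String[] args) {
        Reading copy = roundTrip(new Reading(1.5E8, 10f));
        System.out.println(copy.getChannel() + " " + copy.getPower());
    }
}
```

Running the same round trip on an object that does not implement Serializable throws the wrapped exception, which is the kind of failure the conversion to a serializable bean is meant to avoid.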
Given your edit, I'd look at creating the objects directly (rather than a multimap) from the outset.
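One way to read that suggestion, sketched in plain Java: inside the row loop, append a bean per matching channel straight to a list, skipping the multimap and the later flatMap step. The names DirectRecordsSketch, ChannelPower, and addReadings are hypothetical, and the channel range and increment are passed in as parameters because their real values are not shown in the question.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of the original answer.
class DirectRecordsSketch {

    // Serializable bean with getters, shaped like the Value class in the question.
    static class ChannelPower implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Double channel;
        private final Float power;

        ChannelPower(Double channel, Float power) {
            this.channel = channel;
            this.power = power;
        }

        public Double getChannel() { return channel; }
        public Float getPower() { return power; }
    }

    // Appends one record for every channel inside [startFrequency, startFrequency + bandwidth].
    static void addReadings(List<ChannelPower> out, double startFrequency, double bandwidth,
                            float power, double firstChannel, double lastChannel,
                            double increment) {
        for (double channel = firstChannel; channel <= lastChannel; channel += increment) {
            if (channel >= startFrequency && channel <= startFrequency + bandwidth) {
                out.add(new ChannelPower(channel, power));
            }
        }
    }

    public static void main(String[] args) {
        List<ChannelPower> values = new ArrayList<>();
        // One call per Cassandra row; the numbers here are made-up sample values.
        addReadings(values, 1.61E8, 2E6, -10f, 1.6E8, 1.65E8, 1E6);
        for (ChannelPower v : values) {
            System.out.println(v.getChannel() + " " + v.getPower());
        }
    }
}
```

The resulting list has the same shape as the values list in the answer, so it could be handed to sc.parallelize and createDataFrame in the same way.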