scala相关内容

SparkConext.text文件可以与自定义接收器一起使用吗?

我正在尝试实现一个使用自定义接收器从SQS读取消息的流作业。每封邮件都包含对S3文件的单个引用,然后我希望读取、解析该文件并将其存储为ORC。 以下是我到目前为止拥有的代码: val sc = new SparkContext(conf) val streamContext = new StreamingContext(sc, Seconds(5)) val sqs = stream ..
发布时间:2022-03-29 20:29:21 其他开发

如何在Scala中得到Spark Stream中两个DStream的笛卡尔乘积?

我有两个DStream。让A:DStream[X]和B:DStream[Y]。 我想得到它们的笛卡尔积,换句话说,一个新的C:DStream[(X, Y)] 包含所有X和Y值对。 我知道有一个cartesian函数用于RDDS。我只能找到this similar question,但它是Java版本,因此无法回答我的问题。 推荐答案 链接问题答案的scala等效项(忽略Time ..
发布时间:2022-03-29 20:24:56 其他开发

如何使用mapWithState提取超时会话

我正在更新代码,以便从updateStateByKey切换到mapWithState,以便基于2分钟的超时获得用户会话(2分钟仅用于测试目的)。每个会话应在超时之前聚合会话内的所有流数据(JSON字符串)。 这是我的旧代码: val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecor ..
发布时间:2022-03-29 20:23:25 其他开发

Spark 2.0.0打包了SBT-Assembly的流作业缺少Scala运行时方法

在电光流媒体2.0.0作业中使用->,或者使用spark-streaming-kafka-0-8_2.11v2.0.0,并通过spark-submit提交时,出现以下错误: 线程“Main”中的异常org.apache.spk.SparkException:作业因阶段失败而中止:阶段72.0中的任务0失败1次,最近的失败:阶段72.0中丢失的任务0.0(TID37,localhost):ja ..
发布时间:2022-03-29 20:15:57 其他开发

如何在使用Scala读取HDFS目录后删除该目录中的文件?

我使用fileStream从Spark(流上下文)读取HDFS目录中的文件。如果我的Spark在一段时间后关闭并启动,我希望读取目录中的新文件。我不想读取Spark已经读取和处理过的目录中的旧文件。我在此尽量避免重复。 val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File") 是否有需要帮助 ..
发布时间:2022-03-29 19:33:04 其他开发

Scala中的切片表示法?

Scala中是否有类似于the slice notation in Python的内容? 我认为这确实是一个有用的操作,应该合并到所有语言中。 推荐答案 scala> import collection.IterableLike import collection.IterableLike scala> implicit def pythonicSlice[A, Repr](col ..
发布时间:2022-03-26 20:12:36 Python

将集合流平面映射到其元素流

我有一个事件序列流,希望将其平面映射到事件流。 我对flatMap函数的语法有问题 val stream = DataStream[Seq[Event]] stream.flatMap(???) 如有任何帮助,我们将不胜感激 推荐答案 我建议您查看Flink附带的示例,例如wordcount application: val counts: DataStream ..
发布时间:2022-03-15 12:13:42 其他开发

闪烁中的类型信息

我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数 ..
发布时间:2022-03-15 11:43:33 其他开发

Slick使用的连接多于线程的情况

我对SLICK文档中的这句话感到困惑: 对事务内的非数据库操作进行排序时,Slick将使用比池中线程更多的连接。 这是否意味着Slick打开一个事务,然后使用不同的数据库连接(在不同的线程中)在该事务中执行操作?我说得对吗?我从未想过可以在多个连接之间保持事务打开。 推荐答案 似乎我的理解是错误的。 假设浮动线程池大小为1。我们有一个事务1,它运行两个查询A和B。 ..
发布时间:2022-03-09 15:06:13 其他开发

如何使用反压溢出策略创建Akka-Http客户端?

我有未定数量的Akka-http客户端流从http服务下载数据。我使用Akka-http主机级连接池,因为我希望自定义池,因为有长时间运行的请求通过它。 因为客户端的数量是未定义的和动态的,所以我不知道如何配置连接池(max-open-request/max-connections)。此外,我可能希望连接池较小(少于客户端数量),以不损害带宽。 因此,我想设置一个客户端流,以便对池的新 ..
发布时间:2022-02-27 18:51:36 其他开发

让Scalatron工作(操作码有问题)

我是Scala的新手。为了学习Scala,我最近下载了Scalatron。我已经有了用于IntelliJ的Scala插件,所以我认为设置Scalatron最简单的选择是在IntelliJ中创建bot,并将ScalatronBot.jar添加到Scalatron插件目录。当我开始遇到问题时,我开始学习Scalatron教程中的基本示例。以下基本代码在我的机器人上运行良好: class Cont ..
发布时间:2022-02-26 09:15:23 其他开发

如何在Apache POI 5.1.0中使用数组溢出

使用Apache POI生成Excel文件,是否可以阻止Excel在公式中添加implicit intersection operator(@)? 例如,使用以下代码,我想做的是使用Excel Array Spilling behaviour将列中的所有值从A复制到K。但是,使用Excel Desktop(版本16.54)打开文件时,它会自动在公式中添加@运算符。 在工作簿sheet工作表 ..
发布时间:2022-02-21 16:15:37 Java开发