Pyspark - Transfer control out of Spark Session (sc)


Problem Description


This is a follow-up question to:

Pyspark filter operation on Dstream

To keep a count of how many error/warning messages have come through in, say, a day or an hour, how does one design the job?

What I have tried:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def counts():
    counter += 1
    print(counter.value)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    counter = sc.accumulator(0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    errors = lines.filter(lambda l: "error" in l.lower())
    errors.foreachRDD(lambda e: e.foreach(counts))
    errors.pprint()

    ssc.start()
    ssc.awaitTermination()

This, however, has multiple issues. To start with, print doesn't work (it does not output to stdout; from what I have read, the best I can use here is logging). Can I save the output of that function to a text file and tail that file instead?

I am also not sure why the program simply exits; there is no error or dump anywhere to look into further (Spark 1.6.2).

How does one preserve state? What I am trying to do is aggregate logs by server and severity; another use case is to count how many transactions were processed by looking for certain keywords.

Pseudocode for what I want to try:

foreachRDD(Dstream):
    if RDD.contains("keyword1 | keyword2 | keyword3"):
        dictionary[keyword] = dictionary.get(keyword, 0) + 1  # add the keyword if not present and increase its counter
        print dictionary  # or send this dictionary elsewhere

The last part, sending or printing the dictionary, requires switching out of the Spark streaming context. Can someone explain the concept, please?

Solution

"print doesn't work"

I would recommend reading the design patterns section of the Spark documentation. I think that roughly what you want is something like this:

def _process(partition):
    # iterate over the elements of one partition and print each one
    for item in partition:
        print(item)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.foreachRDD(lambda e: e.foreachPartition(_process))

This will get your print call to work (though it is worth noting that the print will execute on the workers and not on the driver, so if you're running this code on a cluster you will only see the output in the worker logs).
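If you need the output on the driver instead, for example to print or ship the per-keyword dictionary from the pseudocode in the question, note that the function you pass to foreachRDD itself runs on the driver; only the RDD operations inside it are distributed. A minimal sketch of that pattern, not from the original answer, assuming the lines DStream from the question and illustrative keyword names:

KEYWORDS = ("keyword1", "keyword2", "keyword3")  # illustrative, matching the pseudocode

def summarize(rdd):
    # This callback runs on the driver once per batch; the flatMap/countByValue
    # work is still executed on the cluster.
    counts = rdd.flatMap(
        lambda line: [k for k in KEYWORDS if k in line.lower()]
    ).countByValue()  # brings a small {keyword: count} mapping back to the driver
    if counts:
        print(dict(counts))  # driver-side print; could also write to a file or external store

lines.foreachRDD(summarize)

Keep in mind these are per-batch counts; they reset every batch interval rather than accumulating, which is where the next point comes in.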

However, it won't solve your second problem:

"How does one preserve state?"

For this, take a look at updateStateByKey and the related example.
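The rough idea: updateStateByKey maintains a running value per key across batches, which gives you the cumulative counts that per-batch processing cannot. A minimal sketch in the spirit of the linked stateful example, assuming errors is the filtered DStream from the question and that the checkpoint directory path is a placeholder:

ssc.checkpoint("checkpoint")  # stateful operations require a checkpoint directory (path is a placeholder)

def update_count(new_values, running_count):
    # new_values: the counts observed for this key in the current batch
    # running_count: the previous total, or None the first time the key is seen
    return sum(new_values) + (running_count or 0)

running_errors = errors.map(lambda line: ("error", 1)).updateStateByKey(update_count)
running_errors.pprint()  # prints the running total on the driver each batch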
