Dstream 上的 Pyspark 过滤操作 [英] Pyspark filter operation on Dstream

查看:60
本文介绍了Dstream 上的 Pyspark 过滤操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在尝试扩展网络字数,以便能够根据特定关键字过滤行

I have been trying to extend the network word count to be able to filter lines based on certain keyword

我使用的是 spark 1.6.2

I am using spark 1.6.2

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 5)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

我已经尝试了所有的变化,

I have tried all the variations,

我几乎总是收到错误,我无法应用类似的功能

I almost always get the error I cannot apply functions like

在 TransformedDStream 上的 pprint/show/take/collect

pprint/show/take/collect on TransformedDStream

.我在行 Dstream 上使用了带有 foreachRDD 的转换和一个函数来检查使用本机 python 字符串方法,这也失败了(实际上,如果我在程序的任何地方使用打印,spark-submit 就会出现 - 没有报告错误.

. I used transform with foreachRDD on lines Dstream with a function to check using native python string methods, that fails too (actually if I use print anywhere in the program, spark-submit just comes out - there are no errors reported.

我想要的是能够过滤传入的 Dstreams 像ERROR"这样的关键字 |警告"等并将其输出到标准输出或标准错误.

What I want to is to be able to filter the incoming Dstreams on a keyword like "ERROR" | "WARNING" etc and output it to stdout or stderr.

推荐答案

我想要的是能够过滤传入的 Dstreams 像ERROR"这样的关键字 |警告"等并将其输出到标准输出或标准错误.

What I want to is to be able to filter the incoming Dstreams on a keyword like "ERROR" | "WARNING" etc and output it to stdout or stderr.

然后您不想调用 flatMap,因为这会将您的行拆分为单独的标记.相反,您可以使用对 filter 的调用替换该调用,该调用检查该行是否包含 "error":

Then you don't want to call flatMap, as this will split your lines up into individual tokens. Instead, you can replace that call with a call to filter that checks whether the line contains "error":

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.pprint()

这篇关于Dstream 上的 Pyspark 过滤操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆