Why is my Spark streaming app so slow?


Problem description


I have a cluster with 4 nodes: 3 Spark nodes and 1 Solr node. The CPU is 8-core, the memory is 32 GB, and the disks are SSDs. I use Cassandra as my database. My data volume is 22 GB after 6 hours, and I now have around 3.4 million rows, which should be read in under 5 minutes.

But it already cannot complete the task within that time. My future plan is to read 100 million rows in under 5 minutes. I am not sure what I can scale up or do better to achieve this now, as well as to reach my future goal. Is that even possible, or would it be better to use Spark for the real-time analysis and something like Hadoop for the longer-tail data (older than 1 day or a couple of hours)? Thanks a lot! Here is my Spark app code:

import sys
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime, timedelta 
from dateutil.parser import parse 
from cassandra.cluster import Cluster
import pytz
from dateutil.tz import tzutc
tz = pytz.timezone('')
appname = str(sys.argv[1])
source = str(sys.argv[2])
cluster = Cluster(['localhost']);
session_statis = cluster.connect('keyspace')
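# JSON helpers below return 0 for malformed messages so they can be filtered out of the stream.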
def read_json(x):
    try:
        y = json.loads(x)
    except:
        y = 0
    return y
def TransformInData(x):
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0
def axesTransformData(x):
    try:
        body = json.loads(x['body'])
        return (body)
    except:
        return 0
def storeDataToCassandra(rdd):
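    # Drop articles that already exist in Cassandra, then append the remaining rows to the article, schedule and lookup tables.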
    rdd_cassandra =rdd.map(lambda x:(x[0],(x[0],x[1]['thumbnail'], x[1]['title'], x[1]['url'], datetime.strptime(parse(x[1]['created_at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S"),source, x[1]['category'] if x[1]['category'] else '', x[1]['channel'],x[1]['genre']))) \
                            .subtract(articles)
    rdd_article = rdd_cassandra.map(lambda x:Row(id=x[1][0],source=x[1][5],thumbnail=x[1][1],title=x[1][2],url=x[1][3],created_at=x[1][4],category=x[1][6],channel=x[1][7],genre=x[1][8]))
    rdd_schedule = rdd_cassandra.map(lambda x:Row(source=x[1][5],type='article',scheduled_for=x[1][4]+timedelta(minutes=5),id=x[1][0]))
    rdd_article_by_created_at = rdd_cassandra.map(lambda x:Row(source=x[1][5],created_at=x[1][4],article=x[1][0]))
    rdd_article_by_url = rdd_cassandra.map(lambda x:Row(url=x[1][3],article=x[1][0]))
    if rdd_article.count()>0:
        result_rdd_article = sqlContext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_schedule.count()>0:   
        result_rdd_schedule = sqlContext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_article_by_created_at.count()>0:  
        result_rdd_article_by_created_at = sqlContext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_article_by_url.count()>0:   
        result_rdd_article_by_url = sqlContext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
#     
def axesStoreToCassandra(rdd):
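    # Append one engagement snapshot (comments, likes, reads, shares) per incoming message to Cassandra.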
    axes_rdd = rdd.map(lambda x:Row(article=x[1]['id'],at=datetime.now(),comments=x[1]['comments'],likes=x[1]['attitudes'],reads=0,shares=x[1]['reposts']))
    if axes_rdd.count()>0:
        result_axes_rdd = sqlContext.createDataFrame(axes_rdd)
        result_axes_rdd.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")



def joinstream(rdd):
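    # Join article metadata with channel info, derive a sharing "speed" per article from the engagement history, and rewrite the 1/3/6/12/24-hour statistics tables.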
    article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'genre':x[1][0]['genre'],'category':x[1][1]['category'],'author':x[1][1]['author']}))
    speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
                .reduceByKey(lambda x,y:x+y) \
                .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],sorted(x[1],key=lambda y:y[0],reverse = True)[1]) if len(x[1])>=2 else (x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],[sorted(x[1],key=lambda y:y[0],reverse = True)[0][0]-timedelta(seconds=300),0,0,0,0])) \
                .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
                .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':int(5*288*((x[1][4]-x[2][4])/((x[1][0]-x[2][0]).seconds/60.0)))})) \
                .filter(lambda x:x[1]['speed']>=0) \
                .filter(lambda x:x[1]['shares']>0)
    statistics = article_channels.join(speed_rdd)  \
                .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'genre':x[1][0]['genre'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})
    timeone=datetime.now()-timedelta(hours=1)
    timethree = datetime.now()-timedelta(hours=3)
    timesix = datetime.now()-timedelta(hours=6)
    timetwelve = datetime.now()-timedelta(hours=12)
    timetwentyfour = datetime.now()-timedelta(hours=24)
    result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree  and x['created_at']+timedelta(hours=8)<=timeone).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix  and x['created_at']+timedelta(hours=8)<=timethree).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve  and x['created_at']+timedelta(hours=8)<=timesix).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour  and x['created_at']+timedelta(hours=8)<=timetwelve).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    if result1.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'1'))
        resultschema1 = sqlContext.createDataFrame(result1)
        resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if result3.count()>0:   
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'3'))
        resultschema3 = sqlContext.createDataFrame(result3)
        resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result6.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'6'))
        resultschema6 = sqlContext.createDataFrame(result6)
        resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result12.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'12'))
        resultschema12 = sqlContext.createDataFrame(result12)
        resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result24.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'24'))
        resultschema24 = sqlContext.createDataFrame(result24)
        resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,30)
sqlContext = SQLContext(sc)
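# Cassandra tables loaded once at startup and reused for joins and deduplication inside the streaming batches.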
channels = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,{'author':x.name,'category':x.category}))
articles = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.id,(x.id,x.thumbnail,x.title,x.url,x.created_at+timedelta(hours=8),source,x.category,x.channel,x.genre)))
articlestat = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.channel,{'id':x.id,'thumbnail':x.thumbnail,'title':x.title,'url':x.url,'created_at':x.created_at,'source':x.source,'category':x.category,'channel':x.channel,'genre':x.genre}))
axes = sc.cassandraTable('keyspace','tablename')
topic = 'topic1'
kafkaParams = {"metadata.broker.list": "localhost:9092"}
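# Two direct Kafka streams: article documents and engagement ("axes") updates; each 30-second batch is parsed and appended to Cassandra.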
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
article_join_stream.transform(storeDataToCassandra).pprint()
axes_topic = 'topic2'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(str(x['id']),x))
axes_join_stream.transform(axesStoreToCassandra).pprint()
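# Every 15 minutes, take the article ids seen in the last 15-minute window and recompute the per-timespan statistics.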
statistics = article_join_stream.map(lambda x:(x[0])).window(15*60,15*60)
statistics.transform(joinstream).pprint()
ssc.start()    

EDIT: This is the stage that seems to consume the most time. Any thoughts on that?

Solution

At first glance it seems that you simply start your application with "spark-submit <your application>".

This means you are using the default allocation of memory and CPUs for your application (which in most default setups is about 1 CPU and 512 MB of RAM).

This is assuming you are using YARN, since you don't provide any information about your cluster manager.

Start your application with the appropriate resources and you'll see improvements.
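
For example, here is a minimal sketch of what explicit resource settings could look like for this app (the numbers are assumptions for 8-core / 32 GB nodes and need tuning for your workload; the same values can be passed on the command line through spark-submit's --num-executors, --executor-cores and --executor-memory flags):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("my-streaming-app")          # placeholder name
        .set("spark.executor.instances", "3")    # assumption: one executor per Spark node (YARN)
        .set("spark.executor.cores", "6")        # leave a couple of cores for the OS / Cassandra
        .set("spark.executor.memory", "8g")
        .set("spark.driver.memory", "2g"))
sc = SparkContext(conf=conf)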

Edit:

I see you are using a lot of lambdas, and those need to be serialized. Be aware that when a lambda uses an object, the whole object is shipped with the closure every time.

I.e. you are serializing the full object to get this.value instead of just value. To fix this, copy it into a local variable first (_value = this.value) and use that inside the lambda.
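
A generic Python illustration of that pattern (the class and attribute names here are made up for the example; they are not from your code):

from datetime import timedelta

class Shifter(object):
    def __init__(self):
        self.offset = timedelta(hours=8)

    def shift_slow(self, rdd):
        # The lambda captures `self`, so the whole Shifter object is
        # pickled and shipped to every task.
        return rdd.map(lambda created_at: created_at + self.offset)

    def shift_fast(self, rdd):
        offset = self.offset  # copy the value into a local variable first
        # Now only the small timedelta travels with the closure.
        return rdd.map(lambda created_at: created_at + offset)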

This might provide you with a speedup.
