Spark 作业服务器(Spark Job Server)与 Java [英] Spark Job Server with Java

查看:209
本文介绍了 Spark Job Server(Spark 作业服务器)与 Java 配合使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在 Java 中使用 Spark,并且想使用 Spark Job Server。为此,我按照以下链接中的全部说明进行了操作:
https://github.com/spark-jobserver/spark-jobserver

这是我的项目中的Scala类:

import _root_.spark.jobserver.SparkJob
import _root_.spark.jobserver.SparkJobValid
import _root_.spark.jobserver.SparkJobValidation
import com.typesafe.config._

import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

object JavaWord extends SparkJob {
   def main(args: Array[String]) {
   val ctx = new SparkContext("local[4]", "JavaWordCount")
   val config = ConfigFactory.parseString("")

   val results = runJob(ctx, config)
   }

   override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
      SparkJobValid;
   }

   override def runJob(sc: SparkContext, config: Config): Any = {
      val jsc = new JavaSparkContext(sc)
      val j = new JavaCount()
      return j.Mafonction(jsc: JavaSparkContext)
   }
  }

以及 Java 类(word count):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.regex.Pattern;

public final class JavaCount implements Serializable {
public static Object main(String[] args) throws Exception {

    return null;
}

public Object Mafonction(JavaSparkContext sc){
    String s= "a a a a b b c a";
    JavaPairRDD<String, Integer>  lines = sc.parallelize(Arrays.asList(s.split(" "))).mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });
    return lines.collect();
}
}

但是,当我执行它时,我得到 curl: (52) Empty reply from server,并且 Spark Job Server 中出现如下错误:

> job-server[ERROR] Uncaught error from thread [JobServer-akka.actor.default-dispatcher-13] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[JobServer]
job-server[ERROR] java.lang.IncompatibleClassChangeError: Implementing class
job-server[ERROR]   at java.lang.ClassLoader.defineClass1(Native Method)
job-server[ERROR]   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
job-server[ERROR]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
job-server[ERROR]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
job-server[ERROR]   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
job-server[ERROR]   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
job-server[ERROR]   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR]   at java.security.AccessController.doPrivileged(Native Method)
job-server[ERROR]   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
job-server[ERROR]   at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:222)
job-server[ERROR]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
job-server[ERROR]   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
job-server ... finished with exit code 255


解决方案

我解决了这个问题。我只是删除了 /tmp/Spark-JobServe 中的所有内容,然后重新编译了 JobServer,它就能正常工作了 ^^ 非常感谢你的帮助。

I'm using Spark with Java, and I want to use Spark Job Server. For this I followed all the steps in this link: https://github.com/spark-jobserver/spark-jobserver

This is the Scala class in my project:

import _root_.spark.jobserver.SparkJob
import _root_.spark.jobserver.SparkJobValid
import _root_.spark.jobserver.SparkJobValidation
import com.typesafe.config._

import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

object JavaWord extends SparkJob {
   /**
    * Local entry point for ad-hoc testing outside the job server.
    * Builds a local SparkContext and an empty config, then runs the job.
    */
   def main(args: Array[String]) {
      val ctx = new SparkContext("local[4]", "JavaWordCount")
      val config = ConfigFactory.parseString("")
      val results = runJob(ctx, config)
      // Print the result so a local run actually shows its output
      // (the original computed `results` and silently discarded it).
      println(results)
   }

   /** No configuration is required, so every submitted job is valid. */
   override def validate(sc: SparkContext, config: Config): SparkJobValidation =
      SparkJobValid

   /**
    * Wraps the SparkContext for Java interop and delegates the actual
    * word count to the Java helper class [[JavaCount]].
    *
    * The last expression of a Scala method is its result, so `return`
    * (and the redundant `jsc: JavaSparkContext` type ascription the
    * original used) are unnecessary.
    */
   override def runJob(sc: SparkContext, config: Config): Any = {
      val jsc = new JavaSparkContext(sc)
      new JavaCount().Mafonction(jsc)
   }
}

And the Java class "word count":

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;  
import java.util.regex.Pattern;



/**
 * Serializable helper that performs a word count over a fixed sample
 * string. Invoked from the Scala job-server wrapper via {@link #Mafonction}.
 */
public final class JavaCount implements Serializable {

    /** Unused stub entry point; never invoked by the job server. */
    public static Object main(String[] args) throws Exception {
        return null;
    }

    /**
     * Counts the occurrences of each word in a hard-coded sample string.
     *
     * @param sc Java-friendly Spark context supplied by the caller
     * @return the collected list of (word, count) tuples
     */
    public Object Mafonction(JavaSparkContext sc) {
        String sample = "a a a a b b c a";
        JavaPairRDD<String, Integer> wordCounts = sc
                .parallelize(Arrays.asList(sample.split(" ")))
                // Pair every word with an initial count of one.
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                })
                // Sum the per-word counts.
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer left, Integer right) {
                        return left + right;
                    }
                });
        return wordCounts.collect();
    }
}

But when I execute it I get curl: (52) Empty reply from server, with this error in Spark Job Server:

> job-server[ERROR] Uncaught error from thread [JobServer-akka.actor.default-dispatcher-13]       shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[JobServer]
job-server[ERROR] java.lang.IncompatibleClassChangeError: Implementing class
job-server[ERROR]   at java.lang.ClassLoader.defineClass1(Native Method)
job-server[ERROR]   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
job-server[ERROR]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
job-server[ERROR]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
job-server[ERROR]   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
job-server[ERROR]   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
job-server[ERROR]   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR]   at java.security.AccessController.doPrivileged(Native Method)
job-server[ERROR]   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

job-server[ERROR]   at      spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobMan    agerActor.scala:222)
job-server[ERROR]   at      scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR]   at     scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
job-server[ERROR]   at    akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
job-server[ERROR]   at    scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
job-server[ERROR]   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
job-server[ERROR]   at     scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
job-server ... finished with exit code 255

解决方案

I fixed the problem. I just deleted all the content of /tmp/Spark-JobServe, recompiled the JobServer, and it works ^^ Thank you so much for your help.

这篇关于 Spark Job Server 与 Java 的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆