Using MaxMind GeoIP in Spark (Task not serializable)


Question

I am trying to use the MaxMind GeoIP API for scala-spark, found at https://github.com/snowplow/scala-maxmind-iplookups. I load the database file in the standard way:

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)

I have a basic csv file that I load in, which contains times and IP addresses:

val sweek1 = week1.map(line => IP(parse(line))).collect {
  case Some(ip) =>
    val ipaddress = ipdetect(ip.ip)
    (ip.time, ipaddress)
}

The function ipdetect is basically defined by:

def ipdetect(a: String): String = {
  ipLookups.performLookups(a)._1 match {
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}

When I run this program, it reports "Task not serializable". I read a few posts, and there seem to be a few ways around this:

1. a wrapper
2. using SparkContext.addFile (which distributes the file across the cluster)

but I cannot work out how either of them works. I tried the wrapper, but I don't know how and where to call it. I tried addFile, but it returns Unit instead of a String, so I assume I would somehow need to pipe the binary file. I am not sure what to do now; any help is much appreciated.
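For what it's worth, the "wrapper" workaround usually means hiding the non-serializable IpLookups instance behind a singleton object with a lazy val, so each executor builds its own copy instead of the driver trying to ship it with the task. A minimal sketch, assuming GeoLiteCity.dat is available at the same local path on every worker (the object name is made up):

import com.snowplowanalytics.maxmind.iplookups.IpLookups

// Hypothetical wrapper: the lazy val is initialized once per executor JVM,
// so the IpLookups instance itself never has to be serialized with the task.
object IpLookupsWrapper {
  lazy val ipLookups: IpLookups =
    IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
}

def ipdetect(a: String): String =
  IpLookupsWrapper.ipLookups.performLookups(a)._1 match {
    case Some(value) => value.toString
    case _ => "Unknown"
  }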

I have since been able to serialize it, more or less, by using mapPartitions and iterating over each local partition, but I wonder if there is a more efficient way to do this, as my dataset is in the range of millions of rows.
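For reference, a minimal sketch of that mapPartitions variant, reusing week1, IP and parse from the snippets above and again assuming GeoLiteCity.dat exists at that path on every worker:

val sweek1 = week1.mapPartitions { lines =>
  // One IpLookups per partition, built on the executor, so nothing
  // non-serializable is captured by the closure.
  val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"),
                            memCache = false, lruCache = 20000)
  lines.map(line => IP(parse(line))).collect {
    case Some(ip) =>
      val city = ipLookups.performLookups(ip.ip)._1 match {
        case Some(value) => value.toString
        case _ => "Unknown"
      }
      (ip.time, city)
  }
}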

Answer

Assume that your csv file contains an IP address per line and that, for example, you want to map each IP address to a city.

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import com.snowplowanalytics.maxmind.iplookups.IpLookups

val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
// Ship the GeoIP database file to every executor.
sc.addFile(geoippath)

def parseIP(ip: String, ipLookups: IpLookups): String = {
  // performLookups returns a tuple whose first element is an Option of the
  // location; its city field is itself an Option, hence the two getOrElse calls.
  val lookupResult = ipLookups.performLookups(ip)
  lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}

val logs = sc.textFile("path/to/your.csv")
             .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)

For other IP transformations, please refer to Scala MaxMind IP Lookups. Furthermore, mapWith seems to be deprecated; use mapPartitionsWithIndex instead.
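On newer Spark versions where mapWith is no longer available, a roughly equivalent sketch with mapPartitionsWithIndex (the partition index is unused here), reusing parseIP and the file shipped via sc.addFile above, might look like:

val logs = sc.textFile("path/to/your.csv")
             .mapPartitionsWithIndex { (_, ips) =>
               // Build one IpLookups per partition from the file shipped by sc.addFile.
               val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("geoip.dat")))
               ips.map(ip => parseIP(ip, ipLookups))
             }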
