Using MaxMind GeoIP in Spark serialized
Question
I am trying to use the MaxMind GeoIP API for Scala/Spark, found at https://github.com/snowplow/scala-maxmind-iplookups. I load the database file in the standard way:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
I have a basic csv file which I load in that contains time and IP addresses:
val sweek1 = week1.map{ line => IP(parse(line)) }.collect{
  case Some(ip) =>
    val ipaddress = ipdetect(ip.ip)
    (ip.time, ipaddress)
}
The function ipdetect is basically defined by:
def ipdetect(a: String) = {
  ipLookups.performLookups(a)._1 match {
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}
When I run this program, it reports "Task not serializable". So I read a few posts and there seem to be a few ways around this:
1. a wrapper
2. using SparkContext.addFile (which distributes the file across the cluster)
but I cannot work out how either of them works. I tried the wrapper, but I don't know how and where to call it. I tried addFile, but it returns Unit instead of a String, and I assume you would need to somehow pipe the binary file. So I am not sure what to do now. Any help is much appreciated.
I have since been able to somewhat serialize it by using mapPartitions and iterating over each local partition, but I wonder if there is a more efficient way to do this, as my dataset is in the range of millions of rows.
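The mapPartitions workaround described above can be sketched roughly as follows, reusing the names from the question (week1, IP, parse). The key point is that IpLookups is constructed once per partition on the executor, so it never has to cross the driver/executor boundary:

```scala
import com.snowplowanalytics.maxmind.iplookups.IpLookups

// Sketch of the mapPartitions workaround: build the lookup object inside the
// partition closure so nothing non-serializable is captured from the driver.
// week1, IP and parse are assumed from the question.
val sweek1 = week1
  .map(line => IP(parse(line)))
  .collect { case Some(ip) => ip }        // RDD.collect(PartialFunction): filter + map
  .mapPartitions { iter =>
    // one instance per partition, created locally on the executor
    val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"),
                              memCache = false, lruCache = 20000)
    iter.map { ip =>
      val city = ipLookups.performLookups(ip.ip)._1 match {
        case Some(value) => value.toString
        case _           => "Unknown"
      }
      (ip.time, city)
    }
  }
```

One instance per partition (rather than per record) is usually cheap enough even for millions of rows, since a typical RDD has far fewer partitions than records.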
Answer
Assume that your csv file contains an IP address per line, and that, for example, you want to map each IP address to a city.
import com.snowplowanalytics.maxmind.iplookups.IpLookups
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)

def parseIP(ip: String, ipLookups: IpLookups): String = {
  val lookupResult = ipLookups.performLookups(ip)
  lookupResult._1.flatMap(_.city).getOrElse("")
}

val logs = sc.textFile("path/to/your.csv")
  .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
For other IP transformations, please refer to Scala MaxMind IP Lookups. Furthermore, mapWith seems to be deprecated; use mapPartitionsWithIndex instead.
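Since mapWith was deprecated and removed in later Spark releases, the same per-partition construction can be written with mapPartitionsWithIndex. This is a sketch under the answer's assumptions (same file names and paths); the partition index is unused here:

```scala
import com.snowplowanalytics.maxmind.iplookups.IpLookups
import org.apache.spark.SparkFiles

// Equivalent of the mapWith call above, written with mapPartitionsWithIndex:
// build IpLookups once per partition from the file distributed via addFile.
val cities = sc.textFile("path/to/your.csv")
  .mapPartitionsWithIndex { (_, lines) =>
    val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("geoip.dat")))
    lines.map(ip => ipLookups.performLookups(ip)._1.flatMap(_.city).getOrElse(""))
  }
```

Plain mapPartitions would work equally well here, since the index is not needed.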