在 pyspark 中执行 NLTK [英] Perform NLTK in pyspark
问题描述
我对 pyspark 非常陌生,我开发了一个程序来对 HDFS 文件执行 NLTK,以下是执行步骤.我使用的是 spark 2.3.1
I am very new to PySpark and have developed a program that runs NLTK on HDFS files. The steps are listed below. I'm using Spark 2.3.1.
1.从 HDFS 获取文件
2.执行词形还原
3.删除标点符号.
4.将 RDD 转换为 DataFrame
5.执行分词器
6.删除停用词
7.分解列数据为每条记录创建唯一的行
8.我想将所有文件数据保存到一个文件中,因此我将输出与旧文件合并
9.现在将整个合并输出写入 HDFS
10.然后删除旧文件并将 Spark 创建的文件重命名为不同的名称
11.我正在为所有 bigram 和 trigram 文件执行此操作.
这是我的 pyspark 代码.
Here is my pyspark code.
%pyspark
import os
import pyspark
import csv
import nltk
import json
import string
import re
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark import SparkContext, SparkConf as sc
from pyspark.sql.types import StringType
from nltk.corpus import stopwords
nltk.download('stopwords')
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode,regexp_replace
import pandas
import hdfs
nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("PySpark App")
# Reuse the running SparkContext (or create one). This rebinds the name `sc`,
# which the import block above had aliased to SparkConf.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# HDFS directory that receives unigram.csv, bigram.csv and trigram.csv.
hdfs_dst_dir = "/user/zeppelin/achyuttest.csv/"
# Number of non-empty input files processed so far; the loop below checks
# `counter>0` to decide whether earlier output exists and must be merged.
counter=0
# Lemmatization
def lemma(x):
    """Return the WordNet lemma of ``x``.

    ``x`` is handed to ``WordNetLemmatizer.lemmatize`` as one token.

    NOTE(review): the caller maps this function over the lines of a text
    file, so each whole line is treated as a single word - confirm that is
    intended, or tokenize before lemmatizing.
    """
    lemmatizer = WordNetLemmatizer()
    return lemmatizer.lemmatize(x)
def _strip_non_letters(exploded_df):
    """Remove every character that is not an ASCII letter or whitespace."""
    return exploded_df.select(regexp_replace('exploded', "[^a-zA-Z\\s]", ""))


def _publish_ngrams(result_df, fs, output_name, merge_with_previous):
    """Write ``result_df`` to ``hdfs_dst_dir`` as the single file ``output_name``.

    Args:
        result_df: single-column DataFrame holding the new n-grams.
        fs: Hadoop ``FileSystem`` handle used for list/delete/rename.
        output_name: final file name, e.g. ``"unigram.csv"``.
        merge_with_previous: when true, the rows of the existing
            ``output_name`` are prepended so the file keeps accumulating.
    """
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path
    if merge_with_previous:
        old_data = sc.textFile('hdfs://' + hdfs_dst_dir + output_name,
                               use_unicode=False)
        old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])
        # union() is positional, so the differing column names do not matter.
        result_df = old_data_df.union(result_df)

    # The write must finish before the old file is deleted: result_df may
    # still be reading it lazily.
    result_df.coalesce(1).write.mode('append').csv(hdfs_dst_dir)

    # coalesce(1) produced exactly one new part- file; previous outputs were
    # already renamed, so the first match is the file just written.
    part_names = [status.getPath().getName()
                  for status in fs.listStatus(hadoop_path(hdfs_dst_dir))
                  if status.getPath().getName().startswith('part-')]
    file_name = part_names[0]

    # Replace the old output with the freshly written part file.
    fs.delete(hadoop_path(hdfs_dst_dir + output_name))
    fs.rename(hadoop_path(hdfs_dst_dir + file_name),
              hadoop_path(hdfs_dst_dir + output_name))


# `spark` is not defined in this script - presumably the Zeppelin %pyspark
# interpreter injects it; verify when running elsewhere.
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

# Loop-invariant transformers, built once instead of once per file.
tokenizer = Tokenizer(inputCol="lem_words", outputCol="tokenized_data")
remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')

for i in range(1, 50001):
    # '\\ ' keeps the original runtime value (backslash + space) without
    # relying on the invalid escape sequence '\ '.
    data = sc.textFile(
        'hdfs:///user/spark/Patentdata/ElectronicsPatents/Link\\ {}/abstract.txt'.format(i),
        use_unicode=False)
    if data.isEmpty():
        continue

    # NOTE(review): the original built `data.map(lemma)` and collected it to
    # the driver here, but never used the result - the DataFrame below was
    # always built from the raw `data`. That dead Spark job is removed; the
    # written output is unchanged. Lemmatization still does not reach the
    # output; wire it in per token after tokenizing if it is wanted.
    data_df = data.map(lambda x: (x, )).toDF(['lem_words'])

    # Tokenize, then drop stop words.
    outputdata = tokenizer.transform(data_df).select('tokenized_data')
    outputdata = remover.transform(outputdata).select('words_clean')

    print("Link ========>", i)
    merge_with_previous = counter > 0

    # Unigrams: one row per cleaned token.
    result_df = outputdata.withColumn("exploded", explode("words_clean")).select("exploded")
    _publish_ngrams(_strip_non_letters(result_df), fs, "unigram.csv",
                    merge_with_previous)

    # Bigrams and trigrams: same pipeline with an NGram stage in front.
    for n, ngram_col, output_name in ((2, "bigrams", "bigram.csv"),
                                      (3, "trigrams", "trigram.csv")):
        ngram_df = NGram(n=n, inputCol="words_clean",
                         outputCol=ngram_col).transform(outputdata)
        result_df = ngram_df.withColumn("exploded", explode(ngram_col)).select("exploded")
        _publish_ngrams(_strip_non_letters(result_df), fs, output_name,
                        merge_with_previous)

    counter = counter + 1
我正在对 50K 文件执行此代码,而我的 Spark 执行此程序需要太多时间.(过了 2 天,还在继续……)
I am running this code on 50K files, and Spark is taking far too long to execute it. (Two days have passed and it is still running...)
我在虚拟机中运行 HDP(运行一个节点 HDP 沙盒)这是我的系统规格...
I'm running HDP in Virtual machine(running one node HDP Sandbox)Here is my system specification...
====> 来宾操作系统::
====> Guest OS::
内存:12930 MB
Memory: 12930 MB
CPU:6 个 CPU
CPU: 6CPUs
===> YARN 规格::
===> YARN Specifications::
1.内存:4608MB
1.Memory: 4608 MB
最大容器内存:4608 MB
Maximum Container memory: 4608 MB
最大容器大小(Vcores):4
Maximum Container size(Vcores): 4
虚拟核心数:4
===> Zeppelin Pyspark 解释器规范::1. spark.executor.memory: 空白(按照文档中的说明是指 1g)
===> Zeppelin PySpark Interpreter Specification:: 1. spark.executor.memory: Blank (which means 1g, as specified in the documentation)
所以我有两个问题.
- 我的代码是否正确?
- 我必须在 YARN 和 Zeppelin Interpreter 中指定哪个值,以便快速高效地工作.
谢谢.
推荐答案
我正在回答我的第一个问题.
I'm answering my first question.
根据旧代码,我正在为文件夹中的每个文件制作一个 RDD,所以花费了太多时间(处理 3K 文件需要 19 小时.)
In the old code I was creating an RDD for each file in the folder, so it took far too long (processing 3K files took 19 hours).
但是现在我所做的是在 Single RDD 操作中读取所有输入文件,并对其执行所有操作.(现在新代码需要大约 15 分钟才能处理 3K 文件.)
What I do now is read all the input files into a single RDD and perform all operations on it. (The new code takes ~15 minutes to process 3K files.)
评论用于额外理解
Patentdetect-local.py
Patentdetect-local.py
"""
To Run this code
Set Pyspark_python
$ export PYSPARK_PYTHON=/usr/bin/python3
$ pip install nltk
RUN ON Spark::
$ ./bin/spark-submit file_path/Patentdetect-local.py
"""
import pyspark
import nltk
import string
import os
import re
from pyspark import SparkContext
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import NGram
from pyspark.sql.types import ArrayType,StructType,StructField,StringType
from pyspark.sql.functions import explode,array,split,collect_list
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
# Spark entry points: the low-level context for RDD work and a session for
# DataFrame work.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('Spark Example').getOrCreate()
# Glob that matches the abstract.txt of every sub-folder, so all inputs are
# loaded by one textFile() call instead of one RDD per file.
Source_path="<path>/*/abstract.txt"
# Directory that receives the parquet outputs.
Destination_path="<path>/spark-outputs/parquet/Electronics-50/"
data=sc.textFile(Source_path)
# Mark the raw input for caching; it is released by data.unpersist() at the
# end of the script.
data.persist()
# Lower-case every line before tokenizing.
lower_casetext = data.map(lambda x:x.lower())
# splitting_rdd = lower_casetext.map(lambda x:x.split(" "))
# print(splitting_rdd.collect())
# Function to perform sentence tokenization
def sent_TokenizeFunct(x):
    """Split the text ``x`` into sentences with ``nltk.sent_tokenize``.

    Returns whatever ``nltk.sent_tokenize`` returns for ``x``.
    """
    return nltk.sent_tokenize(x)
# Apply sentence tokenization to every lower-cased input line.
sentencetokenization_rdd = lower_casetext.map(sent_TokenizeFunct)
# Function to perform word tokenization
def word_TokenizeFunct(x):
    """Flatten an iterable of sentences into one list of words.

    Each sentence in ``x`` is split on runs of whitespace (``str.split()``
    with no argument), and the words are returned in their original order.
    An empty ``x`` yields an empty list.
    """
    return [word for line in x for word in line.split()]
# Turn each record's list of sentences into a flat list of words.
wordtokenization_rdd = sentencetokenization_rdd.map(word_TokenizeFunct)
# Remove stop words
def removeStopWordsFunct(x):
    """Return the words of ``x`` that are not NLTK English stop words.

    The comparison is an exact, case-sensitive membership test against
    ``stopwords.words('english')``; word order is preserved.
    """
    # Imported inside the function, as in the original, so the import runs
    # wherever the function itself runs.
    from nltk.corpus import stopwords
    stop_words = set(stopwords.words('english'))
    return [w for w in x if w not in stop_words]
# Drop English stop words from every record's word list.
stopwordRDD = wordtokenization_rdd.map(removeStopWordsFunct)
# Remove punctuation marks
def removePunctuationsFunct(x):
    """Strip ASCII punctuation from every token and drop emptied tokens.

    Every character listed in ``string.punctuation`` is removed from each
    string in ``x``. Tokens that consisted only of punctuation become empty
    and are discarded. The original computed this filtered list but returned
    the unfiltered one, so empty strings leaked into the later stages.

    Returns a list of the non-empty cleaned tokens, in order.
    """
    punctuation = set(string.punctuation)
    stripped = (''.join(c for c in s if c not in punctuation) for s in x)
    return [s for s in stripped if s]
# Strip punctuation from the remaining words of every record.
rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)
# Perform lemmatization
def lemma(x):
    """Return the WordNet lemma of every token in ``x`` as a list.

    Each element of ``x`` is passed to ``WordNetLemmatizer.lemmatize`` on its
    own; the result keeps the input order.
    """
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(s) for s in x]
# Lemmatize every cleaned word list.
lem_wordsRDD = rmvPunctRDD.map(lemma)
# Join tokens
# def joinTokensFunct(x):
# joinedTokens_list = []
# x = " ".join(x)
# return x
# joinedTokensRDD = lem_wordsRDD.map(joinTokensFunct)
## Create DataFrame from RDD
# Wrap each word list in a 1-tuple so toDF() yields one array column
# named "features".
df = lem_wordsRDD.map(lambda x: (x, )).toDF(["features"])
# One row per word, in a column named "values".
tokenized_df = df.withColumn("values", explode("features")).select("values")
## Write DataFrame Output
# tokenized_df.write.mode('append').csv(Destination_path)
## Change File-name
# for old_file_name in os.listdir(Destination_path):
# src = Destination_path+old_file_name
# dst = Destination_path+"unigram.csv"
# if old_file_name.startswith("part-"):
# os.rename(src, dst)
# break
## For Bigrams following commented line is enough
# # tokenized_df.select(F.concat_ws(" ",F.col("values"),F.lead("values").over(Window.orderBy(F.lit(None))))).show()
## Create Final DataFrame
# Gather every word back into one array column, "listed_data", which is the
# input column of the NGram stages below.
# NOTE(review): collect_list() does not promise a stable element order after
# a shuffle - confirm the n-grams can rely on the order produced here.
final_df = tokenized_df.select(collect_list("values").alias("listed_data"))
# final_df.show(truncate=False)
# Cache the DataFrame: it is transformed three times below (n = 1, 2, 3).
final_df.persist()
def _write_ngrams(n, ngram_col, final_col, file_name):
    """Build the n-grams of ``final_df`` and store them as one parquet file.

    Args:
        n: n-gram size passed to ``NGram``.
        ngram_col: name of the intermediate array column produced by NGram.
        final_col: name of the exploded column written to parquet.
        file_name: name the written part file is renamed to inside
            ``Destination_path``.

    Returns:
        The exploded single-column DataFrame that was written.
    """
    ngram = NGram(n=n, inputCol="listed_data", outputCol=ngram_col)
    exploded_df = (ngram.transform(final_df)
                   .withColumn(final_col, explode(ngram_col))
                   .select(final_col))

    # coalesce(1) guarantees a single part- file, so the rename below cannot
    # overwrite one part file with another.
    exploded_df.coalesce(1).write.mode('append').parquet(Destination_path)

    # os.listdir/os.rename only work on a local filesystem path.
    for old_file_name in os.listdir(Destination_path):
        if old_file_name.startswith("part-"):
            os.rename(Destination_path + old_file_name,
                      Destination_path + file_name)
    return exploded_df


## Unigram
unigram_FinalDataFrame = _write_ngrams(1, "unigrams", "unigram_final",
                                       "unigram.parquet")
## Bigram
bigram_FinalDataFrame = _write_ngrams(2, "bigrams", "bigram_final",
                                      "bigram.parquet")
## Trigram
trigram_FinalDataFrame = _write_ngrams(3, "trigram", "trigram_final",
                                       "trigram.parquet")
# Release the cached DataFrame and input RDD that were persisted earlier.
final_df.unpersist()
data.unpersist()
这篇关于在 pyspark 中执行 NLTK的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!