Perform NLTK in pyspark


Question

I am very new to PySpark and I have developed a program to perform NLTK processing on HDFS files. The following are the steps for that. I'm using Spark 2.3.1.

1. Get the file from HDFS

2. Perform lemmatization

3. Remove punctuation

4. Convert the RDD to a DataFrame

5. Perform tokenization

6. Remove stop words

7. Explode the column data to create a unique row for each record

8. I want to keep all the file data in one file, so I merge the output with the old file

9. Now write the whole merged output to HDFS

10. Then delete the old file and rename the Spark-created file to a different name

11. I am doing this for all the bigram and trigram files.

Here is my PySpark code.

%pyspark

import os
import pyspark
import csv
import nltk
import json
import string
import re

from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StringType

from nltk.corpus import stopwords
nltk.download('stopwords')

from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import explode,regexp_replace

import pandas
import hdfs



nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')


# conf = SparkConf().setAppName("PySpark App")
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()  # used below for the JVM FileSystem calls (Zeppelin also provides this as `spark`)

hdfs_dst_dir = "/user/zeppelin/achyuttest.csv/"
counter=0

# Lemmatization helper

def lemma(x):

    lemmatizer = WordNetLemmatizer()
    return lemmatizer.lemmatize(x)



for i in range(1,50001):
    data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/Link\ {}/abstract.txt'.format(i), use_unicode=False)

    print(type(data))

    if data.isEmpty():
        continue


    else:
        lem_words = data.map(lemma)


        list_punct=list(string.punctuation)


        len_list = lem_words.collect()


        test_str = len_list[0]
        test_df = test_str.split(' ')


        data_df = lem_words.map(lambda x: (x, )).toDF(['lem_words'])  # build the DataFrame from the lemmatized RDD, not the raw data






# Perform Tokenizer

        tokenizer =  Tokenizer(inputCol="lem_words", outputCol="tokenized_data")
        outputdata = tokenizer.transform(data_df)
        outputdata = outputdata.select('tokenized_data')




    # Remove stop words

        remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')
        outputdata = remover.transform(outputdata).select('words_clean')


#Explode one Row into multiple Row with value

        result_df = outputdata.withColumn("exploded", explode("words_clean")).select("exploded")

        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))



        print("Link  ========>",i)
#Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/unigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(['words_clean'])


            result_df = old_data_df.union(result_df)

        else:
            pass

#Write DataFrame to HDFS

        result_df.coalesce(1).write.mode('append').csv(hdfs_dst_dir)

        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

    # Rename file

    #list files in the directory


        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))


    #filter name of the file starts with part-

        print("Get FileName")
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

        print(file_name)
    #rename the file


        new_filename = "unigram.csv"

    # Remove Old file

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))



## Bigrams

        bigram = NGram(n=2, inputCol="words_clean", outputCol="bigrams")

        bigramDataFrame = bigram.transform(outputdata)




    #Explode one Row into multiple Row with value

        result_df = bigramDataFrame.withColumn("exploded", explode("bigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))


    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/bigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


    # Write Output in file

        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "bigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))





## TriGram

        trigram = NGram(n=3, inputCol="words_clean", outputCol="trigrams")

        trigramDataFrame = trigram.transform(outputdata)


    #Explode one Row into multiple Row with value

        result_df = trigramDataFrame.withColumn("exploded", explode("trigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))

    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/trigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


#Save DataFrame in HDFS
        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "trigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))

        counter = counter+1

I am running this code on 50K files, and Spark is taking too much time to execute the program (two days have passed and it is still running...).

I'm running HDP in a virtual machine (a single-node HDP Sandbox). Here is my system specification...

====> Guest OS:

1. Memory: 12930 MB
2. CPU: 6 CPUs

===> YARN specifications:

1. Memory: 4608 MB
2. Maximum container memory: 4608 MB
3. Maximum container size (VCores): 4
4. Number of virtual cores: 4

===> Zeppelin PySpark interpreter specification:

1. spark.executor.memory: blank (which means 1g, as specified in the documentation)
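(For reference, a property such as spark.executor.memory is normally set in the Zeppelin interpreter settings; the sketch below only illustrates how the same property could be passed programmatically when a session is created outside Zeppelin. The "4g" value is a placeholder, not a tuned recommendation.)

from pyspark.sql import SparkSession

# Illustration only: "4g" is a placeholder value.
spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()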

So I have two questions:

1. Is my code correct?
2. Which values do I have to specify for YARN and the Zeppelin interpreter so that it runs fast and efficiently?

Thank you.

Answer

I'm answering my first question.

According to the old code, I was creating an RDD for each file in the folder, so it was taking too much time (processing 3K files took 19 hours).

But now what I have done is read all the input files in a single RDD operation and perform all the operations on it. (The new code takes about 15 minutes to process 3K files.)

(If anyone wants the code, I will provide it.)
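A minimal sketch of that approach, assuming the per-link abstract.txt files can be matched with a wildcard path (the path pattern, column names, and output location are illustrative, not the exact code; lemmatization and the bigram/trigram branches are omitted for brevity):

%pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

spark = SparkSession.builder.getOrCreate()

# Read every abstract in one pass instead of creating one RDD per file.
# The wildcard is an assumption about the directory layout.
data_df = spark.read.text('hdfs:///user/spark/Patentdata/ElectronicsPatents/*/abstract.txt') \
                .withColumnRenamed('value', 'lem_words')

tokenizer = Tokenizer(inputCol='lem_words', outputCol='tokenized_data')
remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')
outputdata = remover.transform(tokenizer.transform(data_df))

# Explode to one token per row and strip non-alphabetic characters.
result_df = outputdata.withColumn('exploded', explode('words_clean')) \
                      .select(regexp_replace('exploded', '[^a-zA-Z\\s]', '').alias('exploded'))

# A single write at the end replaces the per-file append/delete/rename loop.
result_df.coalesce(1).write.mode('overwrite').csv('hdfs:///user/zeppelin/achyuttest.csv/unigram')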
