How to load a file in each executor once?


Problem description

I define the following code in order to load a pretrained embedding model:

import numpy as np
from gensim.models.fasttext import FastText as FT_gensim

class Loader(object):
    # Class-level state shared by every Loader instance within one Python
    # worker process: the model cache, the embedding cache, and a load counter.
    cache = {}
    emb_dic = {}
    count = 0

    def __init__(self, filename):
        print("|-------------------------------------|")
        print("Welcome to Loader class in python")
        print("|-------------------------------------|")
        self.fn = filename

    @property
    def fasttext(self):
        if Loader.count == 1:
            print("already loaded")
        if self.fn not in Loader.cache:
            Loader.cache[self.fn] = FT_gensim.load_fasttext_format(self.fn)
            Loader.count = Loader.count + 1
        return Loader.cache[self.fn]

    def map(self, word):
        # Fall back to a random 300-dimensional vector for words the model
        # does not cover, and cache that vector so lookups stay consistent.
        if word not in self.fasttext:
            Loader.emb_dic[word] = np.random.uniform(low=0.0, high=1.0, size=300)
            return Loader.emb_dic[word]
        return self.fasttext[word]

I call this class as follows:

inputRaw = sc.textFile(inputFile, 3).map(lambda line: (line.split("\t")[0], line.split("\t")[1])).map(Loader(modelpath).map)

  1. I am confused about how many times the modelpath file will be loaded. I want it to be loaded once per executor and then used by all of that executor's cores. My own answer is that modelpath will be loaded 3 times (= the number of partitions). If that is correct, the drawback of this design depends on the size of the modelpath file: suppose the file is 10 GB and there are 200 partitions; then we would need 10 GB * 200 = 2,000 GB, which is huge (so this solution is only usable with a small number of partitions).

Suppose I have an rdd = (id, sentence) = [(id1, u'patina californian'), (id2, u'virgil american'), (id3, u'frensh'), (id4, u'american')]

and I want to sum up the embedding word vectors for each sentence:

def test(document):
    print("document is = {}".format(document))
    documentWords = document.split(" ")
    features = np.zeros(300)
    for word in documentWords:
        features = np.add(features, Loader(modelpath).fasttext[word])
    return features

def calltest(inputRawSource):
    my_rdd = inputRawSource.map(lambda line: (line[0], test(line[1]))).cache()
    return my_rdd

In this case, how many times will the modelpath file be loaded? Note that I set spark.executor.instances to 3.

Recommended answer

By default, the number of partitions is set to the total number of cores across all the executor nodes in the Spark cluster. Suppose you are processing 10 GB on a Spark cluster (or supercomputing executor) that contains a total of 200 CPU cores; that means Spark might use 200 partitions, by default, to process your data.
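
As a quick check, the driver can print how many partitions Spark actually chose and cap that number; a minimal sketch (the input path is a placeholder):

# Each partition becomes a separate task, so the partition count bounds how
# many times the model can be loaded in parallel.
rdd = sc.textFile("/data/input.tsv")  # placeholder path
print(rdd.getNumPartitions())

# Coalescing to fewer partitions bounds the number of model loads:
print(rdd.coalesce(3).getNumPartitions())  # -> 3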

Also, to make all of your CPU cores work on each executor, this can be solved in Python (using 100% of all cores with the multiprocessing module).
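
A minimal sketch of that multiprocessing idea, assuming Linux's default "fork" start method so the model loaded once in the parent process is shared copy-on-write by the workers (the model path and word list are placeholders):

import numpy as np
from multiprocessing import Pool
from gensim.models.fasttext import FastText as FT_gensim

# Loaded once in the parent process; with "fork", worker processes share this
# object copy-on-write instead of each reloading it from disk.
model = FT_gensim.load_fasttext_format("/models/fasttext.bin")  # placeholder

def embed(word):
    # Same fallback as Loader.map: a random vector for out-of-vocabulary words.
    if word in model:
        return model[word]
    return np.random.uniform(low=0.0, high=1.0, size=300)

if __name__ == "__main__":
    words = ["patina", "californian", "virgil", "american"]
    with Pool() as pool:
        vectors = pool.map(embed, words)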
