Efficiently batching Spark dataframes to call an API

Problem description

I am fairly new to Spark and I'm trying to call the Spotify API using Spotipy. I have a list of artist ids which can be used to fetch artist info. The Spotify API allows batch calls of up to 50 ids at once. I load the artist ids from a MySQL database and store them in a dataframe.

My problem now is that I do not know how to efficiently batch that dataframe into pieces of 50 or fewer rows.

In the example below I'm turning the dataframe into a regular Python list from which I can call the API on batches of 50.

Any ideas how I could do this without going back to a Python list?

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from pyspark.sql import SparkSession
import os

spark = SparkSession\
        .builder\
        .appName("GetArtists")\
        .getOrCreate()

df = spark.read.format('jdbc') \
    .option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
    .option("user", os.getenv("DB_USER"))\
    .option("password", os.getenv("DB_PW"))\
    .option("query", "SELECT artist_id FROM artists")\
    .load()

sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

ids = [row['artist_id'] for row in df.collect()]

batch_size = 50
for i in range(0,len(ids), batch_size):
    artists = sp.artists( ids[i:i+batch_size] )

    # process the JSON response

I thought about using foreach and calling the API for each id, but this results in unnecessary requests. The results also have to be stored back in the database, which would mean writing many single rows to the database.

Recommended answer

If you want to divide the dataframe based on the row number then you can do it like this:

from pyspark.sql import functions as f
from pyspark.sql import Window

# assign a sequential row number to every record (a window with no
# partitioning moves all rows to a single partition, so Spark will warn)
df = df.withColumn('row_num', f.row_number().over(Window.orderBy(f.lit(1))))
row_count = df.count()

for i in range(0, row_count, 50):
    # row_number() starts at 1, so this selects rows i+1 .. i+50
    batch_df = df.filter((f.col('row_num') > i) & (f.col('row_num') <= i + 50))
    # api logic goes here
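
For example, the "api logic" placeholder could be filled by collecting only the current 50-row batch and passing its ids to the Spotipy client from the question. This is a minimal sketch that assumes the sp client and the artist_id column from the question's code:

# collect just this batch (at most 50 rows) and call the API once for it
batch_ids = [row['artist_id'] for row in batch_df.select('artist_id').collect()]
artists = sp.artists(batch_ids)
# process the JSON response here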

But if you can pass a dataframe to the API directly, pass the filtered df; otherwise collect the filtered df, which will contain only 50 values each time.
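
If the goal is to avoid collecting the ids to the driver at all (as the question asks), one option that is not part of the answer above is to batch inside each partition and call the API on the executors. The sketch below is an assumption-heavy illustration: it assumes spotipy is installed on the workers, that the client-credentials environment variables are available there, and that only the id and name fields of the response need to be kept.

def call_api_per_partition(rows):
    # build one Spotipy client per partition; credentials must be
    # available in the worker environment
    import spotipy
    from spotipy.oauth2 import SpotifyClientCredentials
    sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

    batch = []
    for row in rows:
        batch.append(row['artist_id'])
        if len(batch) == 50:  # Spotify's per-call limit
            for artist in sp.artists(batch)['artists']:
                yield (artist['id'], artist['name'])
            batch = []
    if batch:  # remaining ids, fewer than 50
        for artist in sp.artists(batch)['artists']:
            yield (artist['id'], artist['name'])

result_df = df.rdd.mapPartitions(call_api_per_partition).toDF(['artist_id', 'artist_name'])

Writing result_df back in one go (for example with result_df.write.jdbc(...)) would also avoid the many single-row inserts mentioned in the question.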
