Simultaneously run POST in Python


Question

I am trying to upload 100,000 data points to a web service backend. If I run them one at a time, it takes about 12 hours. The service supports 20 simultaneous API calls. How can I run these POSTs concurrently to speed up the import?

def AddPushTokens():

    import requests
    import csv
    import json

    count = 0

    apikey = "12345"
    restkey = "12345"
    URL = "https://api.web.com/1/install/"
    headers = {'content-type': 'application/json',
               'Application-Id': apikey,
               'REST-API-Key': restkey}

    with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
        deviceTokens = csv.reader(csvfile, delimiter=',')

        for token in deviceTokens:
            deviceToken = token[0].replace("/", "")
            deviceType = "ios"
            pushToken = "pushtoken_" + deviceToken
            payload = {"deviceType": deviceType,
                       "deviceToken": deviceToken,
                       "channels": ["", pushToken]}
            r = requests.post(URL, data=json.dumps(payload), headers=headers)

            count = count + 1
            print "Count: " + str(count)
            print r.content

I am trying to use concurrent.futures. What confuses me is how to set this up so that it pulls each token from the CSV and passes it to load_url. Also, I want to make sure that it runs the first 20 requests, then picks up at 21 and runs the next set of 20.

import concurrent.futures
import requests
import csv
import json

URLS = ['https://api.web.com/1/installations/'] * 20


apikey = "12345"
restkey = "12345"
URL = "https://api.web.com/1/installations/"
headers = {'content-type': 'application/json',
           'X-web-Application-Id': apikey,
           'X-web-REST-API-Key': restkey}


with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    deviceTokens = csv.reader(csvfile, delimiter=',')

    for token in deviceTokens:
        deviceToken = token[0].replace("/", "")
        deviceType = "ios"
        pushToken = "pushtoken_" + deviceToken
        payload = {"deviceType": deviceType,
                   "deviceToken": deviceToken,
                   "channels": ["", pushToken]}
        r = requests.post(URL, data=json.dumps(payload), headers=headers)


# Send a single POST for one CSV row and report the result
def load_url(token):

    URL = 'https://api.web.com/1/installations/'

    deviceToken = token[0].replace("/", "")
    deviceType = "ios"
    pushToken = "pushtoken_" + deviceToken
    payload = {"deviceType": deviceType,
               "deviceToken": deviceToken,
               "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)

    print r.content
    return r.content

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Update, based on the comments below:

import concurrent.futures
import requests
import csv
import json

apikey = "ldy0eSCqPz9PsyOLAt35M2b0XrfDZT1NBW69Z7Bw"
restkey = "587XASjEYdQwH2UHruA1yeZfT0oX7uAUJ8kWTmE3"
URL = "https://api.parse.com/1/installations/"
headers = {'content-type': 'application/json',
           'X-Parse-Application-Id': apikey,
           'X-Parse-REST-API-Key': restkey}

# Read every token up front so the pool can be filled once
with open('/Users/jgurwin/Desktop/push/push-new.csv', 'rU') as csvfile:
    deviceTokens = [device[0].replace("/", "")
                    for device in csv.reader(csvfile, delimiter=',')]

# Send a single POST for one token and return the response body
def load_url(token):
    pushToken = "pushtoken_" + token
    payload = {"deviceType": "ios",
               "deviceToken": token,
               "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its token
    future_to_token = {executor.submit(load_url, token): token for token in deviceTokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))
        else:
            print('%r returned %d bytes' % (token, len(data)))

Answer

The easy way to do this is with threads. The nearly-as-easy way is with gevent or a similar library (and grequests even ties gevent and requests together, so you don't have to figure out how to do so). The hard way is building an event loop (or, better, using something like Twisted or Tulip) and multiplexing the requests yourself.
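For reference, a minimal sketch of the grequests route, assuming the URL and headers from the question and a tokens list of device-token strings built from the CSV (names taken from the question, not from a tested program):

import json
import grequests  # pip install grequests (pulls in gevent)

def make_request(token):
    # Build one unsent POST per device token, mirroring the question's payload
    payload = {"deviceType": "ios",
               "deviceToken": token,
               "channels": ["", "pushtoken_" + token]}
    return grequests.post(URL, data=json.dumps(payload), headers=headers)

# size=20 caps concurrency at the API's 20-call limit
responses = grequests.map((make_request(t) for t in tokens), size=20)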

Let's do it the easy way.

You don't want to run 100,000 threads at once. Besides the fact that they would take hundreds of GB of stack space, and that your CPU would spend more time context-switching than running actual code, the service only supports 20 connections at once. So you want 20 threads.

So, how do you run 100,000 tasks on 20 threads? With a thread pool executor (or a bare thread pool).
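In sketch form (assuming a tokens list read from the CSV and a load_url like the one shown below), the pool keeps at most 20 requests in flight; as each one finishes, a worker picks up the next token, which is exactly the "first 20, then the next 20" behavior asked about:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # map() feeds the 20 workers from the task list, one token per request
    results = list(executor.map(load_url, tokens))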

The concurrent.futures docs have an example that is almost identical to what you want to do, except that it does GETs instead of POSTs and uses urllib instead of requests. Just change the load_url function to something like this:

def load_url(token):
    deviceToken=token[0].replace("/","")
    # … your original code here …
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

… and the example will work as-is.

Since you're using Python 2.x, you don't have the concurrent.futures module in the stdlib; you'll need the backport, futures (pip install futures; it provides the same concurrent.futures module).

In Python (at least CPython), only one thread at a time can do any CPU work. If your tasks spend far more time downloading over the network (I/O work) than building requests and parsing responses (CPU work), that's not a problem. But if that isn't true, you'll want to use processes instead of threads, which only requires replacing the ThreadPoolExecutor in the example with a ProcessPoolExecutor.
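A sketch of that swap, under the same assumed tokens and load_url as above; the one caveat is that with processes, load_url must be defined at module top level so it can be pickled and sent to the workers:

import concurrent.futures

# Worker processes instead of threads, sidestepping the GIL for CPU-bound work
with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
    for result in executor.map(load_url, tokens):
        print(result)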

If you want to do this entirely in the 2.7 stdlib, it's nearly as trivial with the thread and process pools built into multiprocessing. See Using a pool of workers and the Process Pools API, then see multiprocessing.dummy if you want to use threads instead of processes.
