Perl, how to fetch data from urls in parallel?


Question

I need to fetch some data from many web data providers, who do not expose any service, so I have to write something like this, using for example WWW::Mechanize:

use WWW::Mechanize;

my @urls = (
    'http://www.first.data.provider.com',
    'http://www.second.data.provider.com',
    'http://www.third.data.provider.com',
);
my %results;
foreach my $url (@urls) {
    my $mech = WWW::Mechanize->new();
    $mech->get($url);
    $mech->form_number(1);                                   # select the login form
    $mech->set_fields(user => 'myuser', pass => 'mypass');
    my $resp = $mech->submit();
    $results{$url} = parse($resp->content());                # parse() is my own routine
}
consume(%results);                                           # so is consume()

Is there some (possibly simple ;-) way to fetch data into a common %results variable, simultaneously, i.e. in parallel, from all the providers?

Answer

threads are to be avoided in Perl. use threads is mostly for emulating UNIX-style fork on Windows; beyond that, it's pointless.

(If you care, the implementation makes this fact very clear. In perl, the interpreter is a PerlInterpreter object. The way threads works is by making a bunch of threads, and then creating a brand-new PerlInterpreter object in each thread. Threads share absolutely nothing, even less than child processes do; fork gets you copy-on-write, but with threads, all the copying is done in Perl space! Slow!)

If you'd like to do many things concurrently in the same process, the way to do that in Perl is with an event loop, like EV, Event, or POE, or by using Coro. (You can also write your code in terms of the AnyEvent API, which will let you use any event loop. This is what I prefer.) The difference between the two is how you write your code.

AnyEvent (and EV, Event, POE, and so on) forces you to write your code in a callback-oriented style. Instead of control flowing from top to bottom, control is in a continuation-passing style. Functions don't return values, they call other functions with their results. This allows you to run many IO operations in parallel -- when a given IO operation has yielded results, your function to handle those results will be called. When another IO operation is complete, that function will be called. And so on.
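For a taste of what that style looks like, here's a minimal callback-style sketch using AnyEvent::HTTP (not the approach we'll take below; assume @urls holds your URL list):

use AnyEvent;
use AnyEvent::HTTP;

my $cv = AE::cv;                     # condvar: "done" once every begin() has a matching end()

for my $url (@urls) {
    $cv->begin;                      # one more outstanding request
    http_get $url, sub {
        my ($body, $headers) = @_;   # called later, when this response arrives
        print "$url: ", length($body), " bytes\n";
        $cv->end;                    # this request is finished
    };
}

$cv->recv;                           # run the event loop until all requests complete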

The disadvantage of this approach is that you have to rewrite your code. So there's a module called Coro that gives Perl real (user-space) threads that will let you write your code top-to-bottom, but still be non-blocking. (The disadvantage of this is that it heavily modifies Perl's internals. But it seems to work pretty well.)

So, since we don't want to rewrite WWW::Mechanize tonight, we're going to use Coro. Coro comes with a module called Coro::LWP that will make all calls to LWP be non-blocking. It will block the current thread ("coroutine", in Coro lingo), but it won't block any other threads. That means you can make a ton of requests all at once, and process the results as they become available. And Coro will scale better than your network connection; each coroutine uses just a few k of memory, so it's easy to have tens of thousands of them around.

With that in mind, let's see some code. Here's a program that starts three HTTP requests in parallel, and prints the length of each response. It's similar to what you're doing, minus the actual processing; but you can just put your code in where we calculate the length and it will work the same.

We'll start off with the usual Perl script boilerplate:

#!/usr/bin/env perl

use strict;
use warnings;
use feature 'say';   # the code below uses "say"

Then we'll load the Coro-specific modules:

use Coro;
use Coro::LWP;
use EV;

Coro uses an event loop behind the scenes; it will pick one for you if you want, but we'll just specify EV explicitly. It's the best event loop.

Then we'll load the modules we need for our work, which is just:

use WWW::Mechanize;

Now we're ready to write our program. First, we need a list of URLs:

my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
    'http://stackoverflow.com/',
);

Then we need a function to spawn a thread and do our work. To make a new thread with Coro, you call async, like async { body; of the thread; goes here }. This will create a thread, start it, and continue with the rest of the program.

sub start_thread($) {
    my $url = shift;
    return async {
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes
", length $mech->content;
    };
}

So here's the meat of our program. We just put our normal LWP program inside async, and it will be magically non-blocking. get blocks, but the other coroutines will run while waiting for it to get the data from the network.

Now we just need to start the threads:

start_thread $_ for @urls;

And finally, we want to start handling events:

EV::loop;

And that's it. When you run this, you'll see some output like:

Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes

As you can see, the requests are made in parallel, and you didn't have to resort to threads!

Update

You mentioned in your original post that you want to limit the number of HTTP requests that run in parallel. One way to do that is with a semaphore; in Coro, that's Coro::Semaphore.

A semaphore is like a counter. When you want to use the resource that a semaphore protects, you "down" the semaphore. This decrements the counter and continues running your program. But if the counter is at zero when you try to down the semaphore, your thread/coroutine will go to sleep until it is non-zero. When the count goes up again, your thread will wake up, down the semaphore, and continue. Finally, when you're done using the resource that the semaphore protects, you "up" the semaphore and give other threads the chance to run.

This lets you control access to a shared resource, like "making HTTP requests".
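In Coro terms, that pattern looks roughly like this (a minimal sketch; do_work() is a hypothetical stand-in for whatever the semaphore protects):

use Coro;
use Coro::Semaphore;

my $sem = Coro::Semaphore->new(2);   # counter starts at 2: two slots

for my $n (1 .. 5) {
    async {
        $sem->down;                  # take a slot; sleeps here while the counter is 0
        do_work($n);                 # hypothetical: use the protected resource
        $sem->up;                    # release the slot, waking a sleeper if any
    };
}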

All you need to do is create a semaphore that your HTTP request threads will share:

my $sem = Coro::Semaphore->new(5);

The 5 means "let us call 'down' 5 times before we block", or, in other words, "let there be 5 concurrent HTTP requests".

Before we add any code, let's talk about what can go wrong. Something bad that could happen is a thread "down"-ing the semaphore, but never "up"-ing it when it's done. Then nothing can ever use that resource, and your program will probably end up doing nothing. There are lots of ways this could happen. If you wrote some code like $sem->down; do something; $sem->up, you might feel safe, but what if "do something" throws an exception? Then the semaphore will be left down, and that's bad.

Fortunately, Perl makes it easy to have scope Guard objects that automatically run code when the variable holding the object goes out of scope. We can make that code be $sem->up, and then we'll never have to worry about holding a resource when we don't intend to.
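For example, with the Guard module (a sketch; risky_work() is hypothetical):

use Guard;

{
    $sem->down;
    scope_guard { $sem->up };   # runs when we leave this block, even if risky_work() dies
    risky_work();               # hypothetical operation that might throw
}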

Coro::Semaphore integrates the concept of guards, meaning you can say my $guard = $sem->guard, and that will automatically down the semaphore and up it when control flows away from the scope where you called guard.

With that in mind, all we have to do to limit the number of parallel requests is guard the semaphore at the top of our HTTP-using coroutines:

async {
    say "Waiting for semaphore";
    my $guard = $sem->guard;
    say "Starting";
    ...;
    return result;
}

To address the comments:

If you don't want your program to live forever, there are a few options. One is to run the event loop in another thread, and then join on each worker thread. This lets you pass results from the thread to the main program, too:

async { EV::loop };

# start all threads
my @running = map { start_thread $_ } @urls;

# wait for each one to return
my @results = map { $_->join } @running;

for my $result (@results) {
    say $result->[0], ': ', $result->[1];
}

Your threads can return results like:

sub start_thread($) {
    my $url = shift;
    return async {
        ...;
        return [$url, length $mech->content];
    };
}

This is one way to collect all your results in a data structure. If you don't want to return things, remember that all the coroutines share state. So you can put:

my %results;

at the top of your program, and have each coroutine update the results:

async {
    ...;
    $results{$url} = 'whatever';
};

When all the coroutines are done running, your hash will be filled with the results. You'll have to join each coroutine to know when the answer is ready, though.
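Putting that together, a minimal sketch (reusing start_thread and @urls from above, and your consume() from the question):

my @running = map { start_thread $_ } @urls;
$_->join for @running;   # block until every coroutine has finished

consume(%results);       # %results is now fully populated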

Finally, if you are doing this as part of a web service, you should use a coroutine-aware web server like Corona. This will run each HTTP request in a coroutine, allowing you to handle multiple HTTP requests in parallel, in addition to being able to send HTTP requests in parallel. This will make very good use of memory, CPU, and network resources, and will be pretty easy to maintain!

(You can basically cut-n-paste our program from above into the coroutine that handles the HTTP request; it's fine to create new coroutines and join inside a coroutine.)
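For reference, Corona is a Coro-based PSGI server for Plack; assuming a standard Plack setup with your app in app.psgi, you'd start it with:

plackup -s Corona app.psgi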
