如何使用Laravel Queue在S3上拦截新文件? [英] How to intercept a new file on S3 using Laravel Queues?

查看:124
本文介绍了如何使用Laravel Queue在S3上拦截新文件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个S3存储桶 mybucket ,当要将新文件复制到该存储桶时,我想执行一些操作.对于通知,我想使用SQS队列 notifiqueue ,因为我的目标是使用 Laravel

I have an S3 bucket, mybucket, and I want to execute something when a new file is copied into that bucket. For the notifications, I want to use an SQS queue, notifiqueue, because my goal is to access that queue with Laravel

由于我是在 CloudFormation 中创建我的基础架构的,因此资源的创建是这样的:

Since I am creating my infrastructure in CloudFormation, the resources are created like this:

NotificationQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 120
    QueueName: 'NotificationQueue'

DataGateBucket:
  Type: AWS::S3::Bucket
  Properties:
    AccessControl: BucketOwnerFullControl
    BucketName: 'mybucket'
    NotificationConfiguration:
      QueueConfigurations:
        - Event: 's3:ObjectCreated:*'
          Queue: !GetAtt NotificationQueue.Arn

每次将新文件保存到存储桶中时,S3都会在SQS中自动创建一个通知.

Each time a new file is saved on the bucket, S3 automatically creates a notification in SQS.

可悲的是,有效负载的格式与Laravel标准作业有效负载不兼容,如果我在 NotificationQueue 上运行辅助进程,则会出现此错误:

Sadly, the format of the payload is NOT COMPATIBLE with Laravel standard job payload, and if I run a worker process on the NotificationQueue I get this error:

local.ERROR: Undefined index: job {"exception":"[object] (ErrorException(code: 0): Undefined index: job at .../vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:273)

为了提供更完整的指示,这是我在通知中得到的内容(将JSON转换为PHP数组之后)

To provide a more complete indication, here is what I get in the notification (after turning JSON into a PHP array)

array:1 [
  "Records" => array:1 [
    0 => array:9 [
      "eventVersion" => "2.1"
      "eventSource" => "aws:s3"
      "awsRegion" => "eu-central-1"
      "eventTime" => "2019-04-23T17:02:41.308Z"
      "eventName" => "ObjectCreated:Put"
      "userIdentity" => array:1 [
        "principalId" => "AWS:XXXXXXXXXXXXXXXXXX"
      ]
      "requestParameters" => array:1 [
        "sourceIPAddress" => "217.64.198.7"
      ]
      "responseElements" => array:2 [
        "x-amz-request-id" => "602CE18B8DE0BE5C"
        "x-amz-id-2" => "wA/A3Jl2XpoxBWJEgQzy11s6O28Cz9Wc6pVi6Ho1vnIrOjqsWkGozlUmqRdpYAfub0MqdF8d/YI="
      ]
      "s3" => array:4 [
        "s3SchemaVersion" => "1.0"
        "configurationId" => "0d4eaa75-5730-495e-b6d4-368bf3690f30"
        "bucket" => array:3 [
          "name" => "mybucket"
          "ownerIdentity" => array:1 [
            "principalId" => "XXXXXXXXXXXXXXXXXX"
          ]
          "arn" => "arn:aws:s3:::mybucket"
        ]
        "object" => array:4 [
          "key" => "dirName/myFile.txt"
          "size" => 1991721
          "eTag" => "824a20edad0091027b5d0fa6d78bb24f"
          "sequencer" => "005CBF452E30AAC02A"
        ]
      ]
    ]
  ]
]

哪种是使用Laravel访问通知的最佳/正确/正确的方法,以便我可以触发其他一些选项来响应文件上传?

Which is the working / best / right way to access the notification using Laravel so that I can trigger some other option in response to the file upload ?

推荐答案

我找到了一种获得所需行为的方法,但是我不确定这是最好的方法,因此我将其发布在这里,也许可以给我反馈

I found a way to obtain the desired behavior, but I am not sure this is the best way, so I post it here and maybe can give me a feedback.

当我们谈论Laravel队列时,很多配置来自 app.php ,特别是来自 Provider 部分.我设法添加了需要覆盖原始 QueueServiceProvider 类并替换它的行为:

When we talk about Laravel Queues, a lot of configuration comes from app.php, and in particular from the Provider section. I managed to add the behavior that I needed overriding Original QueueServiceProvider class and substituting it:

// Here is the original Provider Class
//Illuminate\Queue\QueueServiceProvider::class,
// Here is the overridden Provider
\App\Providers\QueueServiceProvider::class, 

新的 QueueServiceProvider 类如下:

<?php

namespace App\Providers;

use App\Jobs\SqsNotifications\SqsConnector;

class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
{

    /**
     * Register the Amazon SQS queue connector.
     *
     * @param  \Illuminate\Queue\QueueManager  $manager
     * @return void
     */
    protected function registerSqsNotifConnector($manager)
    {
        $manager->addConnector('sqsNotif', function () {
            return new SqsConnector();
        });
    }


    public function registerConnectors($manager){
        parent::registerConnectors($manager);

        // Add the custom SQS notification connector
        $this->registerSqsNotifConnector($manager);
    }
}

注意新的连接器 sqsNotif ,它将需要添加到 queue.php

Notice the new connector sqsNotif, that will need to be added to the queue.php

 'sqsNotif' => [
        'driver' => 'sqsNotif',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
        'queue' => env('SQS_QUEUE', 'your-queue-name'),
        'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
],

在新的 QueueServiceProvider 中,我们只注册了一个额外的连接器,其代码为:

In the new QueueServiceProvider we just register an extra connector, which code is:

<?php

namespace App\Jobs\SqsNotifications;

use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;

class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
{

    /**
     * Establish a queue connection.
     *
     * @param  array  $config
     * @return \Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
         $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

        return new SqsQueue(
            new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
        );
    }
}

也以这种方式重新定义了SqsQueue:

The SqsQueue is redefined too, in this way:

<?php

namespace App\Jobs\SqsNotifications;

class SqsQueue extends \Illuminate\Queue\SqsQueue
{
   /**
    * Pop the next job off of the queue.
    *
    * @param  string  $queue
    * @return \Illuminate\Contracts\Queue\Job|null
    */
    public function pop($queue = null)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue = $this->getQueue($queue),
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
            return new SqsJob(
                $this->container, $this->sqs, $response['Messages'][0],
                $this->connectionName, $queue
            );
        }
    }
}

最后一个丢失的部分是SqsJob,其定义如下:

And the last missing piece is SqsJob, defined like this:

<?php

namespace App\Jobs\SqsNotifications;

use Illuminate\Queue\Jobs\JobName;

/**
 * Class SqsJob
 * @package App\Jobs\SqsNotifications
 *
 * Alternate SQS job that is used in case of S3 notifications
 */
class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
{

    /**
     * Get the name of the queued job class.
     *
     * @return string
     */
    public function getName()
    {

        $bucketName = '';

        // Define the name of the Process based on the bucket name
        switch($this->payload()['Records'][0]['s3']['bucket']['name']){
            case 'mybucket':
                $bucketName = 'NewMyBucketFileJob';
                break;
        }

        return $bucketName;
    }

   /**
    * Fire the job.
    *
    * @return void
    */
    public function fire()
    {
        // Mimic the original behavior with a different payload
        $payload = $this->payload();
        [$class, $method] = JobName::parse('\App\Jobs\\' . $this->getName() . '@handle');
        ($this->instance = $this->resolve($class))->{$method}($payload);

        // The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
        $this->delete();
    }
}

这时,我只需要在 NewMyBucketFileJob 中定义处理作业,例如下面的作业:

At this point, I just need to define the processing Job, for example like the one below, in a NewMyBucketFileJob:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessDataGateNewFile implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle($data)
    {        
        // Print the whole data structure
        print_r($data);
        // Or just the name of the uploaded file
        print_r($data['Records'][0]['s3']['object']['key']);
    }
}

此过程有效,因此这是一个解决方案,但是涉及许多类扩展,并且如果将来的发行版中内部队列实现将被更改,则它非常脆弱.老实说,我想知道是否有更简单或更强大的功能

This process works, so this is a solution, but involves a lot of class extensions, and it's quite fragile in case the internal queue implementation will be changed in the future releases. I am honestly wondering if there is something easier or more robust

这篇关于如何使用Laravel Queue在S3上拦截新文件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆