AWS GetActivityTask belonging to the same state machine execution

Question

Thanks in advance for your time and response.

I have an AWS Step Functions state machine with the following activities:

  1. Pull the first available data file from an external FTP server
  2. Process the data (processing time may vary)
  3. Upload the processed data to another FTP server

I have a Java application running on an EC2 instance. It has 3 threads and polls the activities using code like that shown below, invoking the appropriate worker to do the actual work for steps 1, 2 and 3. The important point is that all 3 activities must run on the same server, because the steps write to and read from a file location on that server.

I have hundreds of files to process on the FTP server, so I have 5 EC2 servers running copies of the Java application.

Now I start 5 executions of the state machine. This allows me to distribute the file processing across the 5 servers.

However, my question is this: how can I ensure that the activities from a given state machine execution are handled by the same EC2 instance?

I don't want a given execution's activities to be handled by different EC2 instances. In the code below (from https://github.com/goosefraba/aws-step-function-activity-example/blob/master/src/main/java/at/goosefraba/ActivityProcessor.java), I don't see any way to call getActivityTask for only the tasks belonging to a particular execution.

    final ClientConfiguration clientConfiguration = new ClientConfiguration();
    // GetActivityTask long-polls for up to 60 seconds, so the socket timeout must be longer.
    clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));

    final AWSStepFunctions client = AWSStepFunctionsClientBuilder
            .standard()
            .withClientConfiguration(clientConfiguration)
            .build();

    while (true) {
        // Returns the next scheduled task for this activity ARN, from any execution.
        GetActivityTaskResult getActivityTaskResult =
                client.getActivityTask(
                        new GetActivityTaskRequest().withActivityArn(getArn()));
        // A null task token means the poll timed out without a task.
        if (getActivityTaskResult.getTaskToken() != null) {
            // Do work
        }
    }

Recommended answer

I consulted AWS technical support but didn't get an answer. I finally solved the problem with the following approach; hopefully it is useful to others.

  1. Start the state machine execution by supplying a unique JobId (which could be the name of the file on the FTP server, assuming it is unique).

{ "JobId": "MyUniqueJobId" }

The Java code is:

        final AWSStepFunctions client = AWSStepFunctionsClientBuilder
                .standard()
                .build();
        StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
        // StartReportGenerator execution
        startExecutionRequest.setStateMachineArn("arn:aws:states:us-east-1:xxxxxxx:stateMachine:poc");
        String uuid = UUID.randomUUID().toString();
        startExecutionRequest.setName(uuid);
        String inputJson = "{\"JobId\":\"MyUniqueJobId\"}";
        startExecutionRequest.setInput(inputJson);
        client.startExecution(startExecutionRequest);
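The snippet above hard-codes the JobId. Since the answer suggests using the file name as the JobId, here is a minimal sketch of building the input JSON from an arbitrary file name. `ExecutionInput` and `forJob` are hypothetical names introduced for illustration; they are not part of the AWS SDK, and the escaping covers only what a JSON string literal requires.

```java
/** Builds the execution input for a job; a sketch, not part of the AWS SDK. */
final class ExecutionInput {
    private ExecutionInput() {}

    /** Returns a JSON object with a single "JobId" field, escaping the value as JSON requires. */
    static String forJob(String jobId) {
        StringBuilder sb = new StringBuilder("{\"JobId\":\"");
        for (int i = 0; i < jobId.length(); i++) {
            char c = jobId.charAt(i);
            if (c == '"' || c == '\\') {
                // Quotes and backslashes must be backslash-escaped inside a JSON string.
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                // Control characters are written as four-digit hex escapes.
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append("\"}").toString();
    }
}
```

With this helper, the input line would become `startExecutionRequest.setInput(ExecutionInput.forJob(fileName));`, where `fileName` is whatever the worker read from the FTP server.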

  2. Upon retrieving an activity task, read its input JSON and check whether the JobId equals the one I set when starting the state machine. If it does, I have picked up a task belonging to the execution that I started. Otherwise, send a failure for the task with the error "Retry".

    GetActivityTaskResult getActivityTaskResult =
            client.getActivityTask(
                    new GetActivityTaskRequest().withActivityArn("activityARN"));

    if (getActivityTaskResult.getTaskToken() != null) {
        log.info("Kicking off {} activity ...", getProcessName());
        String errorMessage = "";
        // The task input carries the JobId supplied when the execution was started.
        final JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
        String jobId = json.get("JobId").textValue();
        if (!jobId.equals("MyUniqueJobId")) {
            // Not this server's job: fail the task so the state machine schedules it again.
            log.error("Looking to retrieve JobId MyUniqueJobId, but found " + jobId + " instead; retrying");
            errorMessage = "Retry";
            client.sendTaskFailure(
                    new SendTaskFailureRequest()
                            .withError(errorMessage)
                            .withTaskToken(getActivityTaskResult.getTaskToken()));
        }
    }
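The check above only shows the rejection path and compares against a single hard-coded JobId. A minimal sketch of the full decision, assuming each server keeps a set of the JobIds it started, might look like this. `TaskReporter`, `JobAffinityWorker` and `RecordingReporter` are hypothetical names; `TaskReporter` stands in for the SDK's sendTaskSuccess and sendTaskFailure calls so that the sketch runs without AWS access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for sendTaskSuccess / sendTaskFailure. */
interface TaskReporter {
    void success(String taskToken, String outputJson);
    void failure(String taskToken, String error);
}

/** Decides, per task, whether this server owns the job or must hand the task back. */
final class JobAffinityWorker {
    // Must match the ErrorEquals value that the state machine catches.
    static final String RETRY_ERROR = "Retry";

    private final Set<String> ownedJobIds;
    private final TaskReporter reporter;

    JobAffinityWorker(Set<String> ownedJobIds, TaskReporter reporter) {
        this.ownedJobIds = ownedJobIds;
        this.reporter = reporter;
    }

    /** Returns true if the task was processed here, false if it was rejected with "Retry". */
    boolean handle(String taskToken, String jobId) {
        if (!ownedJobIds.contains(jobId)) {
            reporter.failure(taskToken, RETRY_ERROR);
            return false;
        }
        // The real work (download, process, upload) would go here.
        reporter.success(taskToken, "{\"status\":\"done\"}");
        return true;
    }
}

/** Test double that records what would have been sent to Step Functions. */
final class RecordingReporter implements TaskReporter {
    final List<String> events = new ArrayList<>();

    @Override
    public void success(String taskToken, String outputJson) {
        events.add("success:" + taskToken);
    }

    @Override
    public void failure(String taskToken, String error) {
        events.add("failure:" + taskToken + ":" + error);
    }
}
```

In the real worker, `handle` would be called with the task token and the JobId parsed from `getActivityTaskResult.getInput()`, and the reporter would wrap the Step Functions client.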

  • The state machine JSON is constructed so that, if an activity fails with the error "Retry", the same activity is attempted again.

    {
      "Comment": "State machine",
      "StartAt": "RunActivityOne",
      "States": {
        "RunActivityOne": {
          "Type": "Task",
          "TimeoutSeconds": 600,
          "ResultPath": "$.Result",
          "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityOne",
          "Catch": [
            {
              "ErrorEquals": [
                "Retry"
              ],
              "ResultPath": "$.Result",
              "Next": "RunActivityOne"
            },
            {
              "ErrorEquals": [
                "States.TaskFailed",
                "States.Timeout"
              ],
              "Next": "RunActivityOneFailure"
            }
          ],
          "Next": "RunActivityTwo"
        },
        "RunActivityOneFailure": {
          "Type": "Fail",
          "Cause": "RunActivityOneFailure",
          "Error": "RunActivityOneFailure"
        },
        "RunActivityTwo": {
          "Type": "Task",
          "TimeoutSeconds": 600,
          "ResultPath": "$.Result",
          "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityTwo",
          "Catch": [
            {
              "ErrorEquals": [
                "Retry"
              ],
              "ResultPath": "$.Result",
              "Next": "RunActivityTwo"
            },
            {
              "ErrorEquals": [
                "States.TaskFailed",
                "States.Timeout"
              ],
              "Next": "RunActivityTwoFailure"
            }
          ],
          "End": true
        },
        "RunActivityTwoFailure": {
          "Type": "Fail",
          "Cause": "RunActivityTwoFailure",
          "Error": "RunActivityTwoFailure"
        }
      }
    }

    This way, I ultimately process only the activities that belong to the execution that I started. The cons of this approach are:

    • We cannot say how many attempts it will take to eventually pick up the right activity.
    • AWS charges based on the number of state transitions, so every retry adds cost.
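To get a rough feel for both cons, here is a back-of-the-envelope sketch. It rests on two assumptions that are not from the original answer and are not documented behavior: each attempt lands on a uniformly random server, and each rejected attempt costs exactly one extra state transition. `RetryCostEstimate` and its methods are hypothetical names.

```java
/** Rough estimate of retry overhead under a uniform-dispatch assumption. */
final class RetryCostEstimate {
    private RetryCostEstimate() {}

    /**
     * Expected number of attempts until the owning server receives the task.
     * Assumes each attempt reaches the owner with probability 1/servers, so the
     * count follows a geometric distribution whose mean is the number of servers.
     */
    static double expectedAttempts(int servers) {
        if (servers < 1) {
            throw new IllegalArgumentException("servers must be >= 1");
        }
        return servers;
    }

    /** Expected extra transitions per execution, assuming one per rejected attempt. */
    static double expectedExtraTransitions(int servers, int activitiesPerExecution) {
        return (expectedAttempts(servers) - 1) * activitiesPerExecution;
    }
}
```

For the setup in the question (5 servers, 3 activities per execution), this gives an average of 5 attempts per activity and 12 extra transitions per execution. The actual numbers depend on how tasks reach pollers in practice and can be much worse in an unlucky run, since there is no upper bound on the number of retries.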
