AWS GetActivityTask belonging to the same state machine execution
Problem description
Thanks in advance for your time and response.
I have an AWS state machine with the following activities:
- Pull the first available data file from an external FTP server
- Process the data (processing time may vary)
- Upload the processed data to another FTP server
I have a Java application running on an EC2 instance; it has 3 threads and polls the activities using code like that shown below. The Java application invokes the appropriate workers to do the actual work for steps 1, 2, and 3. The important point is that all 3 activities must run on the same server, because the steps write to and read from a file location on that server.
I have hundreds of files to process on the FTP server, so I have 5 EC2 servers running copies of the Java application.
Now I start 5 executions of the state machine, which lets me distribute the file processing across the 5 servers.
However, my question is this:
How can I ensure that activities from a given state machine execution are handled by the SAME EC2 instance?
I don't want a given execution's activities to be handled by different EC2 instances. In the code below (from https://github.com/goosefraba/aws-step-function-activity-example/blob/master/src/main/java/at/goosefraba/ActivityProcessor.java), I don't see any way to call getActivityTask for a task belonging to a particular execution.
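import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
import java.util.concurrent.TimeUnit;

// GetActivityTask long-polls for up to 60 seconds, so the socket timeout must be longer
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));
final AWSStepFunctions client = AWSStepFunctionsClientBuilder
        .standard()
        .withClientConfiguration(clientConfiguration)
        .build();
while (true) {
    // Returns the next task for this activity ARN, from ANY running execution
    GetActivityTaskResult getActivityTaskResult =
            client.getActivityTask(
                    new GetActivityTaskRequest().withActivityArn(getArn()));
    if (getActivityTaskResult.getTaskToken() != null) {
        // Do work
    }
}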
Recommended answer
I consulted AWS technical support but didn't get an answer. I finally solved the problem with the following approach; hopefully it is useful to others.
- Start the state machine execution by supplying a unique JobId (this could be the name of the file on the FTP server, assuming it is unique), for example:
{ "JobId": "MyUniqueJobId" }
The Java code is:
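import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
import java.util.UUID;

final AWSStepFunctions client = AWSStepFunctionsClientBuilder
        .standard()
        .build();
StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
startExecutionRequest.setStateMachineArn("arn:aws:states:us-east-1:xxxxxxx:stateMachine:poc");
// Execution names must be unique per state machine, so use a random UUID
String uuid = UUID.randomUUID().toString();
startExecutionRequest.setName(uuid);
// The JobId in the input is what the activity workers later compare against
String inputJson = "{\"JobId\":\"MyUniqueJobId\"}";
startExecutionRequest.setInput(inputJson);
client.startExecution(startExecutionRequest);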
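The hard-coded input string above works for a fixed id, but if the JobId is an FTP file name it may contain characters such as a double quote or a backslash, which must be escaped inside a JSON string. A minimal sketch follows; ExecutionInput and forJobId are hypothetical names, and serializing a map with Jackson's ObjectMapper would do the same job.

```java
// Hypothetical helper (not part of the AWS SDK): builds the execution input
// {"JobId":"..."} for a job id such as an FTP file name, escaping characters
// that JSON does not allow raw inside a string literal.
final class ExecutionInput {
    private ExecutionInput() {}

    static String forJobId(String jobId) {
        return "{\"JobId\":\"" + escape(jobId) + "\"}";
    }

    // Minimal JSON string escaping: double quote, backslash, and control characters.
    static String escape(String s) {
        StringBuilder out = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters use the six-character hex escape form
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
}
```

With it, the request above would use startExecutionRequest.setInput(ExecutionInput.forJobId(fileName)), where fileName is a hypothetical variable holding the file being processed.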
- Upon retrieving an activity task, read its input JSON and check whether the JobId equals the one I set when starting the state machine. If it does, I have picked up the activity belonging to the execution I started. Otherwise, send a task failure with the error "Retry".
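import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
import com.amazonaws.services.stepfunctions.model.SendTaskFailureRequest;
import com.amazonaws.util.json.Jackson;
import com.fasterxml.jackson.databind.JsonNode;

GetActivityTaskResult getActivityTaskResult =
        client.getActivityTask(
                new GetActivityTaskRequest().withActivityArn("activityARN")); // your activity ARN
if (getActivityTaskResult.getTaskToken() != null) {
    log.info("Kicking off {} activity ...", getProcessName());
    final JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
    final JsonNode jobIdNode = json.get("JobId");
    final String jobId = jobIdNode == null ? null : jobIdNode.textValue();
    if (!"MyUniqueJobId".equals(jobId)) {
        log.error("Looking to retrieve JobId MyUniqueJobId, but found {} instead; retrying", jobId);
        // Hand the task back: the state machine's Catch on "Retry" re-enters the same state
        client.sendTaskFailure(
                new SendTaskFailureRequest()
                        .withError("Retry")
                        .withTaskToken(getActivityTaskResult.getTaskToken()));
    } else {
        // The task belongs to the execution this server started: do the work, then sendTaskSuccess
    }
}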
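The check above compares against a single hard-coded JobId. If one server starts several executions (for example one per polling thread), it could instead keep the set of JobIds it started and fail every other task with "Retry". A minimal sketch, assuming a hypothetical LocalJobs registry; the Step Functions calls themselves stay as in the snippet above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-server registry of the JobIds this server started executions for.
// Thread-safe, so all polling threads can share one instance.
final class LocalJobs {
    // Error name that the state machine's Catch routes back into the same Task state
    static final String RETRY_ERROR = "Retry";

    private final Set<String> owned = ConcurrentHashMap.newKeySet();

    // Call before startExecution so the first activity task is already recognised
    void register(String jobId) {
        owned.add(jobId);
    }

    // Call once the last activity for this job has reported success
    void complete(String jobId) {
        owned.remove(jobId);
    }

    // True: process the task. False: send a task failure with RETRY_ERROR.
    // A null check is needed because ConcurrentHashMap key sets reject null lookups.
    boolean owns(String jobId) {
        return jobId != null && owned.contains(jobId);
    }
}
```

In the worker, the equality test becomes if (!localJobs.owns(jobId)), with the error taken from LocalJobs.RETRY_ERROR.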
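The state machine JSON is constructed so that if an activity fails with the error "Retry", the same activity is attempted again. The "Retry" catcher is listed before the States.TaskFailed one because catchers are evaluated in order and States.TaskFailed would otherwise match the "Retry" failure as well.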
{
  "Comment": "State machine",
  "StartAt": "RunActivityOne",
  "States": {
    "RunActivityOne": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityOne",
      "Catch": [
        {
          "ErrorEquals": ["Retry"],
          "ResultPath": "$.Result",
          "Next": "RunActivityOne"
        },
        {
          "ErrorEquals": ["States.TaskFailed", "States.Timeout"],
          "Next": "RunActivityOneFailure"
        }
      ],
      "Next": "RunActivityTwo"
    },
    "RunActivityOneFailure": {
      "Type": "Fail",
      "Cause": "RunActivityOneFailure",
      "Error": "RunActivityOneFailure"
    },
    "RunActivityTwo": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityTwo",
      "Catch": [
        {
          "ErrorEquals": ["Retry"],
          "ResultPath": "$.Result",
          "Next": "RunActivityTwo"
        },
        {
          "ErrorEquals": ["States.TaskFailed", "States.Timeout"],
          "Next": "RunActivityTwoFailure"
        }
      ],
      "End": true
    },
    "RunActivityTwoFailure": {
      "Type": "Fail",
      "Cause": "RunActivityTwoFailure",
      "Error": "RunActivityTwoFailure"
    }
  }
}
This way, I ultimately process only the activities that belong to the execution I started. The drawbacks of this approach are:
- We cannot predict how many attempts it will take to pick up the right activity.
- AWS charges based on the number of state transitions, and every "Retry" loop back into a Task state adds transitions.
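To get a feel for the first drawback, here is a rough Monte Carlo model. It assumes, as a simplification rather than documented Step Functions behavior, that on every attempt the task goes to one of the polling servers chosen uniformly at random. Under that assumption the number of attempts is geometric with a mean equal to the number of servers. RetryCostModel is a hypothetical name.

```java
import java.util.Random;

// Simplified model (an assumption, not how Step Functions actually dispatches):
// each attempt hands the task to one of `servers` pollers chosen uniformly at random.
final class RetryCostModel {
    private RetryCostModel() {}

    // Number of attempts until the owning server (index 0) receives the task.
    static int attemptsUntilOwner(int servers, Random rng) {
        if (servers < 1) {
            throw new IllegalArgumentException("servers must be >= 1");
        }
        int attempts = 1;
        while (rng.nextInt(servers) != 0) {
            attempts++; // a non-owner picked it up and failed it with "Retry"
        }
        return attempts;
    }

    // Average attempts over many simulated tasks, using a fixed seed for repeatability.
    static double meanAttempts(int servers, int trials, long seed) {
        Random rng = new Random(seed);
        long total = 0;
        for (int i = 0; i < trials; i++) {
            total += attemptsUntilOwner(servers, rng);
        }
        return (double) total / trials;
    }
}
```

With 5 servers, meanAttempts(5, 200000, 42) comes out close to 5, i.e. about four wasted "Retry" round trips per activity task, each adding state transitions. The model ignores that an owning server whose threads are all busy cannot poll at all, so real counts can be higher.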