Iterate over the list of activities based on the input provided using AWS SWF

Problem description

I have two servers (EC2 instances). Server 1 runs 5 batches and server 2 runs 6. I wrapped each batch in an activity; the workflow implementation class is given below. I want to iterate over all the activities (on both server 1 and server 2) based on a given execution date. For example, if the execution date is earlier than the current date, execute all the activities of both servers for each day from the given date up to the current date. If the execution date equals the current date, execute all the activities of both servers for the current date only. Also, if any activity for a day throws an exception, don't execute the activities for the next day (<= current date).

 public class JobWorkflowImpl implements JobWorkflow{

   private DecisionContextProvider contextProvider
     = new DecisionContextProviderImpl();

   private WorkflowClock clock
     = contextProvider.getDecisionContext().getWorkflowClock();

    private BS1JobActivitiesClient bs1ActivitiesClient 
     = new BS1JobActivitiesClientImpl();
    private BS2JobActivitiesClient bs2ActivitiesClient 
     = new BS2JobActivitiesClientImpl();

    @Override
    public void executeJob(Date exedate) {
       Date today = new Date(clock.currentTimeMillis());
       // Iterate on a Calendar copy so the caller's Date is not mutated.
       java.util.Calendar toProcess = java.util.Calendar.getInstance();
       toProcess.setTime(exedate);
       Promise<Void> previousDateDone = null;
       while (!toProcess.getTime().after(today)) {
          // Create chain of executeJobForExactDate 
          // linked by previousDateDone to ensure that they are executed sequentially.
          // null Promise is treated as ready promise by the framework.
          previousDateDone = executeJobForExactDate(toProcess.getTime(), previousDateDone);
          toProcess.add(java.util.Calendar.DAY_OF_MONTH, 1);
       }
    }

    Promise<Void> executeJobForExactDate(final Date date, Promise<Void> previous) {

       // final so the anonymous TryCatchFinally classes below can capture them
       final Settable<Integer> firstServerDone = new Settable<Integer>();
       final Settable<Integer> secondServerDone = new Settable<Integer>();

       final Settable<Integer> resultODLSLBs1 = new Settable<Integer>();

       //TODO Iterate over the activities
        new TryCatchFinally(previous){

            @Override
            protected void doTry(){
                Promise<Integer> resultFARBs1 = bs1ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs1 = bs1ActivitiesClient.executecommand2(date, resultFARBs1);
                Promise<Integer> resultLLPBs1 = bs1ActivitiesClient.executecommand3(date, resultUAMLFBs1);
                Promise<Integer> resultODLDLBs1 = bs1ActivitiesClient.executecommand4(date, resultLLPBs1);
                // Chain links the result of the activity execution 
                // to an already existing Settable.
                resultODLSLBs1.chain(bs1ActivitiesClient.executecommand5(date, resultODLDLBs1));
            }

            @Override
            protected void doCatch(Throwable e){
               throw new MyException("Failed");
            }

            @Override
            protected void doFinally() throws Throwable {
                firstServerDone.set(null);
            }
        };

        new TryCatchFinally(previous){

            @Override
            protected void doTry()  {
                Promise<Integer> resultFARBs2 = bs2ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs2 = bs2ActivitiesClient.executecommand2(date, resultFARBs2);
                Promise<Integer> resultLLPBs2 = bs2ActivitiesClient.executecommand3(date, resultUAMLFBs2);
                Promise<Integer> resultODLDLBs2 = bs2ActivitiesClient.executecommand4(date, resultLLPBs2);
                Promise<Integer> resultODLSLBs2 = bs2ActivitiesClient.executecommand5(date, resultODLDLBs2, resultODLSLBs1);
                bs2ActivitiesClient.executecommand6(date, resultODLSLBs2);
            }

            @Override
            protected void doCatch(Throwable e){
                throw new MyException("Failed");
            }

            @Override
            protected void doFinally(){
                secondServerDone.set(null);
            }

        };
        // AndPromise is done when all of its constructor parameters are done.
        // I decided to consider the date processing done when both 
        // TryCatchFinallies are done. You can implement more complex logic depending on
        // the business requirements. One option is to wrap both TryCatchFinallies 
        // in another TryCatchFinally.
        return new AndPromise(firstServerDone, secondServerDone);
    }
}

The problem is that if any activity on server 1 throws an exception, all the activities that have not yet started are cancelled on both server 1 and server 2. I want only the not-yet-executed activities of the failing server to be cancelled, because one of its own activities failed; the other server should continue as far as possible (i.e. up to the point where it depends on the failed server).

Recommended answer

The tricky part is that the Flow Framework requires you to use the clock it provides through the decision context. Workflow code is replayed against the recorded history on every decision, so reading the wall clock directly would make the replay nondeterministic and the workflow would not function correctly. The rest is just asynchronous Java programming.

public class JobWorkflowImpl implements JobWorkflow{

   private DecisionContextProvider contextProvider
     = new DecisionContextProviderImpl();

   private WorkflowClock clock
     = contextProvider.getDecisionContext().getWorkflowClock();

    private BS1JobActivitiesClient bs1ActivitiesClient 
     = new BS1JobActivitiesClientImpl();
    private BS2JobActivitiesClient bs2ActivitiesClient 
     = new BS2JobActivitiesClientImpl();

    @Override
    public void executeJob(Date exedate) {
       Date today = new Date(clock.currentTimeMillis());
       // Iterate on a Calendar copy so the caller's Date is not mutated.
       java.util.Calendar toProcess = java.util.Calendar.getInstance();
       toProcess.setTime(exedate);
       Promise<Void> previousDateDone = null;
       while (!toProcess.getTime().after(today)) {
          // Create chain of executeJobForExactDate 
          // linked by previousDateDone to ensure that they are executed sequentially.
          // null Promise is treated as ready promise by the framework.
          previousDateDone = executeJobForExactDate(toProcess.getTime(), previousDateDone);
          toProcess.add(java.util.Calendar.DAY_OF_MONTH, 1);
       }
    }

    Promise<Void> executeJobForExactDate(final Date date, Promise<Void> previous) {

       // final so the anonymous TryCatchFinally classes below can capture them
       final Settable<Integer> firstServerDone = new Settable<Integer>();
       final Settable<Integer> secondServerDone = new Settable<Integer>();

       final Settable<Integer> resultODLSLBs1 = new Settable<Integer>();

       //TODO Iterate over the activities
        new TryCatchFinally(previous){

            @Override
            protected void doTry(){
                Promise<Integer> resultFARBs1 = bs1ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs1 = bs1ActivitiesClient.executecommand2(date, resultFARBs1);
                Promise<Integer> resultLLPBs1 = bs1ActivitiesClient.executecommand3(date, resultUAMLFBs1);
                Promise<Integer> resultODLDLBs1 = bs1ActivitiesClient.executecommand4(date, resultLLPBs1);
                // Chain links the result of the activity execution 
                // to an already existing Settable.
                resultODLSLBs1.chain(bs1ActivitiesClient.executecommand5(date, resultODLDLBs1));
            }

            @Override
            protected void doCatch(Throwable e){
                System.out.println("Failed to execute BS1 daily job");
            }

            @Override
            protected void doFinally() throws Throwable {
                firstServerDone.set(null);
            }
        };

        new TryCatchFinally(previous){

            @Override
            protected void doTry()  {
                Promise<Integer> resultFARBs2 = bs2ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs2 = bs2ActivitiesClient.executecommand2(date, resultFARBs2);
                Promise<Integer> resultLLPBs2 = bs2ActivitiesClient.executecommand3(date, resultUAMLFBs2);
                Promise<Integer> resultODLDLBs2 = bs2ActivitiesClient.executecommand4(date, resultLLPBs2);
                Promise<Integer> resultODLSLBs2 = bs2ActivitiesClient.executecommand5(date, resultODLDLBs2, resultODLSLBs1);
                bs2ActivitiesClient.executecommand6(date, resultODLSLBs2);
            }

            @Override
            protected void doCatch(Throwable e){
                System.out.println("Failed to execute BS2 daily job");
            }

            @Override
            protected void doFinally(){
                secondServerDone.set(null);
            }

        };
        // AndPromise is done when all of its constructor parameters are done.
        // I decided to consider the date processing done when both 
        // TryCatchFinallies are done. You can implement more complex logic depending on
        // the business requirements. One option is to wrap both TryCatchFinallies 
        // in another TryCatchFinally.
        return new AndPromise(firstServerDone, secondServerDone);
    }
}
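
The day-by-day loop in executeJob can be checked in isolation from SWF. Below is a minimal sketch in plain Java, assuming java.util.Calendar with the JVM's default time zone; the class DateRangeSketch and the method daysThrough are hypothetical names used only for illustration and are not part of the Flow Framework. It returns every day from the execution date through "now" inclusive, and nothing when the execution date is in the future.

```java
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

// Hypothetical helper for illustration; not part of the AWS Flow Framework.
final class DateRangeSketch {

    // Returns one Date per day, from start through now (inclusive).
    // The list is empty when start is after now.
    static List<Date> daysThrough(Date start, Date now) {
        List<Date> days = new ArrayList<Date>();
        // Assumption: the JVM default time zone is the one the batches use.
        Calendar cursor = Calendar.getInstance();
        cursor.setTime(start);
        while (!cursor.getTime().after(now)) {
            days.add(cursor.getTime());           // getTime() returns a fresh Date
            cursor.add(Calendar.DAY_OF_MONTH, 1); // advance one calendar day
        }
        return days;
    }

    public static void main(String[] args) {
        Calendar c = Calendar.getInstance();
        c.clear();
        c.set(2020, Calendar.JANUARY, 30);
        Date start = c.getTime();
        c.set(2020, Calendar.FEBRUARY, 2);
        // Stand-in value; inside a workflow "now" must come from the WorkflowClock.
        Date now = c.getTime();
        for (Date day : daysThrough(start, now)) {
            System.out.println(day);
        }
    }
}
```

Inside the workflow, each returned date would be passed to executeJobForExactDate together with the promise of the previous day, and "now" would be built from clock.currentTimeMillis() rather than from the system clock.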
