Hazelcast ScheduledExecutorService lost tasks after node shutdown

Problem description

I'm trying to use the Hazelcast ScheduledExecutorService to execute some periodic tasks. I'm using Hazelcast 3.8.1.

I start one node and then the other, and the tasks are distributed between both nodes and properly executed.

If I shut down the first node, the second one starts executing the periodic tasks that were previously on the first node.

The problem is that if I stop the second node instead of the first, its tasks are not rescheduled to the first one. This happens even with more nodes: if I shut down the last node that received tasks, those tasks are lost.

Shutdown is always done with Ctrl+C.
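
The failover the question expects can be pictured with a deliberately simplified toy model, not Hazelcast's implementation: each named task hashes to a partition, each partition has an owning member, and when a member leaves, only its partitions are handed to the survivors, which should then run its tasks. The class and method names are hypothetical; 271 is Hazelcast's default partition count, while real ownership assignment and backup promotion are more involved than the round-robin used here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, NOT Hazelcast's code) of partition-based task
// placement and of failover when a member leaves the cluster.
public class ToyPartitionFailover {

    // Hazelcast's default partition count; everything else is simplified.
    static final int PARTITION_COUNT = 271;

    // Map a task name to a partition id.
    static int partitionOf(String taskName) {
        return Math.floorMod(taskName.hashCode(), PARTITION_COUNT);
    }

    // Initial ownership: spread partitions round-robin over the members.
    static Map<Integer, String> assign(List<String> members) {
        Map<Integer, String> owners = new HashMap<>();
        for (int p = 0; p < PARTITION_COUNT; p++) {
            owners.put(p, members.get(p % members.size()));
        }
        return owners;
    }

    // A member leaves: only its partitions move, round-robin over the survivors.
    static Map<Integer, String> removeMember(Map<Integer, String> owners, String leaving,
                                             List<String> survivors) {
        Map<Integer, String> updated = new HashMap<>(owners);
        int next = 0;
        for (int p = 0; p < PARTITION_COUNT; p++) {
            if (leaving.equals(owners.get(p))) {
                updated.put(p, survivors.get(next++ % survivors.size()));
            }
        }
        return updated;
    }

    // Which member should be running each task under the given ownership.
    static Map<String, String> placement(List<String> tasks, Map<Integer, String> owners) {
        Map<String, String> result = new HashMap<>();
        for (String task : tasks) {
            result.put(task, owners.get(partitionOf(task)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("1", "2", "3", "4", "5", "6");
        Map<Integer, String> owners = assign(Arrays.asList("node1", "node2"));
        System.out.println("before: " + placement(tasks, owners));

        // node2 shuts down: its partitions, and the tasks in them, move to node1.
        owners = removeMember(owners, "node2", Arrays.asList("node1"));
        System.out.println("after:  " + placement(tasks, owners));
    }
}
```

In the scenario reported here, the "after" step works when the first node is stopped but not when the second one is.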

I've created a test application with some sample code from the Hazelcast examples and some pieces of code I found on the web. I start two instances of this app:

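import static com.hazelcast.scheduledexecutor.TaskUtils.named;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;

public class MasterMember {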

/**
 * Logger shared by the whole class.
 */
private static final Logger logger = LoggerFactory.getLogger(MasterMember.class);

public static void main(String[] args) throws Exception {

    Config config = new Config();
    config.setProperty("hazelcast.logging.type", "slf4j");
    config.getScheduledExecutorConfig("scheduler")
            .setPoolSize(16)
            .setCapacity(100)
            .setDurability(1); // keep one backup copy of each task

    final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);

    Runtime.getRuntime().addShutdownHook(new Thread() {

        HazelcastInstance threadInstance = instance;

        @Override
        public void run() {
            logger.info("Application shutdown");

            for (int i = 0; i < 12; i++) {
                logger.info("Verifying whether it is safe to close this instance");
                boolean isSafe = getResultsForAllInstances(hzi -> {
                    if (hzi.getLifecycleService().isRunning()) {
                        return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS);
                    }
                    return true;
                });

                if (isSafe) {
                    logger.info("Verifying whether cluster is safe.");
                    isSafe = getResultsForAllInstances(hzi -> {
                        if (hzi.getLifecycleService().isRunning()) {
                            return hzi.getPartitionService().isClusterSafe();
                        }
                        return true;
                    });

                    if (isSafe) {
                        logger.info("Cluster is safe.");
                        break;
                    }
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting; the instance is still shut down below.
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            threadInstance.shutdown();

        }

        private boolean getResultsForAllInstances(
                Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) {

            return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true,
                    (old, next) -> old && next);
        }
    });

    new Thread(() -> {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up without scheduling anything.
            Thread.currentThread().interrupt();
            return;
        }

        IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
        scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS);
    }).start();

    new Thread(() -> {

        try {
            // delays init
            Thread.sleep(20000);

            while (true) {

                IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
                final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures =
                        scheduler.getAllScheduledFutures();

                // Log the state and statistics of every task scheduled on every member
                for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) {
                    for (final IScheduledFuture<Object> objectIScheduledFuture : entry) {
                        logger.info(
                                "TaskStats: name {} isDone() {} isCanceled() {} total runs {} delay (sec) {} other statistics {} ",
                                objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(),
                                objectIScheduledFuture.isCancelled(),
                                objectIScheduledFuture.getStats().getTotalRuns(),
                                objectIScheduledFuture.getDelay(TimeUnit.SECONDS),
                                objectIScheduledFuture.getStats());
                    }
                }

                Thread.sleep(15000);

            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag; the monitoring loop ends here.
            Thread.currentThread().interrupt();
        }

    }).start();

    while (true) {
        Thread.sleep(1000);
    }
    // Hazelcast.shutdownAll();
}
}

And the task:

import java.io.Serializable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EchoTask implements Runnable, Serializable {

/**
 * serialVersionUID
 */
private static final long serialVersionUID = 5505122140975508363L;

// static, so the logger is not serialized together with the task
private static final Logger logger = LoggerFactory.getLogger(EchoTask.class);

private final String msg;

public EchoTask(String msg) {
    this.msg = msg;
}

@Override
public void run() {
    logger.info("--> {}", msg);
}
}
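
Each task is serialized when it is sent to the member that runs it and, with durability 1, when a backup copy is kept on another member; for a plain java.io.Serializable class, Hazelcast falls back to standard Java serialization. The self-contained sketch below uses a hypothetical copy of EchoTask that logs through java.util.logging instead of SLF4J, so it needs no dependencies, and round-trips it through ObjectOutputStream/ObjectInputStream to check that its state survives. Declaring the logger static keeps it out of the serialized form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.logging.Logger;

public class TaskSerializationCheck {

    // Same shape as EchoTask; hypothetical copy using java.util.logging.
    static class EchoTask implements Runnable, Serializable {
        private static final long serialVersionUID = 1L;
        // static: never written into the serialized task
        private static final Logger LOGGER = Logger.getLogger(EchoTask.class.getName());

        private final String msg;

        EchoTask(String msg) {
            this.msg = msg;
        }

        String msg() {
            return msg;
        }

        @Override
        public void run() {
            LOGGER.info("--> " + msg);
        }
    }

    // Serialize and deserialize a value with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        EchoTask copy = roundTrip(new EchoTask("1"));
        copy.run(); // logs "--> 1" from the deserialized copy
    }
}
```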

What am I doing wrong?

Thanks in advance.

-- EDIT --

Modified the code (updated above) to use a logger instead of System.out. Added logging of task statistics and fixed the usage of the Config object.

Logs:

Node1_log

Node2_log

Forgot to mention: I wait until all the tasks are running on the first node before starting the second one.

Recommended answer

Bruno, thanks for reporting this; it really is a bug. Unfortunately, it is not as obvious with multiple nodes as it is with just two. As you figured out in your answer, the task is not lost; rather, it stays cancelled after a migration. Your fix, however, is not safe, because a task can be cancelled and have a null future at the same time: e.g. when you cancel the master replica, the backup, which never had a future, just gets the result. The fix is very close to what you did: in prepareForReplication(), when in migrationMode, we avoid setting the result. I will push a fix for that shortly; I'm just running a few more tests. It will be available in master and later versions.
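
The mechanism described in this answer can be illustrated with a toy model. Everything in it is hypothetical (Descriptor, Snapshot, snapshotBuggy and snapshotFixed are invented names, not Hazelcast classes); it only encodes the answer's two statements: the task is not lost but stays cancelled after migration, and the fix is to not set the result in prepareForReplication() while in migration mode. A backup replica has no future but may already carry a result from a user cancel, so the sketch keeps any stored result instead of inferring state from a null future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Toy model (hypothetical names, NOT Hazelcast's code) of the migration bug
// described in the answer and of the fix it proposes.
public class ToyReplication {

    // Replica state of one scheduled task on a member.
    static final class Descriptor {
        final String name;
        final Future<?> future; // null on backup replicas, which never ran the task
        final String result;    // e.g. "CANCELLED" once a user cancel has been replicated

        Descriptor(String name, Future<?> future, String result) {
            this.name = name;
            this.future = future;
            this.result = result;
        }
    }

    // What gets shipped to the new owner during replication.
    static final class Snapshot {
        final String name;
        final String result;

        Snapshot(String name, String result) {
            this.name = name;
            this.result = result;
        }
    }

    // Derive the result from the local future if there is one, else keep the stored result.
    static String resultFromFuture(Descriptor d) {
        if (d.future != null && d.future.isCancelled()) {
            return "CANCELLED";
        }
        return d.result;
    }

    // Bug: a future cancelled only because the member is shutting down is
    // turned into a "CANCELLED" result and shipped to the new owner.
    static Snapshot snapshotBuggy(Descriptor d) {
        return new Snapshot(d.name, resultFromFuture(d));
    }

    // Fix as described: in migration mode, do not set the result from the future.
    static Snapshot snapshotFixed(Descriptor d, boolean migrationMode) {
        return new Snapshot(d.name, migrationMode ? d.result : resultFromFuture(d));
    }

    // The new owner only reschedules tasks that carry no result.
    static boolean shouldReschedule(Snapshot s) {
        return s.result == null;
    }

    public static void main(String[] args) {
        // The owning member shuts down, which cancels its local future.
        CompletableFuture<Void> local = new CompletableFuture<>();
        local.cancel(false);
        Descriptor owned = new Descriptor("1", local, null);

        System.out.println("buggy rescheduled: " + shouldReschedule(snapshotBuggy(owned)));       // false
        System.out.println("fixed rescheduled: " + shouldReschedule(snapshotFixed(owned, true))); // true
    }
}
```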

If you don't mind, I logged an issue with your finding: https://github.com/hazelcast/hazelcast/issues/10603. You can keep track of its status there.
