Periodic jobs when running multiple servers


Question

I'm planning to deploy an app using Play and have never used its "Jobs" before. My deployment will be large enough to require several load-balanced Play servers, but my computations won't be large enough to need Hadoop, Storm, or similar.

My question is, how do I handle this scenario in Play? If I set up a job in Play to run every minute, I don't want every single server to do the exact same thing at the same time.

I could only find this answer, but I don't like any of those options.

So, are there any tools or best practices to coordinate jobs, or do I have to build something from scratch?

Recommended answer

You can use a table in your database to store a jobLock, but you have to check and update this lock in a separate transaction (use JPA.newEntityManager for this).
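The acquire/release protocol can be modelled in plain Java before looking at the JPA version. This is only a sketch under assumptions. The class `LockTableSketch`, its `Row` type, and the in-memory map are hypothetical stand-ins for the database table. `compareAndSet` plays the role that an optimistic version check would play in the database: if two nodes read the same free row, only one write succeeds.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the lock table; not part of Play.
class LockTableSketch {

    /** Immutable snapshot of one lock row; every change writes a new instance. */
    static final class Row {
        final boolean acquired;
        final long acquiredAtMillis;

        Row(boolean acquired, long acquiredAtMillis) {
            this.acquired = acquired;
            this.acquiredAtMillis = acquiredAtMillis;
        }
    }

    private final ConcurrentHashMap<String, AtomicReference<Row>> table = new ConcurrentHashMap<>();
    private final long maxHoldMillis;

    LockTableSketch(long maxHoldMillis) {
        this.maxHoldMillis = maxHoldMillis;
    }

    /**
     * Returns true if the caller won the lock and false if another holder has it.
     * Throws if the current holder has kept the lock longer than maxHoldMillis.
     */
    boolean tryAcquire(String jobName, long nowMillis) {
        AtomicReference<Row> ref =
                table.computeIfAbsent(jobName, k -> new AtomicReference<>(new Row(false, 0L)));
        Row current = ref.get();
        if (current.acquired) {
            if (nowMillis - current.acquiredAtMillis > maxHoldMillis) {
                throw new IllegalStateException("Lock held too long for job " + jobName);
            }
            return false;
        }
        // Fails if another caller replaced the row after we read it,
        // so at most one of several racing callers gets true.
        return ref.compareAndSet(current, new Row(true, nowMillis));
    }

    /** Returns true if the lock was held and has now been freed. */
    boolean release(String jobName) {
        AtomicReference<Row> ref = table.get(jobName);
        if (ref == null) {
            return false;
        }
        Row current = ref.get();
        return current.acquired && ref.compareAndSet(current, new Row(false, 0L));
    }
}
```

A node would call `tryAcquire` at the start of each scheduled run, skip the run when it returns false, and call `release` in a `finally` block when the work is done.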

My JobLock class uses a LockMode enum:

package enums;

public enum LockMode {
    FREE, ACQUIRED;
}

Here is the JobLock class:

package models;

import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Version;

import play.Logger;
import play.Play;
import play.data.validation.Required;
import play.db.jpa.JPA;
import play.db.jpa.Model;
import utils.Parser;
import enums.LockMode;
import exceptions.ServiceException;

/**
 * Technical class that manages a lock in the database, so that multiple
 * instances are synchronized and cannot run the same job at the same time.
 * 
 * @author sebastien
 */
@Entity
public class JobLock extends Model {

    private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
            "job.lock.acquisitiondelay", "10000"));

    @Required
    public String jobName;

    public Date acquisitionDate;

    @Required
    @Enumerated(EnumType.STRING)
    public LockMode lockMode;

    @Version
    public int version;

    // STATIC METHODS
    // ~~~~~~~~~~~~~~~

    /**
     * Acquire the lock for the type of job identified by the name parameter.
     * The lock is acquired in a separate transaction so that this transaction
     * is as short as possible and other instances see the lock acquisition
     * sooner.
     * <p>
     * Otherwise, the other instances would be blocked until the instance that
     * acquired the lock has finished its business transaction, which could
     * take a long time for a job.
     * </p>
     * 
     * @param name
     *            the name that identifies a job category, usually the job's
     *            simple class name
     * @return the lock object if the acquisition is successful, null otherwise
     */
    public static JobLock acquireLock(String name) {
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class)
                    .setParameter("name", name).setMaxResults(1).getResultList();
            JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null;
            if (lock == null) {
                lock = new JobLock();
                lock.jobName = name;
                lock.acquisitionDate = new Date();
                lock.lockMode = LockMode.ACQUIRED;
                em.persist(lock);
            } else {
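                if (LockMode.ACQUIRED.equals(lock.lockMode)) {
                    if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) {
                        throw new ServiceException(String.format(
                                "Lock is held for too much time : there is a problem with job %s", name));
                    }
                    // Nothing was changed: end the transaction explicitly
                    // instead of leaving it open when the EntityManager closes
                    em.getTransaction().rollback();
                    return null;
                }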
                lock.lockMode = LockMode.ACQUIRED;
                lock.acquisitionDate = new Date();
                lock.willBeSaved = true;
            }
            em.flush();
            em.getTransaction().commit();
            return lock;
        } catch (Exception e) {
            // Do not log the exception here: in a multi-node installation a
            // failure at this point is expected, and it is what prevents the
            // job from running more than once
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            // This test could be inverted to list the exceptions that are not
            // a problem: exceptions that denote concurrency in the database
            // are normal
            if (e instanceof ServiceException) {
                throw (ServiceException) e;
            } else {
                return null;
            }
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }

    /**
     * Release the lock in the database so that another instance can take it.
     * This action changes the {@link #lockMode} and sets
     * {@link #acquisitionDate} to null. It is done in a separate transaction
     * that can see what happened in the database during the business
     * transaction.
     * 
     * @param lock
     *            the lock to release
     * @return true if we managed to release the lock and false otherwise
     */
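    public static boolean releaseLock(JobLock lock) {
        // Check the argument before opening an EntityManager; otherwise the
        // early return would leave it unclosed
        if (lock == null || LockMode.FREE.equals(lock.lockMode)) {
            return false;
        }

        EntityManager em = JPA.newEntityManager();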

        try {
            em.getTransaction().begin();
            lock = em.find(JobLock.class, lock.id);
            lock.lockMode = LockMode.FREE;
            lock.acquisitionDate = null;
            lock.willBeSaved = true;
            em.persist(lock);
            em.flush();
            em.getTransaction().commit();
            return true;
        } catch (Exception e) {
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            Logger.error(e, "Error during commit of lock release");
            return false;
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }
}

And here is my LockAwareJob, which uses this lock:

package jobs;

import models.JobLock;
import notifiers.ExceptionMails;
import play.Logger;
import play.jobs.Job;

public abstract class LockAwareJob<V> extends Job<V> {

    @Override
    public final void doJob() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    doJobWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    @Override
    public final V doJobWithResult() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    return resultWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
                return resultWithoutLock();
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    public void doJobWithLock(JobLock lock) throws Exception {
    }

    public V resultWithLock(JobLock lock) throws Exception {
        doJobWithLock(lock);
        return null;
    }

    public V resultWithoutLock() throws Exception {
        return null;
    }
}

In my log4j.properties I add a special line to avoid logging an error each time an instance fails to acquire the job lock:

log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL

With this solution you can also use the JobLock id to store parameters associated with this job (the last run date, for example).
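One way to picture this is a small store keyed by the lock id. This is a sketch under assumptions: `JobParameterStore` and its methods are hypothetical names, and the in-memory map stands in for what would more likely be a table with a column referencing the JobLock id.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: keeps the last run date per job, keyed by lock id.
class JobParameterStore {

    private final Map<Long, Instant> lastRunByLockId = new HashMap<>();

    /** Returns the last recorded run for this lock id, or empty if the job never ran. */
    Optional<Instant> lastRun(long lockId) {
        return Optional.ofNullable(lastRunByLockId.get(lockId));
    }

    /** Records the completion time of the run that held this lock. */
    void recordRun(long lockId, Instant finishedAt) {
        lastRunByLockId.put(lockId, finishedAt);
    }
}
```

A job holding the lock could read `lastRun` to process only the data that arrived since the previous run, then call `recordRun` before releasing the lock.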
