Preventing a thread from duplicate processing in Java


Problem description

I have a JMS listener running as a thread that listens to a topic. As soon as a message comes in, I spawn a new Thread to process the inbound message, so each incoming message gets its own thread.

I have a scenario where a duplicate message is also processed when it is injected immediately after the original, in sequential order. I need to prevent that. I tried using a ConcurrentHashMap to hold the processing state: I add an entry as soon as the Thread is spawned and remove it from the map as soon as the Thread completes its execution. But this did not help when I passed in the same message one after another in a concurrent fashion.

General outline of my issue, before you plunge into the actual code base:

onMessage() {
    processIncomingMessage() {
        ExecutorService executorService = Executors.newFixedThreadPool(1000);
        // A map entry is made before a new thread is spawned to process the incoming message.
        // The map holds the incoming message as the key and a boolean as the value.
        // Check the map for a duplicate.
        // The check below is failing and allows duplicate messages to be processed in parallel.
        if (entryIsPresentInMap) {
            // return, doing nothing
        } else {
            // Spawn a new thread for each incoming message.
            // Also ensure a duplicate message is not processed while an active thread is handling it.
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // actual business logic
                    } finally {
                        // remove the entry from the map once processing of the message is done
                    }
                }
            });
        }
    }
}

Standalone example mimicking the scenario:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DuplicateCheck {

    // Names currently being processed; a key is present only while a thread is working on it.
    private static final Map<String, Boolean> duplicateCheckMap =
            new ConcurrentHashMap<String, Boolean>(1000);

    private static final String[] nameArray = {"Peter", "Peter", "Adam"};

    public static void processMessage(String message) {
        System.out.println("Processed message =" + message);
    }

    public static void main(String[] args) {
        for (int i = 0; i < nameArray.length; i++) {
            // Local copy per iteration, so each worker thread keeps its own name.
            final String name = nameArray[i];
            // Note: this check and the put in addNameIntoMap are two separate steps.
            if (Boolean.TRUE.equals(duplicateCheckMap.get(name))) {
                System.out.println("Thread detected for processing your name =" + name);
                continue; // skip this duplicate and move on to the next name
            }
            addNameIntoMap(name);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        processMessage(name);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    } finally {
                        freeNameFromMap(name);
                    }
                }
            }).start();
        }
    }

    private static synchronized void addNameIntoMap(String name) {
        if (name != null) {
            duplicateCheckMap.put(name, true);
            System.out.println("Thread processing the " + name + " is added to the status map");
        }
    }

    private static synchronized void freeNameFromMap(String name) {
        if (name != null) {
            duplicateCheckMap.remove(name);
            System.out.println("Thread processing the " + name + " is released from the status map");
        }
    }
}
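
The failure mode this example is trying to reproduce can be made deterministic. What follows is only a minimal sketch, not the poster's code: the class name CheckThenActDemo, the countWinners helper, and the CyclicBarrier that forces both threads to finish their check before either one writes are all assumptions made for illustration. It contrasts a separate get-then-put with a single putIfAbsent call on the same ConcurrentHashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenActDemo {

    /** Runs two threads that both try to claim the same key and returns how many succeeded. */
    static int countWinners(final boolean atomic) {
        final Map<String, Boolean> inFlight = new ConcurrentHashMap<>();
        final CyclicBarrier bothChecked = new CyclicBarrier(2);
        final AtomicInteger winners = new AtomicInteger();

        Runnable claim = () -> {
            try {
                if (atomic) {
                    // One atomic step: only the first caller gets null (no previous mapping).
                    if (inFlight.putIfAbsent("Peter", true) == null) {
                        winners.incrementAndGet();
                    }
                } else {
                    boolean free = inFlight.get("Peter") == null; // step 1: check
                    bothChecked.await();                          // force the unlucky interleaving
                    if (free) {
                        inFlight.put("Peter", true);              // step 2: act
                        winners.incrementAndGet();
                    }
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };

        Thread t1 = new Thread(claim);
        Thread t2 = new Thread(claim);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("check-then-act winners: " + countWinners(false)); // 2
        System.out.println("putIfAbsent winners: " + countWinners(true));     // 1
    }
}
```

With the barrier in place, both threads pass the non-atomic check and both "process" the same key, whereas putIfAbsent admits exactly one of them. Each individual ConcurrentHashMap call is thread-safe, but a get followed by a put is not a single atomic operation.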



A snippet of the actual code is below:

public void processControlMessage(final Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
    final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
    final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
    if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
        log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
        return;
    }else {
        log.info("doing nothing");
    }
    Semaphore controlMessageLock = new Semaphore(1); 
    try{
    controlMessageLock.acquire();
    synchronized(this){
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock.lock();
                    log.info("Processing Workflow Control Message for the workflow :"+workflowName);
                    if (message instanceof TextMessage) {
                    if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        clearControlMessageBuffer();
                        enableControlMessageStatus(workflowName);
                        List<String> matchingValues=new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                    lock.unlock();
                }
            }
        }).start();
    }
    } catch (InterruptedException ie) {
        log.info("Interrupted Exception during control message lock acquisition"+ie);
    }finally{
        controlMessageLock.release();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
    try {
        TextMessage textMessage = (TextMessage) message;
        rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        log.error("Error extracting item of type RDPWorkflowTask from message "
                + message);
    }
    return rdpWorkflowControlMessage;
}

private void fetchNewWorkflowItems() {
    initSSL();
    List<RDPWorkflowTask> allTasks=initAllTasks();
    taskEventListener.addRDPWorkflowTasks(allTasks);
    workflowEventListener.updateWorkflowStatus(allTasks);
}

private void clearControlMessageBuffer() {
    taskEventListener.getRecordsForUpdate().clear();
    workflowEventListener.getRecordsForUpdate().clear();
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

I have modified my code to incorporate the suggestions provided below, but it is still not working:

public void processControlMessage(final Message message) {
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    try{
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
        if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }else {
            log.info("doing nothing");
        }
        enableControlMessageStatus(workflowName);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // actual code
                    fetchNewWorkflowItems();
                    addShutdownHook(workflowName);
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                }
            }
        });
    } finally {
        executorService.shutdown();
        lock.unlock();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}


Recommended answer

The issue is fixed now. Many thanks to @awsome for the approach. It avoids duplicates when a thread is already processing the same message: the incoming duplicate is skipped. If no thread is processing that message, it gets picked up.

public void processControlMessage(final Message message) {
    try {
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (message instanceof TextMessage) {
                        if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                            if (tryAddingWorkflowNameInStatusMap(workflowName)) {
                                log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
                                addShutdownHook(workflowName);
                                clearControlMessageBuffer();
                                List<String> matchingValues = new ArrayList<String>();
                                matchingValues.add(workflowName);
                                ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                                ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                                tasksSetDAO.deleteMatchingRecords(matchingValues);
                                workflowSetDAO.deleteMatchingRecords(matchingValues);
                                List<RDPWorkflowTask> allTasks=fetchNewWorkflowItems(workflowName);
                                updateTasksAndWorkflowSet(allTasks);
                                removeWorkflowNameFromProcessingMap(workflowName);

                            } else {
                                log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
                                return;
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                }
            }
        }).start();
    } finally {
        lock.unlock();
    }
}

private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
    if(controlMessageStateMap.get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap.get(workflowName)==null){
                 log.info("Adding an entry in to the map for the workflow ="+workflowName);
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
    if (workflowName != null
            && (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap
                    .get(workflowName))) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the " + workflowName+ " is released from the status map");
    }
}
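
The same idea can be expressed more compactly. The following is a hedged sketch rather than the code used in the fix above: InFlightDeduplicator, its submit method, and the shared pool passed in by the caller are hypothetical names introduced for illustration. It assumes the claim can be made with the atomic add() of a ConcurrentHashMap-backed set, and it releases the key in a finally block so that an exception in the work does not leave the key stuck in the set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InFlightDeduplicator {

    // Keys (for example workflow names) that some worker is currently processing.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();
    // Shared pool, created once by the caller rather than once per message.
    private final ExecutorService pool;

    public InFlightDeduplicator(ExecutorService pool) {
        this.pool = pool;
    }

    /** Returns true if the work was accepted, false if the key is already in flight. */
    public boolean submit(final String key, final Runnable work) {
        // Atomic claim: add() returns false when the key is already present.
        if (!inFlight.add(key)) {
            return false;
        }
        try {
            pool.execute(() -> {
                try {
                    work.run();
                } finally {
                    inFlight.remove(key); // release even if the work throws
                }
            });
        } catch (RuntimeException rejected) {
            inFlight.remove(key); // the pool refused the task, so give the claim back
            throw rejected;
        }
        return true;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        InFlightDeduplicator dedup = new InFlightDeduplicator(pool);
        CountDownLatch gate = new CountDownLatch(1);

        // The first task blocks on the gate, so "Peter" stays in flight.
        boolean first = dedup.submit("Peter", () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean duplicate = dedup.submit("Peter", () -> { });
        boolean other = dedup.submit("Adam", () -> { });

        gate.countDown();
        pool.shutdown();
        System.out.println(first + " " + duplicate + " " + other); // true false true
    }
}
```

Because the claim and the check are one call, no extra lock or synchronized block is needed around them. Once the first task finishes and the key is removed, a later message with the same key is accepted again, which matches the behaviour described in the answer.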
