防止线程在java中重复处理 [英] Preventing thread from duplicate processing in java
问题描述
我有一个 JMS
监听器作为一个主题收听的线程运行。一旦收到消息,我就会生成一个新的 Thread
来处理有界消息。因此,对于每个传入的消息,我产生了一个新的线程
。
我有一个场景,当重复消息在顺序注入时也会被处理订购。我需要防止这个被处理。我尝试使用 ConcurrentHashMap
来保存处理时间,只要 Thread
生成并删除,我就会在该处理中添加它很快从地图线程
完成执行。但是当我尝试使用同时以同时方式传递相同内容的场景时,它没有帮助。
I have a JMS
listener running as a thread listening to a topic. As soon a message comes in, I spawn a new Thread
to process the in-bounded message. So for each incoming message I spawn a new Thread
.
I have a scenario where duplicate message is also being processed when it is injected immediately in a sequential order. I need to prevent this from being processed. I tried using a ConcurrentHashMap
to hold the process times where I add in the entry as soon as Thread
is spawn and remove it from the map as soon Thread
completes its execution. But it did not help when I tried with the scenario where I passed in same one after the another in concurrent fashion.
在你进入实际情况之前我的问题的概要代码库
General Outline of my issue before you plunge into the actual code base
onMessage(){
processIncomingMessage(){
ExecutorService executorService = Executors.newFixedThreadPool(1000);
//Map is used to make an entry before i spawn a new thread to process incoming message
//Map contains "Key as the incoming message" and "value as boolean"
//check map for duplicate check
//The below check is failing and allowing duplicate messages to be processed in parallel
if(entryisPresentInMap){
//return doing nothing
}else{
//spawn a new thread for each incoming message
//also ensure a duplicate message being processed when it in process by an active thread
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//actuall business logic
}finally{
//remove entry from the map so after processing is done with the message
}
}
}
}
独立示例模仿场景
public class DuplicateCheck {
private static Map<String,Boolean> duplicateCheckMap =
new ConcurrentHashMap<String,Boolean>(1000);
private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
System.out.println("Processed message =" +message);
}
public static void main(String args[]){
nameArray[0] = "Peter";
nameArray[1] = "Peter";
nameArray[2] = "Adam";
for(int i=0;i<=nameArray.length;i++){
name=nameArray[i];
if(duplicateCheckMap.get(name)!=null && duplicateCheckMap.get(name)){
System.out.println("Thread detected for processing your name ="+name);
return;
}
addNameIntoMap(name);
new Thread(new Runnable() {
@Override
public void run() {
try {
processMessage(name);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
freeNameFromMap(name);
}
}
}).start();
}
}
private static synchronized void addNameIntoMap(String name) {
if (name != null) {
duplicateCheckMap.put(name, true);
System.out.println("Thread processing the "+name+" is added to the status map");
}
}
private static synchronized void freeNameFromMap(String name) {
if (name != null) {
duplicateCheckMap.remove(name);
System.out.println("Thread processing the "+name+" is released from the status map");
}
}
代码片段低于
Snippet of the code is below
public void processControlMessage(final Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
Semaphore controlMessageLock = new Semaphore(1);
try{
controlMessageLock.acquire();
synchronized(this){
new Thread(new Runnable(){
@Override
public void run() {
try {
lock.lock();
log.info("Processing Workflow Control Message for the workflow :"+workflowName);
if (message instanceof TextMessage) {
if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
clearControlMessageBuffer();
enableControlMessageStatus(workflowName);
List<String> matchingValues=new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
lock.unlock();
}
}
}).start();
}
} catch (InterruptedException ie) {
log.info("Interrupted Exception during control message lock acquisition"+ie);
}finally{
controlMessageLock.release();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
try {
TextMessage textMessage = (TextMessage) message;
rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowTask from message "
+ message);
}
return rdpWorkflowControlMessage;
}
private void fetchNewWorkflowItems() {
initSSL();
List<RDPWorkflowTask> allTasks=initAllTasks();
taskEventListener.addRDPWorkflowTasks(allTasks);
workflowEventListener.updateWorkflowStatus(allTasks);
}
private void clearControlMessageBuffer() {
taskEventListener.getRecordsForUpdate().clear();
workflowEventListener.getRecordsForUpdate().clear();
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}
我修改了我的代码以包含以下提供的建议但是仍然无法正常工作
I have modified my code to incorporate suggestions provided below but still it is not working
public void processControlMessage(final Message message) {
ExecutorService executorService = Executors.newFixedThreadPool(1000);
try{
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
enableControlMessageStatus(workflowName);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//actual code
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
}
}
});
} finally {
executorService.shutdown();
lock.unlock();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}
推荐答案
问题现在已解决。非常感谢@awsome的方法。当线程已经在处理传入的重复消息时,它避免重复。如果没有线程正在处理,那么它就会被接收
The issue is fixed now. Many thanks to @awsome for the approach. It is avoiding the duplicates when a thread is already processing the incoming duplicate message. If no thread is processing then it gets picked up
public void processControlMessage(final Message message) {
try {
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (message instanceof TextMessage) {
if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
if (tryAddingWorkflowNameInStatusMap(workflowName)) {
log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
addShutdownHook(workflowName);
clearControlMessageBuffer();
List<String> matchingValues = new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
List<RDPWorkflowTask> allTasks=fetchNewWorkflowItems(workflowName);
updateTasksAndWorkflowSet(allTasks);
removeWorkflowNameFromProcessingMap(workflowName);
} else {
log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
return;
}
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
}
}
}).start();
} finally {
lock.unlock();
}
}
private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
if(controlMessageStateMap.get(workflowName)==null){
synchronized (this) {
if(controlMessageStateMap.get(workflowName)==null){
log.info("Adding an entry in to the map for the workflow ="+workflowName);
controlMessageStateMap.put(workflowName, true);
return true;
}
}
}
return false;
}
private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
if (workflowName != null
&& (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap
.get(workflowName))) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the " + workflowName+ " is released from the status map");
}
}
这篇关于防止线程在java中重复处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!