Description of the problem
I have a JMSlistener working as a thread listening on a topic. As soon as a message appears, I create a new one Threadto handle the limited message. Therefore, for each incoming message, I create a new one Thread.
I have a scenario where a duplicate message is also processed when it is entered immediately in sequential order. I need this to not be processed. I tried to use ConcurrentHashMapprocess time storage when I add to a record as soon as it Threadappears and is removed from the map as soon as it Threadcompletes its execution. But that did not help when I tried with a scenario in which I went the same one after another in parallel.
General information about my problem before plunging into the actual code base
onMessage(){
processIncomingMessage(){
ExecutorService executorService = Executors.newFixedThreadPool(1000);
if(entryisPresentInMap){
}else{
executorService.execute(new Runnable() {
@Override
public void run() {
try {
}finally{
}
}
}
}
public class DuplicateCheck {
private static Map<String,Boolean> duplicateCheckMap =
new ConcurrentHashMap<String,Boolean>(1000);
private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
System.out.println("Processed message =" +message);
}
public static void main(String args[]){
nameArray[0] = "Peter";
nameArray[1] = "Peter";
nameArray[2] = "Adam";
for(int i=0;i<=nameArray.length;i++){
name=nameArray[i];
if(duplicateCheckMap.get(name)!=null && duplicateCheckMap.get(name)){
System.out.println("Thread detected for processing your name ="+name);
return;
}
addNameIntoMap(name);
new Thread(new Runnable() {
@Override
public void run() {
try {
processMessage(name);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
freeNameFromMap(name);
}
}
}).start();
}
}
private static synchronized void addNameIntoMap(String name) {
if (name != null) {
duplicateCheckMap.put(name, true);
System.out.println("Thread processing the "+name+" is added to the status map");
}
}
private static synchronized void freeNameFromMap(String name) {
if (name != null) {
duplicateCheckMap.remove(name);
System.out.println("Thread processing the "+name+" is released from the status map");
}
}
public void processControlMessage(final Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
Semaphore controlMessageLock = new Semaphore(1);
try{
controlMessageLock.acquire();
synchronized(this){
new Thread(new Runnable(){
@Override
public void run() {
try {
lock.lock();
log.info("Processing Workflow Control Message for the workflow :"+workflowName);
if (message instanceof TextMessage) {
if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
clearControlMessageBuffer();
enableControlMessageStatus(workflowName);
List<String> matchingValues=new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
lock.unlock();
}
}
}).start();
}
} catch (InterruptedException ie) {
log.info("Interrupted Exception during control message lock acquisition"+ie);
}finally{
controlMessageLock.release();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
try {
TextMessage textMessage = (TextMessage) message;
rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowTask from message "
+ message);
}
return rdpWorkflowControlMessage;
}
private void fetchNewWorkflowItems() {
initSSL();
List<RDPWorkflowTask> allTasks=initAllTasks();
taskEventListener.addRDPWorkflowTasks(allTasks);
workflowEventListener.updateWorkflowStatus(allTasks);
}
private void clearControlMessageBuffer() {
taskEventListener.getRecordsForUpdate().clear();
workflowEventListener.getRecordsForUpdate().clear();
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}
, , .
public void processControlMessage(final Message message) {
ExecutorService executorService = Executors.newFixedThreadPool(1000);
try{
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
enableControlMessageStatus(workflowName);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
}
}
});
} finally {
executorService.shutdown();
lock.unlock();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}