1 /**
2 * Copyright 2005-2006 the original author or authors.
3 *
4 * Licensed under the Gnu General Public License, Version 2.0 (the
5 * "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.opensource.org/licenses/gpl-license.php
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 * See the Gnu General Public License for more details.
14 */
15 package org.figure8.join.core.messaging;
16
17 import org.figure8.join.util.LogUtil;
18
19 import org.apache.commons.logging.Log;
20
21 import javax.jms.Message;
22
23 import java.util.List;
24 import java.util.ArrayList;
25 /**
26 * This is a simple, dummy consumer that just echoes messages using commons
27 * logging and store processed messages into a list. List can be flushed using
28 * <code>flushMessages()</code> method. This implementation is suited for
29 * tests and debugging mainly.
30 * @author <a href="mailto:laurent.broudoux@free.fr">Laurent Broudoux</a>
31 * @version $Revision: 1.3 $
32 */
33 public class EchoJMSConsumerBean implements JMSConsumerBean{
34
35
36
37 /** Commons logger. */
38 protected static Log log = LogUtil.getLog(EchoJMSConsumerBean.class);
39
40
41
42
43 /** The name identifying this consumer. */
44 private String name;
45 /** List of processed messages */
46 private List messages = new ArrayList();
47
48
49
50
51 /** @return All the messages on the list so far, clearing the buffer. */
52 public synchronized List flushMessages(){
53 List answer = new ArrayList(messages);
54 messages.clear();
55 return answer;
56 }
57
58 /** @return true if consumer has received messages, false otherwise. */
59 public boolean hasReceivedMessage(){
60 return !messages.isEmpty();
61 }
62
63 /**
64 * Wait until <b>messageCount</b> messages have been received. This
65 * is useful when testing production and consumption of messages. The
66 * wait period is not infinite : it's 10 times 1 second.
67 * @param messageCount Number of message to wait for
68 */
69 public void waitForMessagesToArrive(int messageCount){
70
71 Object semaphore = new Object();
72 long start = System.currentTimeMillis();
73
74 for (int i=0; i<10; i++){
75 try{
76 if (messages.size() >= messageCount)
77 break;
78 synchronized (semaphore){
79 semaphore.wait(1000);
80 }
81 }
82 catch (InterruptedException e) {
83 System.err.println("Caught: " + e);
84 }
85 }
86
87 long end = System.currentTimeMillis() - start;
88 System.err.println("End of wait for " + end + " millis");
89 }
90
91
92
93
94 /**
95 * Just echo messages using commons logger and add them to messages list.
96 * @param message JMS Message to process
97 */
98 public synchronized void onMessage(Message message){
99 log.info("[" + this + "] Processing the message: " + message);
100 messages.add(message);
101 }
102
103 /**
104 * Retrieve the name of this consumer.
105 * @return The name identifying this consumer
106 */
107 public String getName(){
108 return name;
109 }
110
111 /**
112 * Set the name identifying this consumer.
113 * @param name The name of this consumer
114 */
115 public void setName(String name){
116 this.name = name;
117 }
118
119 /**
120 * Implement this method to stop current process and free resources.
121 * This method should not throw exceptions.
122 */
123 public void stop(){
124 log.info("Stopping the EchoJMSConsumerBean...");
125 }
126 }