View Javadoc

1   /**
2    * Copyright 2005-2006 the original author or authors.
3    *
4    * Licensed under the Gnu General Public License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    *
8    *      http://www.opensource.org/licenses/gpl-license.php
9    *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13   * See the Gnu General Public License for more details.
14   */
15  package org.figure8.join.core.messaging;
16  
17  import org.figure8.join.core.InvalidParameterException;
18  import org.figure8.join.core.Configurable;
19  import org.figure8.join.util.LogUtil;
20  
21  import org.apache.commons.logging.Log;
22  import org.jencks.EndpointFactorySupport;
23  import org.jencks.TargetSourceMessageListener;
24  import org.springframework.aop.target.CommonsPoolTargetSource;
25  import org.springframework.beans.factory.support.RootBeanDefinition;
26  import org.springframework.beans.factory.support.ChildBeanDefinition;
27  import org.springframework.beans.factory.support.DefaultListableBeanFactory;
28  import org.springframework.beans.MutablePropertyValues;
29  
30  import javax.jms.MessageListener;
31  import javax.resource.spi.UnavailableException;
32  import javax.transaction.TransactionManager;
33  import java.util.Properties;
34  import java.util.List;
35  /**
36   * This is an implementation of JCA <code>MessageEndpointFactory</code>
37   * creating MessageEndpoint from bean wrapping information on a {@link
38   * JMSConsumerBean}.
39   * <br/>
40   * This factory takes care if consumer is thread safe or not.
41   * @see javax.resource.spi.endpoint.MessageEndpointFactory
42   * @author <a href="mailto:laurent.broudoux@free.fr">Laurent Broudoux</a>
43   * @version $Revision: 1.2 $
44   */
45  public class JMSConsumerEndpointFactory extends EndpointFactorySupport{
46  
47     // Static -------------------------------------------------------------------
48  
49     /** Get a commons logger. */
50     private static final Log log = LogUtil.getLog(JMSConsumerEndpointFactory.class);
51  
52  
53     // Attributes ---------------------------------------------------------------
54  
55     /** The bean wrapping information on a JMSConsumerBean. */
56     private JMSConsumerBeanInfo consumerInfo = null;
57  
58  
59     // Constructors -------------------------------------------------------------
60  
61     /**
62      * Creates a new JMSConsumerEndpointFactory with consumer info.
63      * @param consumerInfo The bean wrapping information on a JMSConsumerBean
64      */
65     public JMSConsumerEndpointFactory(JMSConsumerBeanInfo consumerInfo){
66        this.consumerInfo = consumerInfo;
67     }
68  
69     /**
70      * Creates a new JMSConsumerEndpointFactory with consumer info
71      * and a transaction manager.
72      * @param consumerInfo The bean wrapping information on a JMSConsumerBean
73      * @param transactionManager TransactionManager for creating inbound JCA connector
74      */
75     public JMSConsumerEndpointFactory(JMSConsumerBeanInfo consumerInfo, TransactionManager transactionManager){
76        this.consumerInfo = consumerInfo;
77        this.transactionManager = transactionManager;
78     }
79  
80  
81     // Public -------------------------------------------------------------------
82  
83     /** @return The bean wrapping information on a JMSConsumerBean */
84     public JMSConsumerBeanInfo getConsumerInfo(){
85        return consumerInfo;
86     }
87     /** @param consumerInfo The bean wrapping information on a JMSConsumerBean */
88     public void setConsumerInfo(JMSConsumerBeanInfo consumerInfo){
89        this.consumerInfo = consumerInfo;
90     }
91  
92  
93     // Implementation of EndpointFactorySupport ---------------------------------
94  
95     /**
96      * Implementation of EndpointFactorySupport. This method deals with the
97      * creation of message listener that will be used as a JCA endpoint. Depending
98      * on the thread safetyness of the consumer, it may be wrapped into a pooling
99      * proxy.
100     * @throws UnavailableException if consumer cannot be created
101     * @return The newly created message listener implementation
102     */
103    protected MessageListener createMessageListener() throws UnavailableException{
104       // Prepare result.
105       MessageListener listener = null;
106       JMSConsumerBean consumer = null;
107 
108       try{
109          // First, assume we are thread safe and assign consumer instance.
110          log.info("Creating a MessageListener of class: " + consumerInfo.getConsumerBeanClass());
111          consumer = consumerInfo.getJMSConsumerBean();
112          listener = consumerInfo.getJMSConsumerBean();
113          log.info("Name of JMSConsumerBean [" + listener + "]: " + ((JMSConsumerBean)listener).getName());
114       }
115       catch (InvalidParameterException e){
116          log.error("Exception while creating a Messagelistener of class: " + consumerInfo.getConsumerBeanClass());
117          log.error("Here's the detailed exception msg: " + e.getMessage());
118          throw new UnavailableException("Cannot create a MessageListener in EndpointFactory", e);
119       }
120 
121       // Then, if not thread safe, wrap consumer into a proxied message listener
122       // that allows consumer pooling. Use Spring facilities for that.
123       if (!consumerInfo.isThreadSafe()){
124          log.debug("Consumer is not thread safe so wrapping it into a PoolableObjectFactory.");
125          // We have to register consumer into a bean factory, ...
126          DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
127 
128 
129          //RootBeanDefinition beanDef = new RootBeanDefinition(listener.getClass(), false);
130          //beanFactory.registerBeanDefinition("listener", beanDef);
131 
132          // This is a test on how to duplicate property values of bean...
133          // If not done, pooled instances will have no property values !!!
134          RootBeanDefinition beanDef = new RootBeanDefinition(consumer.getClass(), false);
135          MutablePropertyValues values = new MutablePropertyValues();
136          values.addPropertyValue("name", consumer.getName());
137 
138          // Add parameters if configurable.
139          if (consumer instanceof Configurable){
140             Properties parameters = new Properties();
141             List consumerParameters = consumerInfo.getConsumerParameterInfos();
142             for (int i=0; i<consumerParameters.size(); i++){
143                JMSConsumerBeanParameterInfo parameter = (JMSConsumerBeanParameterInfo)consumerParameters.get(i);
144                parameters.setProperty(parameter.getName(), parameter.getValue());
145             }
146             values.addPropertyValue("parameters", parameters);
147          }
148 
149          beanDef.setPropertyValues(values);
150          beanFactory.registerBeanDefinition("parent", beanDef);
151          ChildBeanDefinition childDef = new ChildBeanDefinition("parent", null);
152          childDef.setSingleton(false);
153          System.err.println(beanDef.getPropertyValues());
154          childDef.setPropertyValues(beanDef.getPropertyValues());
155          beanFactory.registerBeanDefinition("listener", childDef);
156 
157 
158          // ... initialize a target source with bean, ...
159          CommonsPoolTargetSource target = new CommonsPoolTargetSource();
160          target.setTargetBeanName("listener");
161          target.setBeanFactory(beanFactory);
162          // ... proxy with a target source that is a message listener.
163          TargetSourceMessageListener proxy = new TargetSourceMessageListener();
164          proxy.setTargetSource(target);
165          listener = proxy;
166       }
167       log.info("Created listener: " + listener);
168       return listener;
169    }
170 }