View Javadoc

1   /**
2    * Copyright 2005-2006 the original author or authors.
3    *
4    * Licensed under the Gnu General Pubic License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    *
8    *      http://www.opensource.org/licenses/gpl-license.php
9    *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13   * See the Gnu General Public License for more details.
14   */
15  package org.figure8.join.core.messaging;
16  
17  import org.figure8.join.core.setup.ActiveMQConfigurator;
18  import org.figure8.join.util.LogUtil;
19  
20  import org.apache.commons.logging.Log;
21  import org.activemq.ra.ActiveMQActivationSpec;
22  import org.activemq.ra.ActiveMQResourceAdapter;
23  import org.activemq.ra.ActiveMQEndpointActivationKey;
24  import org.jencks.JCAContainer;
25  import org.jencks.JCAConnector;
26  
27  import javax.resource.ResourceException;
28  import javax.resource.spi.endpoint.MessageEndpointFactory;
29  import javax.transaction.TransactionManager;
30  
31  import java.util.Hashtable;
32  /**
33   * This is an extension of Jencks <code>JCAContainer</code> that can be further
34   * configured depending on <code>ApplicationConfig</code> and using extra properties
35   * provided by a <code>ActiveMQConfigurator</code> instance.
36   *
37   * @see org.figure8.join.core.setup.ApplicationConfig
38   * @see org.figure8.join.core.setup.ActiveMQConfigurator
39   *
40   * @author <a href="mailto:laurent.broudoux@free.fr">Laurent Broudoux</a>
41   * @version $Revision: 1.2 $
42   */
43  public class ConfigurableJencksContainer extends JCAContainer{
44  
45     // Static -------------------------------------------------------------------
46  
47     /** Get a commons logger. */
48     private static Log log = LogUtil.getLog(ConfigurableJencksContainer.class);
49  
50  
51     // Attributes ---------------------------------------------------------------
52  
53     /** Helper object for configuring ActiveMQ connection factory. */
54     private ActiveMQConfigurator configurator = null;
55     /** TransactionManager for creating inbound JCA connectors (optional). */
56     private TransactionManager transactionManager = null;
57  
58     /**
59      * The hash table holding running consumers endpoint and activation spec.
60      * Keys are the name of JMSConsumerBean (unique identifier), values are the
61      * {@link ActiveMQEndpointActivationKey} wrapper provided by ActiveMQ.
62      */
63     private Hashtable runningConsumers = new Hashtable();
64  
65  
66     // Public -------------------------------------------------------------------
67  
68     /** @param configurator Helper for configuring connection factory */
69     public void setActiveMQConfigurator(ActiveMQConfigurator configurator){
70        this.configurator = configurator;
71     }
72     /** @param transactionManager TransactionManager for creating inbound connectors */
73     public void setTransactionManager(TransactionManager transactionManager){
74        this.transactionManager = transactionManager;
75     }
76  
77     /**
78      * Activate a JMS consumer within container using a JCA connector.
79      * @param info The bean wrapping informations on a JMS consumer
80      * @throws ResourceException if consumer cannot be activated into JCA resource adapter
81      */
82     public synchronized void activateJMSConsumerBean(JMSConsumerBeanInfo info) throws ResourceException{
83        // Activate if not already running ...
84        if (runningConsumers.get(info.getName()) == null){
85           log.info("Adding a JCA connector for consumer: " + info.getName());
86           // Add an inbound connector.
87           JCAConnector connector = addConnector();
88  
89           // Configure it with JCA stuffs.
90           ActiveMQActivationSpec spec = createActivationSpec(info);
91           MessageEndpointFactory factory = createMessageEndpointFactory(info);
92           connector.setActivationSpec(spec);
93           connector.setEndpointFactory(factory);
94  
95           // Activate endpoint.
96           log.info("Activating endpoint for activationSpec: " + spec + " using endpointFactory: " + factory);
97           getResourceAdapter().endpointActivation(factory, spec);
98           log.info("Activation done.");
99  
100          // Record activated consumer.
101          ActiveMQEndpointActivationKey endpointActivation = new ActiveMQEndpointActivationKey(factory, spec);
102          runningConsumers.put(info.getName(), endpointActivation);
103       }
104    }
105 
106    /**
107     * Deactivate a JMS consumer within container using the corresponding JCA connector.
108     * @param info The bean wrapping informations on a JMS consumer
109     */
110    public synchronized void deactivateJMSConsumerBean(JMSConsumerBeanInfo info){
111       // Deactivate if running ...
112       if (runningConsumers.get(info.getName()) != null){
113          log.info("Removing the JCA connector for consumer: " + info.getName());
114          // Retrieving details for running consumer.
115          ActiveMQEndpointActivationKey endpointActivation = (ActiveMQEndpointActivationKey)runningConsumers.get(info.getName());
116 
117          // Deactivate endpoint.
118          log.info("Deactivating endpoint for activationSpec: " + endpointActivation.getActivationSpec() +
119                  " using endpointFactory: " + endpointActivation.getMessageEndpointFactory());
120          getResourceAdapter().endpointDeactivation(endpointActivation.getMessageEndpointFactory(),
121                  endpointActivation.getActivationSpec());
122          log.info("Deactivation done.");
123 
124          // Remove deactivated consumer.
125          runningConsumers.remove(info.getName());
126       }
127    }
128 
129 
130    // Protected ----------------------------------------------------------------
131 
132    /**
133     * Create a ResourceAdapter and assign it to this Jencks JCA container.
134     * This method implementation creates an ActiveMQ resource adapter.
135     */
136    protected void createResourceAdapter(){
137       log.info("Creating the ActiveMQ resource adapter");
138       // Create an ActiveMQ resource adapter.
139       ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
140       // Set its server URL from configurator.
141       String serverUrl = configurator.getActiveMQProperty("brokerURL");
142       ra.setServerUrl(serverUrl);
143       log.info("RA ServerUrl is: " + ra.getServerUrl());
144       // Set the ra as wrapped ResourceAdapter.
145       setResourceAdapter(ra);
146    }
147 
148    /**
149     * Create corresponding JCA ActivationSpec for a JMS consumer.
150     * This method implementations creates an ActiveMQ activation spec.
151     * @param info The bean wrapping informations on a JMS consumer
152     * @throws ResourceException if spec cannot be activated with JCA resource adapter
153     * @return The newly created ActivationSpec
154     */
155    protected ActiveMQActivationSpec createActivationSpec(JMSConsumerBeanInfo info) throws ResourceException{
156       // Create a ActiveMQ activation spec.
157       ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
158       spec.setResourceAdapter(getResourceAdapter());
159       // Set its properties according to consumer infos.
160       spec.setDestination(info.getDestination());
161       spec.setDestinationType("javax.jms.Topic");
162       if (info.getSelector() != null)
163          spec.setMessageSelector(info.getSelector());
164       // Return activation spec.
165       return spec;
166    }
167 
168    /**
169     * Create a JCA MessageEndpointFactory for a JMS consumer.
170     * This method implementations creates a {@link JMSConsumerEndpointFactory}.
171     * @param info The bean wrapping informations on a JMS consumer
172     * @return The newly created MessageEndpointFactory
173     */
174    protected MessageEndpointFactory createMessageEndpointFactory(JMSConsumerBeanInfo info){
175       // Prepare result.
176       MessageEndpointFactory endpointFactory = null;
177       // Build a JMSConsumerEndpointFactory.
178       if (transactionManager != null)
179          endpointFactory = new JMSConsumerEndpointFactory(info);
180       else
181          endpointFactory = new JMSConsumerEndpointFactory(info, transactionManager);
182       return endpointFactory;
183    }
184 
185 
186    // Override of JCAContainer -------------------------------------------------
187 
188    /**
189     * Override of method from Jencks JCAContainer. This implementation
190     * dynamically creates an ActiveMQ ResourceAdapter depending on informations
191     * provided by {@link ActiveMQConfigurator} object.
192     * @throws Exception IllegalArgumentException if Jencks container is not well configured
193     */
194    public void afterPropertiesSet() throws Exception{
195       // Check if we are on asynchronous side of application.
196       if (configurator.getApplicationConfig().isAsynchronousSide()){
197          log.info("On the asynchronous side of application. Creating container...");
198          // If no resource adapter is supplied, create one.
199          if (getResourceAdapter() == null)
200             createResourceAdapter();
201          // Then, apply the start sequence of super class.
202          super.afterPropertiesSet();
203       }
204    }
205 }