View Javadoc

1   /**
2    * Copyright 2005-2006 the original author or authors.
3    *
4    * Licensed under the Gnu General Public License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    *
8    *      http://www.opensource.org/licenses/gpl-license.php
9    *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13   * See the Gnu General Public License for more details.
14   */
15  package org.figure8.join.core.messaging;
16  
17  import org.figure8.join.core.setup.ActiveMQConfigurator;
18  import org.figure8.join.util.LogUtil;
19  
20  import org.apache.commons.logging.Log;
21  import org.activemq.broker.BrokerContainer;
22  import org.activemq.broker.BrokerConnector;
23  import org.activemq.broker.impl.BrokerContainerImpl;
24  import org.activemq.spring.BrokerFactoryBean;
25  import org.activemq.spring.SpringBrokerContainerFactory;
26  import org.activemq.transport.TransportServerChannel;
27  import org.activemq.transport.tcp.TcpTransportServerChannel;
28  
29  import java.net.URI;
30  import java.util.List;
31  /**
32   * This factory is an extension of the Spring <code>BrokerFactory</code>
33   * provided by ActiveMQ. This configurable broker factory only creates broker
34   * if we are on the asynchronous side of the application. It also replaces
35   * the URL of the TCP transport layer provided into the configuration file by
36   * the one configured by application administrator (during the setup process,
37   * through the <code>ActiveMQConfigurator</code> helper object).
38   *
39   * @see org.figure8.join.core.setup.ApplicationConfig
40   * @see org.figure8.join.core.setup.ActiveMQConfigurator
41   *
42   * @author <a href="mailto:laurent.broudoux@free.fr">Laurent Broudoux</a>
43   * @version $Revision: 1.3 $
44   */
45  public class ConfigurableBrokerFactoryBean extends BrokerFactoryBean{
46  
47     // Static -------------------------------------------------------------------
48  
49     /** Get a commons logger. */
50     private static Log log = LogUtil.getLog(ConfigurableBrokerFactoryBean.class);
51  
52  
53     // Attributes ---------------------------------------------------------------
54  
55     /** The wrapped ActiveMQ broker container. */
56     protected BrokerContainer broker = null;
57     /** Helper object for configuring ActiveMQ connection factory. */
58     private ActiveMQConfigurator configurator = null;
59  
60  
61     // Public -------------------------------------------------------------------
62  
63     /** @param configurator Helper for configuring connection factory */
64     public void setActiveMQConfigurator(ActiveMQConfigurator configurator){
65        this.configurator = configurator;
66     }
67  
68  
69     // Protected ----------------------------------------------------------------
70  
71     /**
72      * Update the url of the TCP trasnport server channel of the
73      * previously created borker. This later is replaced with the one
74      * coming from {@link ActiveMQConfigurator}.
75      */
76     protected void updateBrokerTransportChannelUrl() throws Exception{
77        if (broker != null){
78           log.info("Updating the url of Tcp transport channel");
79           // Browse the available transport connectors.
80           List connectors = broker.getTransportConnectors();
81           for (int i=0; i < connectors.size(); i++){
82              BrokerConnector connector = (BrokerConnector)connectors.get(i);
83              // Find the Tcp server channel.
84              TransportServerChannel channel = connector.getServerChannel();
85              if (channel instanceof TcpTransportServerChannel){
86                 // Get its URL from configurator.
87                 String serverUrl = configurator.getActiveMQProperty("brokerURL");
88                 URI serverUri = new URI(serverUrl);
89                 // Build a new Tcp channel with our URI because existing one cannot be changed.
90                 TcpTransportServerChannel tcpChannel = ((TcpTransportServerChannel)channel);
91                 TcpTransportServerChannel ourChannel = new TcpTransportServerChannel(tcpChannel.getWireFormat(), serverUri);
92                 tcpChannel.stop();
93                 // Replace the existing connector by our channel.
94                 broker.removeConnector(connector);
95                 broker.addConnector(ourChannel);
96                 log.info("TcpTransport url is: " + ourChannel.getUrl());
97                 break;
98              }
99           }
100       }
101    }
102 
103 
104    // Override of BrokerFactoryBean --------------------------------------------
105 
106    /**
107     * We must ovveride this method cause broker container is private
108     * into ActiveMQ <code>BrokerFactoryBean</code>.
109     * @throws Exception this does not happen (return null if no object)
110     * @return The created ActiveMQ broker.
111     */
112    public Object getObject() throws Exception{
113       return broker;
114    }
115 
116    /**
117     * Override of broker creation method from ActiveMQ BrokerFactory. This
118     * implementation only creates and starts the broker if we are on the
119     * asynchronous side of the application. More over, the transport url
120     * of the broker is replaced by the one from {@link ActiveMQConfigurator}.
121     * @throws Exception IllegalArgumentException if broker factory is not well configured
122     */
123    public void afterPropertiesSet() throws Exception{
124       // Check if we are on asynchronous side of application.
125       if (configurator.getApplicationConfig().isAsynchronousSide()){
126          log.info("On the asynchronous side of application. Creating broker...");
127          if (getConfig() == null)
128             throw new IllegalArgumentException("config property must be set");
129          // Create broker.
130          broker = SpringBrokerContainerFactory.newInstance(getConfig());
131          // Update broker transport uri before starting.
132          try {updateBrokerTransportChannelUrl();}
133          catch (Exception e){
134             log.error("Exception while updating the url of the TcpTransport layer");
135             log.error("Here's the detailed message: " + e.getMessage());
136             throw e;
137          }
138          broker.start();
139       }
140       else{
141          // Initialize a dummy broker to not get Spring crash on startup...
142          broker = new BrokerContainerImpl("dummyBroker");
143       }
144    }
145 
146    /**
147     * Stop the wrapped broker before destroy. We must ovveride this method
148     * cause broker container is private into ActiveMQ <code>BrokerFactoryBean</code>.
149     * @throws Exception if something wrong occurs during broker stop
150     */
151    public void destroy() throws Exception{
152       if (broker != null)
153          broker.stop();
154    }
155 }