View Javadoc

1   // ========================================================================
2   // Copyright 2008 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  package org.mortbay.cometd;
15  
16  import java.lang.reflect.Method;
17  import java.util.Map;
18  import java.util.concurrent.ConcurrentHashMap;
19  
20  import org.cometd.Bayeux;
21  import org.cometd.Channel;
22  import org.cometd.Client;
23  import org.cometd.Listener;
24  import org.cometd.Message;
25  import org.cometd.MessageListener;
26  import org.mortbay.component.LifeCycle;
27  import org.mortbay.log.Log;
28  import org.mortbay.thread.QueuedThreadPool;
29  import org.mortbay.thread.ThreadPool;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * Abstract Bayeux Service class. This is a base class to assist with the
34   * creation of server side @ link Bayeux} clients that provide services to
35   * remote Bayeux clients. The class provides a Bayeux {@link Client} and
36   * {@link Listener} together with convenience methods to map subscriptions to
37   * methods on the derived class and to send responses to those methods.
38   * 
39   * <p>
40   * If a {@link #set_threadPool(ThreadPool)} is set, then messages are handled in
41   * their own threads. This is desirable if the handling of a message can take
42   * considerable time and it is desired not to hold up the delivering thread
43   * (typically a HTTP request handling thread).
44   * 
45   * <p>
46   * If the BayeuxService is constructed asynchronously (the default), then
47   * messages are delivered unsynchronized and multiple simultaneous calls to
48   * handling methods may occur.
49   * 
50   * <p>
51   * If the BayeuxService is constructed as a synchronous service, then message
52   * delivery is synchronized on the internal {@link Client} instances used and
53   * only a single call will be made to the handler method (unless a thread pool
54   * is used).
55   * 
56   * @see MessageListener
57   * @author gregw
58   * 
59   */
60  public abstract class BayeuxService
61  {
62      private String _name;
63      private Bayeux _bayeux;
64      private Client _client;
65      private Map<String,Method> _methods=new ConcurrentHashMap<String,Method>();
66      private ThreadPool _threadPool;
67      private MessageListener _listener;
68      private boolean _seeOwn=false;
69  
70      /* ------------------------------------------------------------ */
71      /**
72       * Instantiate the service. Typically the derived constructor will call @
73       * #subscribe(String, String)} to map subscriptions to methods.
74       * 
75       * @param bayeux
76       *            The bayeux instance.
77       * @param name
78       *            The name of the service (used as client ID prefix).
79       */
80      public BayeuxService(Bayeux bayeux, String name)
81      {
82          this(bayeux,name,0,false);
83      }
84  
85      /* ------------------------------------------------------------ */
86      /**
87       * Instantiate the service. Typically the derived constructor will call @
88       * #subscribe(String, String)} to map subscriptions to methods.
89       * 
90       * @param bayeux
91       *            The bayeux instance.
92       * @param name
93       *            The name of the service (used as client ID prefix).
94       * @param maxThreads
95       *            The size of a ThreadPool to create to handle messages.
96       */
97      public BayeuxService(Bayeux bayeux, String name, int maxThreads)
98      {
99          this(bayeux,name,maxThreads,false);
100     }
101 
102     /* ------------------------------------------------------------ */
103     /**
104      * Instantiate the service. Typically the derived constructor will call @
105      * #subscribe(String, String)} to map subscriptions to methods.
106      * 
107      * @param bayeux
108      *            The bayeux instance.
109      * @param name
110      *            The name of the service (used as client ID prefix).
111      * @param maxThreads
112      *            The size of a ThreadPool to create to handle messages.
113      * @param synchronous
114      *            True if message delivery will be synchronized on the client.
115      */
116     public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous)
117     {
118         if (maxThreads > 0)
119             setThreadPool(new QueuedThreadPool(maxThreads));
120         _name=name;
121         _bayeux=bayeux;
122         _client=_bayeux.newClient(name);
123         _listener=(synchronous)?new SyncListen():new AsyncListen();
124         _client.addListener(_listener);
125 
126     }
127 
128     /* ------------------------------------------------------------ */
129     public Bayeux getBayeux()
130     {
131         return _bayeux;
132     }
133 
134     /* ------------------------------------------------------------ */
135     public Client getClient()
136     {
137         return _client;
138     }
139 
140     /* ------------------------------------------------------------ */
141     public ThreadPool getThreadPool()
142     {
143         return _threadPool;
144     }
145 
146     /* ------------------------------------------------------------ */
147     /**
148      * Set the threadpool. If the {@link ThreadPool} is a {@link LifeCycle},
149      * then it is started by this method.
150      * 
151      * @param pool
152      */
153     public void setThreadPool(ThreadPool pool)
154     {
155         try
156         {
157             if (pool instanceof LifeCycle)
158                 if (!((LifeCycle)pool).isStarted())
159                     ((LifeCycle)pool).start();
160         }
161         catch(Exception e)
162         {
163             throw new IllegalStateException(e);
164         }
165         _threadPool=pool;
166     }
167 
168     /* ------------------------------------------------------------ */
169     public boolean isSeeOwnPublishes()
170     {
171         return _seeOwn;
172     }
173 
174     /* ------------------------------------------------------------ */
175     public void setSeeOwnPublishes(boolean own)
176     {
177         _seeOwn=own;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * Subscribe to a channel. Subscribe to channel and map a method to handle
183      * received messages. The method must have a unique name and one of the
184      * following signatures:
185      * <ul>
186      * <li><code>myMethod(Client fromClient,Object data)</code></li>
187      * <li><code>myMethod(Client fromClient,Object data,String id)</code></li>
188      * <li>
189      * <code>myMethod(Client fromClient,String channel,Object data,String id)</code>
190      * </li>
191      * </li>
192      * 
193      * The data parameter can be typed if the type of the data object published
194      * by the client is known (typically Map<String,Object>). If the type of the
195      * data parameter is {@link Message} then the message object itself is
196      * passed rather than just the data.
197      * <p>
198      * Typically a service will subscribe to a channel in the "/service/**"
199      * space which is not a broadcast channel. Messages published to these
200      * channels are only delivered to server side clients like this service.
201      * 
202      * <p>
203      * Any object returned by a mapped subscription method is delivered to the
204      * calling client and not broadcast. If the method returns void or null,
205      * then no response is sent. A mapped subscription method may also call
206      * {@link #send(Client, String, Object, String)} to deliver a response
207      * message(s) to different clients and/or channels. It may also publish
208      * methods via the normal {@link Bayeux} API.
209      * <p>
210      * 
211      * 
212      * @param channelId
213      *            The channel to subscribe to
214      * @param methodName
215      *            The name of the method on this object to call when messages
216      *            are recieved.
217      */
218     protected void subscribe(String channelId, String methodName)
219     {
220         Method method=null;
221 
222         Class<?> c=this.getClass();
223         while(c != null && c != Object.class)
224         {
225             Method[] methods=c.getDeclaredMethods();
226             for (int i=methods.length; i-- > 0;)
227             {
228                 if (methodName.equals(methods[i].getName()))
229                 {
230                     if (method != null)
231                         throw new IllegalArgumentException("Multiple methods called '" + methodName + "'");
232                     method=methods[i];
233                 }
234             }
235             c=c.getSuperclass();
236         }
237 
238         if (method == null)
239             throw new NoSuchMethodError(methodName);
240         int params=method.getParameterTypes().length;
241         if (params < 2 || params > 4)
242             throw new IllegalArgumentException("Method '" + methodName + "' does not have 2or3 parameters");
243         if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
244             throw new IllegalArgumentException("Method '" + methodName + "' does not have Client as first parameter");
245 
246         Channel channel=_bayeux.getChannel(channelId,true);
247 
248         if (((ChannelImpl)channel).getChannelId().isWild())
249         {
250             final Method m=method;
251             Client wild_client=_bayeux.newClient(_name + "-wild");
252             wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
253             channel.subscribe(wild_client);
254         }
255         else
256         {
257             _methods.put(channelId,method);
258             channel.subscribe(_client);
259         }
260     }
261 
262     /* ------------------------------------------------------------ */
263     /**
264      * Send data to a individual client. The data passed is sent to the client
265      * as the "data" member of a message with the given channel and id. The
266      * message is not published on the channel and is thus not broadcast to all
267      * channel subscribers. However to the target client, the message appears as
268      * if it was broadcast.
269      * <p>
270      * Typcially this method is only required if a service method sends
271      * response(s) to channels other than the subscribed channel. If the
272      * response is to be sent to the subscribed channel, then the data can
273      * simply be returned from the subscription method.
274      * 
275      * @param toClient
276      *            The target client
277      * @param onChannel
278      *            The channel the message is for
279      * @param data
280      *            The data of the message
281      * @param id
282      *            The id of the message (or null for a random id).
283      */
284     protected void send(Client toClient, String onChannel, Object data, String id)
285     {
286         toClient.deliver(getClient(),onChannel,data,id);
287     }
288 
289     /* ------------------------------------------------------------ */
290     /**
291      * Handle Exception. This method is called when a mapped subscription method
292      * throws and exception while handling a message.
293      * 
294      * @param fromClient
295      * @param toClient
296      * @param msg
297      * @param th
298      */
299     protected void exception(Client fromClient, Client toClient, Map<String,Object> msg, Throwable th)
300     {
301         System.err.println(msg);
302         th.printStackTrace();
303     }
304 
305     /* ------------------------------------------------------------ */
306     private void invoke(final Method method, final Client fromClient, final Client toClient, final Message msg)
307     {
308         if (_threadPool == null)
309             doInvoke(method,fromClient,toClient,msg);
310         else
311         {
312             _threadPool.dispatch(new Runnable()
313             {
314                 public void run()
315                 {
316                     try
317                     {
318                         ((MessageImpl)msg).incRef();
319                         doInvoke(method,fromClient,toClient,msg);
320                     }
321                     finally
322                     {
323                         ((MessageImpl)msg).decRef();
324                     }
325                 }
326             });
327         }
328     }
329 
330     /* ------------------------------------------------------------ */
331     private void doInvoke(Method method, Client fromClient, Client toClient, Message msg)
332     {
333         String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
334         Object data=msg.get(Bayeux.DATA_FIELD);
335         String id=msg.getId();
336 
337         if (method != null)
338         {
339             try
340             {
341                 Class<?>[] args=method.getParameterTypes();
342                 Object arg=Message.class.isAssignableFrom(args[1])?msg:data;
343 
344                 Object reply=null;
345                 switch(method.getParameterTypes().length)
346                 {
347                     case 2:
348                         reply=method.invoke(this,fromClient,arg);
349                         break;
350                     case 3:
351                         reply=method.invoke(this,fromClient,arg,id);
352                         break;
353                     case 4:
354                         reply=method.invoke(this,fromClient,channel,arg,id);
355                         break;
356                 }
357 
358                 if (reply != null)
359                     send(fromClient,channel,reply,id);
360             }
361             catch(Exception e)
362             {
363                 Log.debug("method",method);
364                 exception(fromClient,toClient,msg,e);
365             }
366             catch(Error e)
367             {
368                 Log.debug("method",method);
369                 exception(fromClient,toClient,msg,e);
370             }
371         }
372     }
373 
374     /* ------------------------------------------------------------ */
375     /* ------------------------------------------------------------ */
376     private class AsyncListen implements MessageListener, MessageListener.Asynchronous
377     {
378         public void deliver(Client fromClient, Client toClient, Message msg)
379         {
380             if (!_seeOwn && fromClient == getClient())
381                 return;
382             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
383             Method method=_methods.get(channel);
384             invoke(method,fromClient,toClient,msg);
385         }
386     }
387 
388     /* ------------------------------------------------------------ */
389     /* ------------------------------------------------------------ */
390     private class SyncListen implements MessageListener, MessageListener.Synchronous
391     {
392         public void deliver(Client fromClient, Client toClient, Message msg)
393         {
394             if (!_seeOwn && fromClient == getClient())
395                 return;
396             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
397             Method method=_methods.get(channel);
398             invoke(method,fromClient,toClient,msg);
399         }
400     }
401 
402     /* ------------------------------------------------------------ */
403     /* ------------------------------------------------------------ */
404     private class SyncWildListen implements MessageListener, MessageListener.Synchronous
405     {
406         Client _client;
407         Method _method;
408 
409         public SyncWildListen(Client client, Method method)
410         {
411             _client=client;
412             _method=method;
413         }
414 
415         public void deliver(Client fromClient, Client toClient, Message msg)
416         {
417             if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
418                 return;
419             invoke(_method,fromClient,toClient,msg);
420         }
421     };
422 
423     /* ------------------------------------------------------------ */
424     /* ------------------------------------------------------------ */
425     private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
426     {
427         Client _client;
428         Method _method;
429 
430         public AsyncWildListen(Client client, Method method)
431         {
432             _client=client;
433             _method=method;
434         }
435 
436         public void deliver(Client fromClient, Client toClient, Message msg)
437         {
438             if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
439                 return;
440             invoke(_method,fromClient,toClient,msg);
441         }
442     };
443 
444 }