View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import org.cometd.Bayeux;
24  import org.cometd.Channel;
25  import org.cometd.ChannelBayeuxListener;
26  import org.cometd.ChannelListener;
27  import org.cometd.Client;
28  import org.cometd.DataFilter;
29  import org.cometd.Message;
30  import org.cometd.SubscriptionListener;
31  import org.mortbay.log.Log;
32  import org.mortbay.util.LazyList;
33  
34  /* ------------------------------------------------------------ */
35  /**
36   * A Bayuex Channel
37   * 
38   * @author gregw
39   * 
40   */
41  public class ChannelImpl implements Channel
42  {
43      protected AbstractBayeux _bayeux;
44      private volatile ClientImpl[] _subscribers=new ClientImpl[0]; // copy on
45      // write
46      private volatile DataFilter[] _dataFilters=new DataFilter[0]; // copy on
47      // write
48      private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0]; // copy
49      // on
50      // write
51      private ChannelId _id;
52      private ConcurrentMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
53      private ChannelImpl _wild;
54      private ChannelImpl _wildWild;
55      private boolean _persistent;
56      private int _split;
57      private boolean _lazy;
58  
59      /* ------------------------------------------------------------ */
60      protected ChannelImpl(String id, AbstractBayeux bayeux)
61      {
62          _id=new ChannelId(id);
63          _bayeux=bayeux;
64      }
65  
66      /* ------------------------------------------------------------ */
67      /**
68       * A Lazy channel marks published messages as lazy. Lazy messages are queued
69       * but do not wake up waiting clients.
70       * 
71       * @return true if message is lazy
72       */
73      public boolean isLazy()
74      {
75          return _lazy;
76      }
77  
78      /* ------------------------------------------------------------ */
79      /**
80       * A Lazy channel marks published messages as lazy. Lazy messages are queued
81       * but do not wake up waiting clients.
82       * 
83       * @param lazy
84       *            true if message is lazy
85       */
86      public void setLazy(boolean lazy)
87      {
88          _lazy=lazy;
89      }
90  
91      /* ------------------------------------------------------------ */
92      public void addChild(ChannelImpl channel)
93      {
94          ChannelId child=channel.getChannelId();
95          if (!_id.isParentOf(child))
96          {
97              throw new IllegalArgumentException(_id + " not parent of " + child);
98          }
99  
100         String next=child.getSegment(_id.depth());
101 
102         if ((child.depth() - _id.depth()) == 1)
103         {
104             // synchronize add and doRemove to avoid concurrent updates
105             synchronized(this)
106             {
107                 // add the channel to this channels
108                 ChannelImpl old=_children.putIfAbsent(next,channel);
109                 if (old != null)
110                     throw new IllegalArgumentException("Already Exists");
111 
112                 if (ChannelId.WILD.equals(next))
113                     _wild=channel;
114                 else if (ChannelId.WILDWILD.equals(next))
115                     _wildWild=channel;
116                 _bayeux.addChannel(channel);
117             }
118         }
119         else
120         {
121             ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
122             branch.addChild(channel);
123         }
124     }
125 
126     /* ------------------------------------------------------------ */
127     /**
128      * @param filter
129      */
130     public void addDataFilter(DataFilter filter)
131     {
132         synchronized(this)
133         {
134             _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
135         }
136     }
137 
138     /* ------------------------------------------------------------ */
139     /**
140      * @return
141      */
142     public ChannelId getChannelId()
143     {
144         return _id;
145     }
146 
147     /* ------------------------------------------------------------ */
148     public ChannelImpl getChild(ChannelId id)
149     {
150         String next=id.getSegment(_id.depth());
151         if (next == null)
152             return null;
153 
154         ChannelImpl channel=_children.get(next);
155 
156         if (channel == null || channel.getChannelId().depth() == id.depth())
157         {
158             return channel;
159         }
160         return channel.getChild(id);
161     }
162 
163     /* ------------------------------------------------------------ */
164     public void getChannels(List<Channel> list)
165     {
166         synchronized(this)
167         {
168             list.add(this);
169             for (ChannelImpl channel : _children.values())
170                 channel.getChannels(list);
171         }
172     }
173 
174     /* ------------------------------------------------------------ */
175     public int getChannelCount()
176     {
177         return _children.size();
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @return
183      */
184     public String getId()
185     {
186         return _id.toString();
187     }
188 
189     /* ------------------------------------------------------------ */
190     public boolean isPersistent()
191     {
192         return _persistent;
193     }
194 
195     /* ------------------------------------------------------------ */
196     public void deliver(Client from, Iterable<Client> to, Object data, String id)
197     {
198         MessageImpl message=_bayeux.newMessage();
199         message.put(Bayeux.CHANNEL_FIELD,getId());
200         message.put(Bayeux.DATA_FIELD,data);
201         if (id != null)
202             message.put(Bayeux.ID_FIELD,id);
203 
204         Message m=_bayeux.extendSendBayeux(from,message);
205 
206         if (m != null)
207         {
208             for (Client t : to)
209                 ((ClientImpl)t).doDelivery(from,m);
210         }
211         if (m instanceof MessageImpl)
212             ((MessageImpl)m).decRef();
213     }
214 
215     /* ------------------------------------------------------------ */
216     public void publish(Client fromClient, Object data, String msgId)
217     {
218         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
219     }
220 
221     /* ------------------------------------------------------------ */
222     public void publishLazy(Client fromClient, Object data, String msgId)
223     {
224         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
225     }
226 
227     /* ------------------------------------------------------------ */
228     public boolean remove()
229     {
230         return _bayeux.removeChannel(this);
231     }
232 
233     /* ------------------------------------------------------------ */
234     public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
235     {
236         ChannelId channelId=channel.getChannelId();
237         int diff=channel._id.depth() - _id.depth();
238 
239         if (diff >= 1)
240         {
241             String key=channelId.getSegment(_id.depth());
242             ChannelImpl child=_children.get(key);
243 
244             if (child != null)
245             {
246                 // is it this child we are removing?
247                 if (diff == 1)
248                 {
249                     if (!child.isPersistent())
250                     {
251                         // synchronize add and doRemove to avoid concurrent
252                         // updates
253                         synchronized(this)
254                         {
255                             if (child.getChannelCount() > 0)
256                             {
257                                 // remove the children of the child
258                                 for (ChannelImpl c : child._children.values())
259                                     child.doRemove(c,listeners);
260                             }
261 
262                             // remove the child
263                             _children.remove(key);
264                         }
265                         for (ChannelBayeuxListener l : listeners)
266                             l.channelRemoved(channel);
267                         return true;
268                     }
269                     return false;
270                 }
271 
272                 boolean removed=child.doRemove(channel,listeners);
273                 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
274                 {
275                     // synchronize add and doRemove to avoid concurrent updates
276                     synchronized(this)
277                     {
278                         _children.remove(key);
279                     }
280                     for (ChannelBayeuxListener l : listeners)
281                         l.channelRemoved(channel);
282                 }
283 
284                 return removed;
285             }
286 
287         }
288         return false;
289     }
290 
291     /* ------------------------------------------------------------ */
292     /**
293      * @param filter
294      */
295     public DataFilter removeDataFilter(DataFilter filter)
296     {
297         synchronized(this)
298         {
299             _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
300             return filter;
301         }
302     }
303 
304     /* ------------------------------------------------------------ */
305     public void setPersistent(boolean persistent)
306     {
307         _persistent=persistent;
308     }
309 
310     /* ------------------------------------------------------------ */
311     /**
312      * @param client
313      */
314     public void subscribe(Client client)
315     {
316         if (!(client instanceof ClientImpl))
317             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
318 
319         synchronized(this)
320         {
321             for (ClientImpl c : _subscribers)
322             {
323                 if (client.equals(c))
324                     return;
325             }
326             _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
327 
328             for (SubscriptionListener l : _subscriptionListeners)
329                 l.subscribed(client,this);
330         }
331 
332         ((ClientImpl)client).addSubscription(this);
333     }
334 
335     /* ------------------------------------------------------------ */
336     @Override
337     public String toString()
338     {
339         return _id.toString();
340     }
341 
342     /* ------------------------------------------------------------ */
343     /**
344      * @param client
345      */
346     public void unsubscribe(Client client)
347     {
348         if (!(client instanceof ClientImpl))
349             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
350         ((ClientImpl)client).removeSubscription(this);
351         synchronized(this)
352         {
353             _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
354 
355             for (SubscriptionListener l : _subscriptionListeners)
356                 l.unsubscribed(client,this);
357 
358             if (!_persistent && _subscribers.length == 0 && _children.size() == 0)
359                 remove();
360         }
361     }
362 
363     /* ------------------------------------------------------------ */
364     protected void doDelivery(ChannelId to, Client from, Message msg)
365     {
366         int tail=to.depth() - _id.depth();
367 
368         Object data=msg.getData();
369 
370         // if we have data, filter it
371         if (data != null)
372         {
373             Object old=data;
374 
375             try
376             {
377                 switch(tail)
378                 {
379                     case 0:
380                     {
381                         final DataFilter[] filters=_dataFilters;
382                         for (DataFilter filter : filters)
383                         {
384                             data=filter.filter(from,this,data);
385                             if (data == null)
386                                 return;
387                         }
388                     }
389                         break;
390 
391                     case 1:
392                         if (_wild != null)
393                         {
394                             final DataFilter[] filters=_wild._dataFilters;
395                             for (DataFilter filter : filters)
396                             {
397                                 data=filter.filter(from,this,data);
398                                 if (data == null)
399                                     return;
400                             }
401                         }
402 
403                     default:
404                         if (_wildWild != null)
405                         {
406                             final DataFilter[] filters=_wildWild._dataFilters;
407                             for (DataFilter filter : filters)
408                             {
409                                 data=filter.filter(from,this,data);
410                                 if (data == null)
411                                     return;
412                             }
413                         }
414                 }
415             }
416             catch(IllegalStateException e)
417             {
418                 Log.ignore(e);
419                 return;
420             }
421 
422             // TODO this may not be correct if the message is reused.
423             // probably should close message ?
424             if (data != old)
425                 msg.put(AbstractBayeux.DATA_FIELD,data);
426         }
427 
428         switch(tail)
429         {
430             case 0:
431             {
432                 if (_lazy && msg instanceof MessageImpl)
433                     ((MessageImpl)msg).setLazy(true);
434 
435                 final ClientImpl[] subscribers=_subscribers;
436                 if (subscribers.length > 0)
437                 {
438                     // fair delivery
439                     int split=_split++ % _subscribers.length;
440                     for (int i=split; i < subscribers.length; i++)
441                         subscribers[i].doDelivery(from,msg);
442                     for (int i=0; i < split; i++)
443                         subscribers[i].doDelivery(from,msg);
444                 }
445                 break;
446             }
447 
448             case 1:
449                 if (_wild != null)
450                 {
451                     if (_wild._lazy && msg instanceof MessageImpl)
452                         ((MessageImpl)msg).setLazy(true);
453                     final ClientImpl[] subscribers=_wild._subscribers;
454                     for (ClientImpl client : subscribers)
455                         client.doDelivery(from,msg);
456                 }
457 
458             default:
459             {
460                 if (_wildWild != null)
461                 {
462                     if (_wildWild._lazy && msg instanceof MessageImpl)
463                         ((MessageImpl)msg).setLazy(true);
464                     final ClientImpl[] subscribers=_wildWild._subscribers;
465                     for (ClientImpl client : subscribers)
466                         client.doDelivery(from,msg);
467                 }
468                 String next=to.getSegment(_id.depth());
469                 ChannelImpl channel=_children.get(next);
470                 if (channel != null)
471                     channel.doDelivery(to,from,msg);
472             }
473         }
474     }
475 
476     /* ------------------------------------------------------------ */
477     public Collection<Client> getSubscribers()
478     {
479         synchronized(this)
480         {
481             return Arrays.asList((Client[])_subscribers);
482         }
483     }
484 
485     /* ------------------------------------------------------------ */
486     public int getSubscriberCount()
487     {
488         synchronized(this)
489         {
490             return _subscribers.length;
491         }
492     }
493 
494     /* ------------------------------------------------------------ */
495     /*
496      * (non-Javadoc)
497      * 
498      * @see dojox.cometd.Channel#getFilters()
499      */
500     public Collection<DataFilter> getDataFilters()
501     {
502         synchronized(this)
503         {
504             return Arrays.asList(_dataFilters);
505         }
506     }
507 
508     /* ------------------------------------------------------------ */
509     public void addListener(ChannelListener listener)
510     {
511         synchronized(this)
512         {
513             if (listener instanceof SubscriptionListener)
514                 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
515         }
516     }
517 
518     public void removeListener(ChannelListener listener)
519     {
520         synchronized(this)
521         {
522             if (listener instanceof SubscriptionListener)
523             {
524                 _subscriptionListeners=(SubscriptionListener[])LazyList.removeFromArray(_subscriptionListeners,listener);
525             }
526         }
527     }
528 }