View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.List;
19  import java.util.Queue;
20  
21  import org.cometd.Bayeux;
22  import org.cometd.Channel;
23  import org.cometd.Client;
24  import org.cometd.ClientListener;
25  import org.cometd.DeliverListener;
26  import org.cometd.Extension;
27  import org.cometd.Message;
28  import org.cometd.MessageListener;
29  import org.cometd.QueueListener;
30  import org.cometd.RemoveListener;
31  import org.mortbay.log.Log;
32  import org.mortbay.util.ArrayQueue;
33  import org.mortbay.util.LazyList;
34  import org.mortbay.util.ajax.JSON;
35  
36  /* ------------------------------------------------------------ */
37  /**
38   *
39   * @author gregw
40   */
41  public class ClientImpl implements Client
42  {
43      private String _id;
44      private String _type;
45      private int _responsesPending;
46      private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write
47      private RemoveListener[] _rListeners; // copy on write
48      private MessageListener[] _syncMListeners; // copy on write
49      private MessageListener[] _asyncMListeners; // copy on write
50      private QueueListener[] _qListeners; // copy on write
51      private DeliverListener[] _dListeners; // copy on write
52      protected AbstractBayeux _bayeux;
53      private String _browserId;
54      private JSON.Literal _advice;
55      private int _batch;
56      private int _maxQueue;
57      private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
58      private long _timeout;
59      private long _interval;
60      private int _lag;
61      private Extension[] _extensions;
62  
63      private boolean _deliverViaMetaConnectOnly;
64      private volatile boolean _isExpired;
65  
66      // manipulated and synchronized by AbstractBayeux
67      int _adviseVersion;
68  
69      /* ------------------------------------------------------------ */
70      protected ClientImpl(AbstractBayeux bayeux)
71      {
72          _bayeux=bayeux;
73          _maxQueue=bayeux.getMaxClientQueue();
74          _bayeux.addClient(this,null);
75          if (_bayeux.isLogInfo())
76              _bayeux.logInfo("newClient: " + this);
77      }
78  
79      /* ------------------------------------------------------------ */
80      protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
81      {
82          _bayeux=bayeux;
83          _maxQueue=0;
84  
85          _bayeux.addClient(this,idPrefix);
86  
87          if (_bayeux.isLogInfo())
88              _bayeux.logInfo("newClient: " + this);
89      }
90  
91      /* ------------------------------------------------------------ */
92      public void addExtension(Extension ext)
93      {
94          _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
95      }
96  
97      /* ------------------------------------------------------------ */
98      Extension[] getExtensions()
99      {
100         return _extensions;
101     }
102 
103     /* ------------------------------------------------------------ */
104     public void deliver(Client from, String toChannel, Object data, String id)
105     {
106         MessageImpl message=_bayeux.newMessage();
107         message.put(Bayeux.CHANNEL_FIELD,toChannel);
108         message.put(Bayeux.DATA_FIELD,data);
109         if (id != null)
110             message.put(Bayeux.ID_FIELD,id);
111 
112         Message m=_bayeux.extendSendBayeux(from,message);
113         if (m != null)
114             doDelivery(from,m);
115         if (m instanceof MessageImpl)
116             ((MessageImpl)m).decRef();
117     }
118 
119     /* ------------------------------------------------------------ */
120     public void deliverLazy(Client from, String toChannel, Object data, String id)
121     {
122         MessageImpl message=_bayeux.newMessage();
123         message.put(Bayeux.CHANNEL_FIELD,toChannel);
124         message.put(Bayeux.DATA_FIELD,data);
125         if (id != null)
126             message.put(Bayeux.ID_FIELD,id);
127         message.setLazy(true);
128         Message m=_bayeux.extendSendBayeux(from,message);
129         if (m != null)
130             doDelivery(from,m);
131         if (m instanceof MessageImpl)
132             ((MessageImpl)m).decRef();
133     }
134 
135     /* ------------------------------------------------------------ */
136     protected void doDelivery(Client from, final Message msg)
137     {
138         final Message message=_bayeux.extendSendClient(from,this,msg);
139         if (message == null)
140             return;
141 
142         MessageListener[] alisteners=null;
143         synchronized(this)
144         {
145             if (_maxQueue < 0)
146             {
147                 // No queue limit, so always queue the message
148                 ((MessageImpl)message).incRef();
149                 _queue.addUnsafe(message);
150             }
151             else
152             {
153                 // We have a queue limit,
154                 boolean queue;
155                 if (_queue.size() >= _maxQueue)
156                 {
157                     // We are over the limit, so consult listeners
158                     if (_qListeners != null && _qListeners.length > 0)
159                     {
160                         queue=true;
161                         for (QueueListener l : _qListeners)
162                             queue &= notifyQueueListener(l, from, message);
163                     }
164                     else
165                         queue=false;
166                 }
167                 else
168                     // we are under limit, so queue the messages.
169                     queue=true;
170 
171                 // queue the message if we are meant to
172                 if (queue)
173                 {
174                     ((MessageImpl)message).incRef();
175                     _queue.addUnsafe(message);
176                 }
177             }
178 
179             // deliver synchronized
180             if (_syncMListeners != null)
181                 for (MessageListener l : _syncMListeners)
182                     notifyMessageListener(l, from, message);
183             alisteners=_asyncMListeners;
184 
185             if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
186             {
187                 if (((MessageImpl)message).isLazy())
188                     lazyResume();
189                 else
190                     resume();
191             }
192         }
193 
194         // deliver unsynchronized
195         if (alisteners != null)
196             for (MessageListener l : alisteners)
197                 notifyMessageListener(l, from, message);
198     }
199 
200     private boolean notifyQueueListener(QueueListener listener, Client from, Message message)
201     {
202         try
203         {
204             return listener.queueMaxed(from, this, message);
205         }
206         catch (Throwable x)
207         {
208             Log.debug(x);
209             return false;
210         }
211     }
212 
213     private void notifyMessageListener(MessageListener listener, Client from, Message message)
214     {
215         try
216         {
217             listener.deliver(from, this, message);
218         }
219         catch (Throwable x)
220         {
221             Log.debug(x);
222         }
223     }
224 
225     /* ------------------------------------------------------------ */
226     public void doDeliverListeners()
227     {
228         synchronized(this)
229         {
230             if (_dListeners != null)
231                 for (DeliverListener l : _dListeners)
232                     notifyDeliverListener(l, _queue);
233         }
234     }
235 
236     private void notifyDeliverListener(DeliverListener listener, Queue<Message> queue)
237     {
238         try
239         {
240             listener.deliver(this, queue);
241         }
242         catch (Throwable x)
243         {
244             Log.debug(x);
245         }
246     }
247 
248     /* ------------------------------------------------------------ */
249     public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
250     {
251         _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
252     }
253 
254     /* ------------------------------------------------------------ */
255     public boolean isMetaConnectDeliveryOnly()
256     {
257         return _deliverViaMetaConnectOnly;
258     }
259 
260     /* ------------------------------------------------------------ */
261     public void startBatch()
262     {
263         synchronized(this)
264         {
265             _batch++;
266         }
267     }
268 
269     /* ------------------------------------------------------------ */
270     public void endBatch()
271     {
272         synchronized(this)
273         {
274             if (--_batch == 0 && _responsesPending < 1)
275             {
276                 batch:switch(_queue.size())
277                 {
278                     case 0:
279                         break;
280                     case 1:
281                         if (((MessageImpl)_queue.get(0)).isLazy())
282                             lazyResume();
283                         else
284                             resume();
285                         break;
286                     default:
287                         for (int i=_queue.size();i-->0;)
288                         {
289                             if (!((MessageImpl)_queue.get(i)).isLazy())
290                             {
291                                 resume();
292                                 break batch;
293                             }
294                         }
295                         lazyResume();
296                 }
297             }
298         }
299     }
300 
301     /* ------------------------------------------------------------ */
302     public String getConnectionType()
303     {
304         return _type;
305     }
306 
307     /* ------------------------------------------------------------ */
308     /*
309      * (non-Javadoc)
310      *
311      * @see org.mortbay.cometd.C#getId()
312      */
313     public String getId()
314     {
315         return _id;
316     }
317 
318     /* ------------------------------------------------------------ */
319     public boolean hasMessages()
320     {
321         return _queue.size() > 0;
322     }
323 
324     /* ------------------------------------------------------------ */
325     public boolean hasNonLazyMessages()
326     {
327         synchronized(this)
328         {
329             for (int i=_queue.size(); i-- > 0;)
330             {
331                 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
332                     return true;
333             }
334         }
335         return false;
336     }
337 
338     /* ------------------------------------------------------------ */
339     public boolean isLocal()
340     {
341         return true;
342     }
343 
344     /* ------------------------------------------------------------ */
345     /*
346      * (non-Javadoc)
347      *
348      * @see org.cometd.Client#disconnect()
349      */
350     public void disconnect()
351     {
352         synchronized(this)
353         {
354             if (_bayeux.hasClient(_id))
355                 remove(false);
356         }
357     }
358 
359     /* ------------------------------------------------------------ */
360     /*
361      * (non-Javadoc)
362      *
363      * @see dojox.cometd.Client#remove(boolean)
364      */
365     public void remove(boolean timeout)
366     {
367         _isExpired=timeout;
368         Client client=_bayeux.removeClient(_id);
369 
370         if (client != null && _bayeux.isLogInfo())
371             _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
372 
373         final String browser_id;
374         final RemoveListener[] listeners;
375         synchronized(this)
376         {
377             browser_id=_browserId;
378             _browserId=null;
379             listeners=_rListeners;
380         }
381 
382         if (browser_id != null)
383             _bayeux.clientOffBrowser(browser_id,_id);
384         if (listeners != null)
385             for (RemoveListener l : listeners)
386                 notifyRemoveListener(l, _id, timeout);
387 
388         resume();
389     }
390 
391     private void notifyRemoveListener(RemoveListener listener, String clientId, boolean timeout)
392     {
393         try
394         {
395             listener.removed(clientId, timeout);
396         }
397         catch (Throwable x)
398         {
399             Log.debug(x);
400         }
401     }
402 
403     /* ------------------------------------------------------------ */
404     public boolean isExpired()
405     {
406         return _isExpired;
407     }
408 
409     /* ------------------------------------------------------------ */
410     public int responded()
411     {
412         synchronized(this)
413         {
414             return _responsesPending--;
415         }
416     }
417 
418     /* ------------------------------------------------------------ */
419     public int responsePending()
420     {
421         synchronized(this)
422         {
423             return ++_responsesPending;
424         }
425     }
426 
427     /* ------------------------------------------------------------ */
428     /**
429      * Called by deliver to resume anything waiting on this client lazily
430      */
431     public void lazyResume()
432     {
433     }
434 
435     /* ------------------------------------------------------------ */
436     /**
437      * Called by deliver to resume anything waiting on this client.
438      */
439     public void resume()
440     {
441     }
442 
443     /* ------------------------------------------------------------ */
444     /*
445      * @return the number of messages queued
446      */
447     public int getMessages()
448     {
449         return _queue.size();
450     }
451 
452     /* ------------------------------------------------------------ */
453     public List<Message> takeMessages()
454     {
455         synchronized(this)
456         {
457             ArrayList<Message> list=new ArrayList<Message>(_queue);
458             _queue.clear();
459             return list;
460         }
461     }
462 
463     /* ------------------------------------------------------------ */
464     public void returnMessages(List<Message> messages)
465     {
466         synchronized(this)
467         {
468             _queue.addAll(0,messages);
469         }
470     }
471 
472     /* ------------------------------------------------------------ */
473     @Override
474     public String toString()
475     {
476         return _id;
477     }
478 
479     /* ------------------------------------------------------------ */
480     protected void addSubscription(ChannelImpl channel)
481     {
482         synchronized(this)
483         {
484             _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
485         }
486     }
487 
488     /* ------------------------------------------------------------ */
489     protected void removeSubscription(ChannelImpl channel)
490     {
491         synchronized(this)
492         {
493             _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
494         }
495     }
496 
497     /* ------------------------------------------------------------ */
498     protected void setConnectionType(String type)
499     {
500         synchronized(this)
501         {
502             _type=type;
503         }
504     }
505 
506     /* ------------------------------------------------------------ */
507     protected void setId(String id)
508     {
509         synchronized(this)
510         {
511             _id=id;
512         }
513     }
514 
515     /* ------------------------------------------------------------ */
516     public void unsubscribeAll()
517     {
518         ChannelImpl[] subscriptions;
519         synchronized(this)
520         {
521             subscriptions=_subscriptions;
522             _subscriptions=new ChannelImpl[0];
523         }
524         for (ChannelImpl channel : subscriptions)
525             channel.unsubscribe(this);
526 
527     }
528 
529     /* ------------------------------------------------------------ */
530     public void setBrowserId(String id)
531     {
532         if (_browserId != null && !_browserId.equals(id))
533             _bayeux.clientOffBrowser(_browserId,_id);
534         _browserId=id;
535         if (_browserId != null)
536             _bayeux.clientOnBrowser(_browserId,_id);
537     }
538 
539     /* ------------------------------------------------------------ */
540     public String getBrowserId()
541     {
542         return _browserId;
543     }
544 
545     /* ------------------------------------------------------------ */
546     @Override
547     public boolean equals(Object o)
548     {
549         if (!(o instanceof Client))
550             return false;
551         return getId().equals(((Client)o).getId());
552     }
553 
554     /* ------------------------------------------------------------ */
555     /**
556      * Get the advice specific for this Client
557      *
558      * @return advice specific for this client or null
559      */
560     public JSON.Literal getAdvice()
561     {
562         return _advice;
563     }
564 
565     /* ------------------------------------------------------------ */
566     /**
567      * @param advice
568      *            specific for this client
569      */
570     public void setAdvice(JSON.Literal advice)
571     {
572         _advice=advice;
573     }
574 
575     /* ------------------------------------------------------------ */
576     public void addListener(ClientListener listener)
577     {
578         synchronized(this)
579         {
580             if (listener instanceof MessageListener)
581             {
582                 if (listener instanceof MessageListener.Synchronous)
583                     _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
584                 else
585                     _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
586             }
587 
588             if (listener instanceof RemoveListener)
589                 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
590 
591             if (listener instanceof QueueListener)
592                 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
593 
594             if (listener instanceof DeliverListener)
595                 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
596         }
597     }
598 
599     /* ------------------------------------------------------------ */
600     public void removeListener(ClientListener listener)
601     {
602         synchronized(this)
603         {
604             if (listener instanceof MessageListener)
605             {
606                 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
607                 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
608             }
609 
610             if (listener instanceof RemoveListener)
611                 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
612 
613             if (listener instanceof QueueListener)
614                 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
615         }
616     }
617 
618     /* ------------------------------------------------------------ */
619     public long getInterval()
620     {
621         return _interval;
622     }
623 
624     /* ------------------------------------------------------------ */
625     /**
626      * Set per client interval
627      *
628      * @param intervalMS
629      *            timeout in MS for longpoll duration or 0 to use default from
630      *            {@link AbstractBayeux#getMaxInterval()}.
631      */
632     public void setInterval(long intervalMS)
633     {
634         _interval=intervalMS;
635     }
636 
637     /* ------------------------------------------------------------ */
638     public long getTimeout()
639     {
640         return _timeout;
641     }
642 
643     /* ------------------------------------------------------------ */
644     /**
645      * Set per client timeout
646      *
647      * @param timeoutMS
648      *            timeout in MS for longpoll duration or 0 to use default from
649      *            {@link AbstractBayeux#getTimeout()}.
650      */
651     public void setTimeout(long timeoutMS)
652     {
653         _timeout=timeoutMS;
654     }
655 
656     /* ------------------------------------------------------------ */
657     public void setMaxQueue(int maxQueue)
658     {
659         _maxQueue=maxQueue;
660     }
661 
662     /* ------------------------------------------------------------ */
663     public int getMaxQueue()
664     {
665         return _maxQueue;
666     }
667 
668     /* ------------------------------------------------------------ */
669     public Queue<Message> getQueue()
670     {
671         return _queue;
672     }
673 
674     /* ------------------------------------------------------------ */
675     /**
676      * @see org.mortbay.cometd.ext.TimesyncExtension
677      * @return The lag in ms as measured by an extension like the
678      *         TimesyncExtension
679      */
680     public int getLag()
681     {
682         return _lag;
683     }
684 
685     /* ------------------------------------------------------------ */
686     /**
687      * @see org.mortbay.cometd.ext.TimesyncExtension
688      * @param lag
689      *            in ms
690      */
691     public void setLag(int lag)
692     {
693         _lag=lag;
694     }
695 
696     /* ------------------------------------------------------------ */
697     /**
698      * Get the subscribed to channels
699      *
700      * @return A copied array of the channels to which this client is subscribed
701      */
702     public Channel[] getSubscriptions()
703     {
704         ChannelImpl[] subscriptions=_subscriptions;
705         if (subscriptions == null)
706             return null;
707         Channel[] channels=new Channel[subscriptions.length];
708         System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
709         return channels;
710     }
711 
712 }