View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.List;
19  import java.util.Queue;
20  
21  import org.cometd.Bayeux;
22  import org.cometd.Channel;
23  import org.cometd.Client;
24  import org.cometd.ClientListener;
25  import org.cometd.DeliverListener;
26  import org.cometd.Extension;
27  import org.cometd.Message;
28  import org.cometd.MessageListener;
29  import org.cometd.QueueListener;
30  import org.cometd.RemoveListener;
31  import org.mortbay.util.ArrayQueue;
32  import org.mortbay.util.LazyList;
33  import org.mortbay.util.ajax.JSON;
34  
35  /* ------------------------------------------------------------ */
36  /**
37   * 
38   * @author gregw
39   */
40  public class ClientImpl implements Client
41  {
42      private String _id; // client id: written by setId(), returned by getId() and toString()
43      private String _type; // connection type: written by setConnectionType()
44      private int _responsesPending; // incremented by responsePending(), decremented by responded()
45      private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write
46      private RemoveListener[] _rListeners; // copy on write
47      private MessageListener[] _syncMListeners; // copy on write
48      private MessageListener[] _asyncMListeners; // copy on write
49      private QueueListener[] _qListeners; // copy on write
50      private DeliverListener[] _dListeners; // copy on write
51      protected AbstractBayeux _bayeux; // owning Bayeux instance, assigned in the constructors
52      private String _browserId; // written by setBrowserId(), cleared by remove()
53      private JSON.Literal _advice; // per-client advice, may be null
54      private int _batch; // batch nesting depth: startBatch() increments, endBatch() decrements
55      private int _maxQueue; // queue limit checked in doDelivery(); a negative value means no limit
56      private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this); // NOTE(review): third argument is presumably the lock object used by ArrayQueue - confirm
57      private long _timeout; // per-client timeout in ms, see setTimeout()
58      private long _interval; // per-client interval in ms, see setInterval()
59      private int _lag; // lag in ms, see setLag()
60      private Extension[] _extensions; // extensions added through addExtension(); null until the first one is added
61  
62      private boolean _deliverViaMetaConnectOnly; // see setMetaConnectDeliveryOnly()
63      private volatile boolean _isExpired; // set by remove(boolean) to its timeout argument
64  
65      // manipulated and synchronized by AbstractBayeux
66      int _adviseVersion;
67  
68      /* ------------------------------------------------------------ */
69      protected ClientImpl(AbstractBayeux bayeux)
70      {
71          _bayeux=bayeux;
72          _maxQueue=bayeux.getMaxClientQueue();
73          _bayeux.addClient(this,null);
74          if (_bayeux.isLogInfo())
75              _bayeux.logInfo("newClient: " + this);
76      }
77  
78      /* ------------------------------------------------------------ */
79      protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
80      {
81          _bayeux=bayeux;
82          _maxQueue=0;
83  
84          _bayeux.addClient(this,idPrefix);
85  
86          if (_bayeux.isLogInfo())
87              _bayeux.logInfo("newClient: " + this);
88      }
89  
90      /* ------------------------------------------------------------ */
91      public void addExtension(Extension ext)
92      {
93          _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
94      }
95  
96      /* ------------------------------------------------------------ */
97      Extension[] getExtensions()
98      {
99          return _extensions;
100     }
101 
102     /* ------------------------------------------------------------ */
103     public void deliver(Client from, String toChannel, Object data, String id)
104     {
105         MessageImpl message=_bayeux.newMessage();
106         message.put(Bayeux.CHANNEL_FIELD,toChannel);
107         message.put(Bayeux.DATA_FIELD,data);
108         if (id != null)
109             message.put(Bayeux.ID_FIELD,id);
110 
111         Message m=_bayeux.extendSendBayeux(from,message);
112         if (m != null)
113             doDelivery(from,m);
114         if (m instanceof MessageImpl)
115             ((MessageImpl)m).decRef();
116     }
117 
118     /* ------------------------------------------------------------ */
119     public void deliverLazy(Client from, String toChannel, Object data, String id)
120     {
121         MessageImpl message=_bayeux.newMessage();
122         message.put(Bayeux.CHANNEL_FIELD,toChannel);
123         message.put(Bayeux.DATA_FIELD,data);
124         if (id != null)
125             message.put(Bayeux.ID_FIELD,id);
126         message.setLazy(true);
127         Message m=_bayeux.extendSendBayeux(from,message);
128         if (m != null)
129             doDelivery(from,m);
130         if (m instanceof MessageImpl)
131             ((MessageImpl)m).decRef();
132     }
133 
134     /* ------------------------------------------------------------ */
135     protected void doDelivery(Client from, final Message msg)
136     {
137         final Message message=_bayeux.extendSendClient(from,this,msg);
138         if (message == null)
139             return;
140 
141         MessageListener[] alisteners=null;
142         synchronized(this)
143         {
144             if (_maxQueue < 0)
145             {
146                 // No queue limit, so always queue the message
147                 ((MessageImpl)message).incRef();
148                 _queue.addUnsafe(message);
149             }
150             else
151             {
152                 // We have a queue limit,
153                 boolean queue;
154                 if (_queue.size() >= _maxQueue)
155                 {
156                     // We are over the limit, so consult listeners
157                     if (_qListeners != null && _qListeners.length > 0)
158                     {
159                         queue=true;
160                         for (QueueListener l : _qListeners)
161                             queue&=l.queueMaxed(from,this,message);
162                     }
163                     else
164                         queue=false;
165                 }
166                 else
167                     // we are under limit, so queue the messages.
168                     queue=true;
169 
170                 // queue the message if we are meant to
171                 if (queue)
172                 {
173                     ((MessageImpl)message).incRef();
174                     _queue.addUnsafe(message);
175                 }
176             }
177 
178             // deliver synchronized
179             if (_syncMListeners != null)
180                 for (MessageListener l : _syncMListeners)
181                     l.deliver(from,this,message);
182             alisteners=_asyncMListeners;
183 
184             if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
185             {
186                 if (((MessageImpl)message).isLazy())
187                     lazyResume();
188                 else
189                     resume();
190             }
191         }
192 
193         // deliver unsynchronized
194         if (alisteners != null)
195             for (MessageListener l : alisteners)
196                 l.deliver(from,this,message);
197     }
198 
199     /* ------------------------------------------------------------ */
200     public void doDeliverListeners()
201     {
202         synchronized(this)
203         {
204             if (_dListeners != null)
205                 for (DeliverListener l : _dListeners)
206                     l.deliver(this,_queue);
207         }
208     }
209 
210     /* ------------------------------------------------------------ */
211     public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
212     {
213         _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
214     }
215 
216     /* ------------------------------------------------------------ */
217     public boolean isMetaConnectDeliveryOnly()
218     {
219         return _deliverViaMetaConnectOnly;
220     }
221 
222     /* ------------------------------------------------------------ */
223     public void startBatch()
224     {
225         synchronized(this)
226         {
227             _batch++;
228         }
229     }
230 
231     /* ------------------------------------------------------------ */
232     public void endBatch()
233     {
234         synchronized(this)
235         {
236             if (--_batch == 0 && _responsesPending < 1)
237             {
238                 batch:switch(_queue.size())
239                 {
240                     case 0:
241                         break;
242                     case 1:
243                         if (((MessageImpl)_queue.get(0)).isLazy())
244                             lazyResume();
245                         else
246                             resume();
247                         break;
248                     default:
249                         for (int i=_queue.size();i-->0;)
250                         {
251                             if (!((MessageImpl)_queue.get(i)).isLazy())
252                             {
253                                 resume();
254                                 break batch;
255                             }
256                         }
257                         lazyResume();
258                 }
259             }
260         }
261     }
262 
263     /* ------------------------------------------------------------ */
264     public String getConnectionType()
265     {
266         return _type;
267     }
268 
269     /* ------------------------------------------------------------ */
270     /*
271      * (non-Javadoc)
272      * 
273      * @see org.mortbay.cometd.C#getId()
274      */
275     public String getId()
276     {
277         return _id;
278     }
279 
280     /* ------------------------------------------------------------ */
281     public boolean hasMessages()
282     {
283         return _queue.size() > 0;
284     }
285 
286     /* ------------------------------------------------------------ */
287     public boolean hasNonLazyMessages()
288     {
289         synchronized(this)
290         {
291             for (int i=_queue.size(); i-- > 0;)
292             {
293                 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
294                     return true;
295             }
296         }
297         return false;
298     }
299 
300     /* ------------------------------------------------------------ */
301     public boolean isLocal()
302     {
303         return true;
304     }
305 
306     /* ------------------------------------------------------------ */
307     /*
308      * (non-Javadoc)
309      * 
310      * @see org.cometd.Client#disconnect()
311      */
312     public void disconnect()
313     {
314         synchronized(this)
315         {
316             if (_bayeux.hasClient(_id))
317                 remove(false);
318         }
319     }
320 
321     /* ------------------------------------------------------------ */
322     /*
323      * (non-Javadoc)
324      * 
325      * @see dojox.cometd.Client#remove(boolean)
326      */
327     public void remove(boolean timeout)
328     {
329         _isExpired=timeout;
330         Client client=_bayeux.removeClient(_id);
331 
332         if (client != null && _bayeux.isLogInfo())
333             _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
334 
335         final String browser_id;
336         final RemoveListener[] listeners;
337         synchronized(this)
338         {
339             browser_id=_browserId;
340             _browserId=null;
341             listeners=_rListeners;
342         }
343 
344         if (browser_id != null)
345             _bayeux.clientOffBrowser(browser_id,_id);
346         if (listeners != null)
347             for (RemoveListener l : listeners)
348                 l.removed(_id,timeout);
349 
350         resume();
351     }
352 
353     /* ------------------------------------------------------------ */
354     public boolean isExpired()
355     {
356         return _isExpired;
357     }
358 
359     /* ------------------------------------------------------------ */
360     public int responded()
361     {
362         synchronized(this)
363         {
364             return _responsesPending--;
365         }
366     }
367 
368     /* ------------------------------------------------------------ */
369     public int responsePending()
370     {
371         synchronized(this)
372         {
373             return ++_responsesPending;
374         }
375     }
376 
377     /* ------------------------------------------------------------ */
378     /**
379      * Called by deliver to resume anything waiting on this client lazily
380      */
381     public void lazyResume()
382     {
383     }
384     
385     /* ------------------------------------------------------------ */
386     /**
387      * Called by deliver to resume anything waiting on this client.
388      */
389     public void resume()
390     {
391     }
392 
393     /* ------------------------------------------------------------ */
394     /*
395      * @return the number of messages queued
396      */
397     public int getMessages()
398     {
399         return _queue.size();
400     }
401 
402     /* ------------------------------------------------------------ */
403     public List<Message> takeMessages()
404     {
405         synchronized(this)
406         {
407             ArrayList<Message> list=new ArrayList<Message>(_queue);
408             _queue.clear();
409             return list;
410         }
411     }
412 
413     /* ------------------------------------------------------------ */
414     public void returnMessages(List<Message> messages)
415     {
416         synchronized(this)
417         {
418             _queue.addAll(0,messages);
419         }
420     }
421 
422     /* ------------------------------------------------------------ */
423     @Override
424     public String toString()
425     {
426         return _id;
427     }
428 
429     /* ------------------------------------------------------------ */
430     protected void addSubscription(ChannelImpl channel)
431     {
432         synchronized(this)
433         {
434             _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
435         }
436     }
437 
438     /* ------------------------------------------------------------ */
439     protected void removeSubscription(ChannelImpl channel)
440     {
441         synchronized(this)
442         {
443             _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
444         }
445     }
446 
447     /* ------------------------------------------------------------ */
448     protected void setConnectionType(String type)
449     {
450         synchronized(this)
451         {
452             _type=type;
453         }
454     }
455 
456     /* ------------------------------------------------------------ */
457     protected void setId(String id)
458     {
459         synchronized(this)
460         {
461             _id=id;
462         }
463     }
464 
465     /* ------------------------------------------------------------ */
466     public void unsubscribeAll()
467     {
468         ChannelImpl[] subscriptions;
469         synchronized(this)
470         {
471             subscriptions=_subscriptions;
472             _subscriptions=new ChannelImpl[0];
473         }
474         for (ChannelImpl channel : subscriptions)
475             channel.unsubscribe(this);
476 
477     }
478 
479     /* ------------------------------------------------------------ */
480     public void setBrowserId(String id)
481     {
482         if (_browserId != null && !_browserId.equals(id))
483             _bayeux.clientOffBrowser(_browserId,_id);
484         _browserId=id;
485         if (_browserId != null)
486             _bayeux.clientOnBrowser(_browserId,_id);
487     }
488 
489     /* ------------------------------------------------------------ */
490     public String getBrowserId()
491     {
492         return _browserId;
493     }
494 
495     /* ------------------------------------------------------------ */
496     @Override
497     public boolean equals(Object o)
498     {
499         if (!(o instanceof Client))
500             return false;
501         return getId().equals(((Client)o).getId());
502     }
503 
504     /* ------------------------------------------------------------ */
505     /**
506      * Get the advice specific for this Client
507      * 
508      * @return advice specific for this client or null
509      */
510     public JSON.Literal getAdvice()
511     {
512         return _advice;
513     }
514 
515     /* ------------------------------------------------------------ */
516     /**
517      * @param advice
518      *            specific for this client
519      */
520     public void setAdvice(JSON.Literal advice)
521     {
522         _advice=advice;
523     }
524 
525     /* ------------------------------------------------------------ */
526     public void addListener(ClientListener listener)
527     {
528         synchronized(this)
529         {
530             if (listener instanceof MessageListener)
531             {
532                 if (listener instanceof MessageListener.Synchronous)
533                     _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
534                 else
535                     _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
536             }
537 
538             if (listener instanceof RemoveListener)
539                 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
540 
541             if (listener instanceof QueueListener)
542                 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
543 
544             if (listener instanceof DeliverListener)
545                 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
546         }
547     }
548 
549     /* ------------------------------------------------------------ */
550     public void removeListener(ClientListener listener)
551     {
552         synchronized(this)
553         {
554             if (listener instanceof MessageListener)
555             {
556                 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
557                 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
558             }
559 
560             if (listener instanceof RemoveListener)
561                 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
562 
563             if (listener instanceof QueueListener)
564                 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
565         }
566     }
567 
568     /* ------------------------------------------------------------ */
569     public long getInterval()
570     {
571         return _interval;
572     }
573 
574     /* ------------------------------------------------------------ */
575     /**
576      * Set per client interval
577      * 
578      * @param intervalMS
579      *            timeout in MS for longpoll duration or 0 to use default from
580      *            {@link AbstractBayeux#getMaxInterval()}.
581      */
582     public void setInterval(long intervalMS)
583     {
584         _interval=intervalMS;
585     }
586 
587     /* ------------------------------------------------------------ */
588     public long getTimeout()
589     {
590         return _timeout;
591     }
592 
593     /* ------------------------------------------------------------ */
594     /**
595      * Set per client timeout
596      * 
597      * @param timeoutMS
598      *            timeout in MS for longpoll duration or 0 to use default from
599      *            {@link AbstractBayeux#getTimeout()}.
600      */
601     public void setTimeout(long timeoutMS)
602     {
603         _timeout=timeoutMS;
604     }
605 
606     /* ------------------------------------------------------------ */
607     public void setMaxQueue(int maxQueue)
608     {
609         _maxQueue=maxQueue;
610     }
611 
612     /* ------------------------------------------------------------ */
613     public int getMaxQueue()
614     {
615         return _maxQueue;
616     }
617 
618     /* ------------------------------------------------------------ */
619     public Queue<Message> getQueue()
620     {
621         return _queue;
622     }
623 
624     /* ------------------------------------------------------------ */
625     /**
626      * @see org.mortbay.cometd.ext.TimesyncExtension
627      * @return The lag in ms as measured by an extension like the
628      *         TimesyncExtension
629      */
630     public int getLag()
631     {
632         return _lag;
633     }
634 
635     /* ------------------------------------------------------------ */
636     /**
637      * @see org.mortbay.cometd.ext.TimesyncExtension
638      * @param lag
639      *            in ms
640      */
641     public void setLag(int lag)
642     {
643         _lag=lag;
644     }
645 
646     /* ------------------------------------------------------------ */
647     /**
648      * Get the subscribed to channels
649      * 
650      * @return A copied array of the channels to which this client is subscribed
651      */
652     public Channel[] getSubscriptions()
653     {
654         ChannelImpl[] subscriptions=_subscriptions;
655         if (subscriptions == null)
656             return null;
657         Channel[] channels=new Channel[subscriptions.length];
658         System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
659         return channels;
660     }
661 
662 }