1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Queue;
20
21 import org.cometd.Bayeux;
22 import org.cometd.Channel;
23 import org.cometd.Client;
24 import org.cometd.ClientListener;
25 import org.cometd.DeliverListener;
26 import org.cometd.Extension;
27 import org.cometd.Message;
28 import org.cometd.MessageListener;
29 import org.cometd.QueueListener;
30 import org.cometd.RemoveListener;
31 import org.mortbay.log.Log;
32 import org.mortbay.util.ArrayQueue;
33 import org.mortbay.util.LazyList;
34 import org.mortbay.util.ajax.JSON;
35
36
37
38
39
40
41 public class ClientImpl implements Client
42 {
43 private String _id;
44 private String _type;
45 private int _responsesPending;
46 private ChannelImpl[] _subscriptions=new ChannelImpl[0];
47 private RemoveListener[] _rListeners;
48 private MessageListener[] _syncMListeners;
49 private MessageListener[] _asyncMListeners;
50 private QueueListener[] _qListeners;
51 private DeliverListener[] _dListeners;
52 protected AbstractBayeux _bayeux;
53 private String _browserId;
54 private JSON.Literal _advice;
55 private int _batch;
56 private int _maxQueue;
57 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
58 private long _timeout;
59 private long _interval;
60 private int _lag;
61 private Extension[] _extensions;
62
63 private boolean _deliverViaMetaConnectOnly;
64 private volatile boolean _isExpired;
65
66
67 int _adviseVersion;
68
69
70 protected ClientImpl(AbstractBayeux bayeux)
71 {
72 _bayeux=bayeux;
73 _maxQueue=bayeux.getMaxClientQueue();
74 _bayeux.addClient(this,null);
75 if (_bayeux.isLogInfo())
76 _bayeux.logInfo("newClient: " + this);
77 }
78
79
80 protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
81 {
82 _bayeux=bayeux;
83 _maxQueue=0;
84
85 _bayeux.addClient(this,idPrefix);
86
87 if (_bayeux.isLogInfo())
88 _bayeux.logInfo("newClient: " + this);
89 }
90
91
92 public void addExtension(Extension ext)
93 {
94 _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
95 }
96
97
98 Extension[] getExtensions()
99 {
100 return _extensions;
101 }
102
103
104 public void deliver(Client from, String toChannel, Object data, String id)
105 {
106 MessageImpl message=_bayeux.newMessage();
107 message.put(Bayeux.CHANNEL_FIELD,toChannel);
108 message.put(Bayeux.DATA_FIELD,data);
109 if (id != null)
110 message.put(Bayeux.ID_FIELD,id);
111
112 Message m=_bayeux.extendSendBayeux(from,message);
113 if (m != null)
114 doDelivery(from,m);
115 if (m instanceof MessageImpl)
116 ((MessageImpl)m).decRef();
117 }
118
119
120 public void deliverLazy(Client from, String toChannel, Object data, String id)
121 {
122 MessageImpl message=_bayeux.newMessage();
123 message.put(Bayeux.CHANNEL_FIELD,toChannel);
124 message.put(Bayeux.DATA_FIELD,data);
125 if (id != null)
126 message.put(Bayeux.ID_FIELD,id);
127 message.setLazy(true);
128 Message m=_bayeux.extendSendBayeux(from,message);
129 if (m != null)
130 doDelivery(from,m);
131 if (m instanceof MessageImpl)
132 ((MessageImpl)m).decRef();
133 }
134
135
136 protected void doDelivery(Client from, final Message msg)
137 {
138 final Message message=_bayeux.extendSendClient(from,this,msg);
139 if (message == null)
140 return;
141
142 MessageListener[] alisteners=null;
143 synchronized(this)
144 {
145 if (_maxQueue < 0)
146 {
147
148 ((MessageImpl)message).incRef();
149 _queue.addUnsafe(message);
150 }
151 else
152 {
153
154 boolean queue;
155 if (_queue.size() >= _maxQueue)
156 {
157
158 if (_qListeners != null && _qListeners.length > 0)
159 {
160 queue=true;
161 for (QueueListener l : _qListeners)
162 queue &= notifyQueueListener(l, from, message);
163 }
164 else
165 queue=false;
166 }
167 else
168
169 queue=true;
170
171
172 if (queue)
173 {
174 ((MessageImpl)message).incRef();
175 _queue.addUnsafe(message);
176 }
177 }
178
179
180 if (_syncMListeners != null)
181 for (MessageListener l : _syncMListeners)
182 notifyMessageListener(l, from, message);
183 alisteners=_asyncMListeners;
184
185 if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
186 {
187 if (((MessageImpl)message).isLazy())
188 lazyResume();
189 else
190 resume();
191 }
192 }
193
194
195 if (alisteners != null)
196 for (MessageListener l : alisteners)
197 notifyMessageListener(l, from, message);
198 }
199
200 private boolean notifyQueueListener(QueueListener listener, Client from, Message message)
201 {
202 try
203 {
204 return listener.queueMaxed(from, this, message);
205 }
206 catch (Throwable x)
207 {
208 Log.debug(x);
209 return false;
210 }
211 }
212
213 private void notifyMessageListener(MessageListener listener, Client from, Message message)
214 {
215 try
216 {
217 listener.deliver(from, this, message);
218 }
219 catch (Throwable x)
220 {
221 Log.debug(x);
222 }
223 }
224
225
226 public void doDeliverListeners()
227 {
228 synchronized(this)
229 {
230 if (_dListeners != null)
231 for (DeliverListener l : _dListeners)
232 notifyDeliverListener(l, _queue);
233 }
234 }
235
236 private void notifyDeliverListener(DeliverListener listener, Queue<Message> queue)
237 {
238 try
239 {
240 listener.deliver(this, queue);
241 }
242 catch (Throwable x)
243 {
244 Log.debug(x);
245 }
246 }
247
248
249 public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
250 {
251 _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
252 }
253
254
255 public boolean isMetaConnectDeliveryOnly()
256 {
257 return _deliverViaMetaConnectOnly;
258 }
259
260
261 public void startBatch()
262 {
263 synchronized(this)
264 {
265 _batch++;
266 }
267 }
268
269
270 public void endBatch()
271 {
272 synchronized(this)
273 {
274 if (--_batch == 0 && _responsesPending < 1)
275 {
276 batch:switch(_queue.size())
277 {
278 case 0:
279 break;
280 case 1:
281 if (((MessageImpl)_queue.get(0)).isLazy())
282 lazyResume();
283 else
284 resume();
285 break;
286 default:
287 for (int i=_queue.size();i-->0;)
288 {
289 if (!((MessageImpl)_queue.get(i)).isLazy())
290 {
291 resume();
292 break batch;
293 }
294 }
295 lazyResume();
296 }
297 }
298 }
299 }
300
301
302 public String getConnectionType()
303 {
304 return _type;
305 }
306
307
308
309
310
311
312
313 public String getId()
314 {
315 return _id;
316 }
317
318
319 public boolean hasMessages()
320 {
321 return _queue.size() > 0;
322 }
323
324
325 public boolean hasNonLazyMessages()
326 {
327 synchronized(this)
328 {
329 for (int i=_queue.size(); i-- > 0;)
330 {
331 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
332 return true;
333 }
334 }
335 return false;
336 }
337
338
339 public boolean isLocal()
340 {
341 return true;
342 }
343
344
345
346
347
348
349
350 public void disconnect()
351 {
352 synchronized(this)
353 {
354 if (_bayeux.hasClient(_id))
355 remove(false);
356 }
357 }
358
359
360
361
362
363
364
365 public void remove(boolean timeout)
366 {
367 _isExpired=timeout;
368 Client client=_bayeux.removeClient(_id);
369
370 if (client != null && _bayeux.isLogInfo())
371 _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
372
373 final String browser_id;
374 final RemoveListener[] listeners;
375 synchronized(this)
376 {
377 browser_id=_browserId;
378 _browserId=null;
379 listeners=_rListeners;
380 }
381
382 if (browser_id != null)
383 _bayeux.clientOffBrowser(browser_id,_id);
384 if (listeners != null)
385 for (RemoveListener l : listeners)
386 notifyRemoveListener(l, _id, timeout);
387
388 resume();
389 }
390
391 private void notifyRemoveListener(RemoveListener listener, String clientId, boolean timeout)
392 {
393 try
394 {
395 listener.removed(clientId, timeout);
396 }
397 catch (Throwable x)
398 {
399 Log.debug(x);
400 }
401 }
402
403
404 public boolean isExpired()
405 {
406 return _isExpired;
407 }
408
409
410 public int responded()
411 {
412 synchronized(this)
413 {
414 return _responsesPending--;
415 }
416 }
417
418
419 public int responsePending()
420 {
421 synchronized(this)
422 {
423 return ++_responsesPending;
424 }
425 }
426
427
428
429
430
431 public void lazyResume()
432 {
433 }
434
435
436
437
438
439 public void resume()
440 {
441 }
442
443
444
445
446
447 public int getMessages()
448 {
449 return _queue.size();
450 }
451
452
453 public List<Message> takeMessages()
454 {
455 synchronized(this)
456 {
457 ArrayList<Message> list=new ArrayList<Message>(_queue);
458 _queue.clear();
459 return list;
460 }
461 }
462
463
464 public void returnMessages(List<Message> messages)
465 {
466 synchronized(this)
467 {
468 _queue.addAll(0,messages);
469 }
470 }
471
472
473 @Override
474 public String toString()
475 {
476 return _id;
477 }
478
479
480 protected void addSubscription(ChannelImpl channel)
481 {
482 synchronized(this)
483 {
484 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
485 }
486 }
487
488
489 protected void removeSubscription(ChannelImpl channel)
490 {
491 synchronized(this)
492 {
493 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
494 }
495 }
496
497
498 protected void setConnectionType(String type)
499 {
500 synchronized(this)
501 {
502 _type=type;
503 }
504 }
505
506
507 protected void setId(String id)
508 {
509 synchronized(this)
510 {
511 _id=id;
512 }
513 }
514
515
516 public void unsubscribeAll()
517 {
518 ChannelImpl[] subscriptions;
519 synchronized(this)
520 {
521 subscriptions=_subscriptions;
522 _subscriptions=new ChannelImpl[0];
523 }
524 for (ChannelImpl channel : subscriptions)
525 channel.unsubscribe(this);
526
527 }
528
529
530 public void setBrowserId(String id)
531 {
532 if (_browserId != null && !_browserId.equals(id))
533 _bayeux.clientOffBrowser(_browserId,_id);
534 _browserId=id;
535 if (_browserId != null)
536 _bayeux.clientOnBrowser(_browserId,_id);
537 }
538
539
540 public String getBrowserId()
541 {
542 return _browserId;
543 }
544
545
546 @Override
547 public boolean equals(Object o)
548 {
549 if (!(o instanceof Client))
550 return false;
551 return getId().equals(((Client)o).getId());
552 }
553
554
555
556
557
558
559
560 public JSON.Literal getAdvice()
561 {
562 return _advice;
563 }
564
565
566
567
568
569
570 public void setAdvice(JSON.Literal advice)
571 {
572 _advice=advice;
573 }
574
575
576 public void addListener(ClientListener listener)
577 {
578 synchronized(this)
579 {
580 if (listener instanceof MessageListener)
581 {
582 if (listener instanceof MessageListener.Synchronous)
583 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
584 else
585 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
586 }
587
588 if (listener instanceof RemoveListener)
589 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
590
591 if (listener instanceof QueueListener)
592 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
593
594 if (listener instanceof DeliverListener)
595 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
596 }
597 }
598
599
600 public void removeListener(ClientListener listener)
601 {
602 synchronized(this)
603 {
604 if (listener instanceof MessageListener)
605 {
606 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
607 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
608 }
609
610 if (listener instanceof RemoveListener)
611 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
612
613 if (listener instanceof QueueListener)
614 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
615 }
616 }
617
618
619 public long getInterval()
620 {
621 return _interval;
622 }
623
624
625
626
627
628
629
630
631
632 public void setInterval(long intervalMS)
633 {
634 _interval=intervalMS;
635 }
636
637
638 public long getTimeout()
639 {
640 return _timeout;
641 }
642
643
644
645
646
647
648
649
650
651 public void setTimeout(long timeoutMS)
652 {
653 _timeout=timeoutMS;
654 }
655
656
657 public void setMaxQueue(int maxQueue)
658 {
659 _maxQueue=maxQueue;
660 }
661
662
663 public int getMaxQueue()
664 {
665 return _maxQueue;
666 }
667
668
669 public Queue<Message> getQueue()
670 {
671 return _queue;
672 }
673
674
675
676
677
678
679
680 public int getLag()
681 {
682 return _lag;
683 }
684
685
686
687
688
689
690
691 public void setLag(int lag)
692 {
693 _lag=lag;
694 }
695
696
697
698
699
700
701
702 public Channel[] getSubscriptions()
703 {
704 ChannelImpl[] subscriptions=_subscriptions;
705 if (subscriptions == null)
706 return null;
707 Channel[] channels=new Channel[subscriptions.length];
708 System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
709 return channels;
710 }
711
712 }