1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Queue;
20
21 import org.cometd.Bayeux;
22 import org.cometd.Channel;
23 import org.cometd.Client;
24 import org.cometd.ClientListener;
25 import org.cometd.DeliverListener;
26 import org.cometd.Extension;
27 import org.cometd.Message;
28 import org.cometd.MessageListener;
29 import org.cometd.QueueListener;
30 import org.cometd.RemoveListener;
31 import org.mortbay.util.ArrayQueue;
32 import org.mortbay.util.LazyList;
33 import org.mortbay.util.ajax.JSON;
34
35
36
37
38
39
40 public class ClientImpl implements Client
41 {
42 private String _id;
43 private String _type;
44 private int _responsesPending;
45 private ChannelImpl[] _subscriptions=new ChannelImpl[0];
46 private RemoveListener[] _rListeners;
47 private MessageListener[] _syncMListeners;
48 private MessageListener[] _asyncMListeners;
49 private QueueListener[] _qListeners;
50 private DeliverListener[] _dListeners;
51 protected AbstractBayeux _bayeux;
52 private String _browserId;
53 private JSON.Literal _advice;
54 private int _batch;
55 private int _maxQueue;
56 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
57 private long _timeout;
58 private long _interval;
59 private int _lag;
60 private Extension[] _extensions;
61
62 private boolean _deliverViaMetaConnectOnly;
63 private volatile boolean _isExpired;
64
65
66 int _adviseVersion;
67
68
69 protected ClientImpl(AbstractBayeux bayeux)
70 {
71 _bayeux=bayeux;
72 _maxQueue=bayeux.getMaxClientQueue();
73 _bayeux.addClient(this,null);
74 if (_bayeux.isLogInfo())
75 _bayeux.logInfo("newClient: " + this);
76 }
77
78
79 protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
80 {
81 _bayeux=bayeux;
82 _maxQueue=0;
83
84 _bayeux.addClient(this,idPrefix);
85
86 if (_bayeux.isLogInfo())
87 _bayeux.logInfo("newClient: " + this);
88 }
89
90
91 public void addExtension(Extension ext)
92 {
93 _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
94 }
95
96
97 Extension[] getExtensions()
98 {
99 return _extensions;
100 }
101
102
103 public void deliver(Client from, String toChannel, Object data, String id)
104 {
105 MessageImpl message=_bayeux.newMessage();
106 message.put(Bayeux.CHANNEL_FIELD,toChannel);
107 message.put(Bayeux.DATA_FIELD,data);
108 if (id != null)
109 message.put(Bayeux.ID_FIELD,id);
110
111 Message m=_bayeux.extendSendBayeux(from,message);
112 if (m != null)
113 doDelivery(from,m);
114 if (m instanceof MessageImpl)
115 ((MessageImpl)m).decRef();
116 }
117
118
119 public void deliverLazy(Client from, String toChannel, Object data, String id)
120 {
121 MessageImpl message=_bayeux.newMessage();
122 message.put(Bayeux.CHANNEL_FIELD,toChannel);
123 message.put(Bayeux.DATA_FIELD,data);
124 if (id != null)
125 message.put(Bayeux.ID_FIELD,id);
126 message.setLazy(true);
127 Message m=_bayeux.extendSendBayeux(from,message);
128 if (m != null)
129 doDelivery(from,m);
130 if (m instanceof MessageImpl)
131 ((MessageImpl)m).decRef();
132 }
133
134
135 protected void doDelivery(Client from, final Message msg)
136 {
137 final Message message=_bayeux.extendSendClient(from,this,msg);
138 if (message == null)
139 return;
140
141 MessageListener[] alisteners=null;
142 synchronized(this)
143 {
144 if (_maxQueue < 0)
145 {
146
147 ((MessageImpl)message).incRef();
148 _queue.addUnsafe(message);
149 }
150 else
151 {
152
153 boolean queue;
154 if (_queue.size() >= _maxQueue)
155 {
156
157 if (_qListeners != null && _qListeners.length > 0)
158 {
159 queue=true;
160 for (QueueListener l : _qListeners)
161 queue&=l.queueMaxed(from,this,message);
162 }
163 else
164 queue=false;
165 }
166 else
167
168 queue=true;
169
170
171 if (queue)
172 {
173 ((MessageImpl)message).incRef();
174 _queue.addUnsafe(message);
175 }
176 }
177
178
179 if (_syncMListeners != null)
180 for (MessageListener l : _syncMListeners)
181 l.deliver(from,this,message);
182 alisteners=_asyncMListeners;
183
184 if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
185 {
186 if (((MessageImpl)message).isLazy())
187 lazyResume();
188 else
189 resume();
190 }
191 }
192
193
194 if (alisteners != null)
195 for (MessageListener l : alisteners)
196 l.deliver(from,this,message);
197 }
198
199
200 public void doDeliverListeners()
201 {
202 synchronized(this)
203 {
204 if (_dListeners != null)
205 for (DeliverListener l : _dListeners)
206 l.deliver(this,_queue);
207 }
208 }
209
210
211 public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
212 {
213 _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
214 }
215
216
217 public boolean isMetaConnectDeliveryOnly()
218 {
219 return _deliverViaMetaConnectOnly;
220 }
221
222
223 public void startBatch()
224 {
225 synchronized(this)
226 {
227 _batch++;
228 }
229 }
230
231
232 public void endBatch()
233 {
234 synchronized(this)
235 {
236 if (--_batch == 0 && _responsesPending < 1)
237 {
238 batch:switch(_queue.size())
239 {
240 case 0:
241 break;
242 case 1:
243 if (((MessageImpl)_queue.get(0)).isLazy())
244 lazyResume();
245 else
246 resume();
247 break;
248 default:
249 for (int i=_queue.size();i-->0;)
250 {
251 if (!((MessageImpl)_queue.get(i)).isLazy())
252 {
253 resume();
254 break batch;
255 }
256 }
257 lazyResume();
258 }
259 }
260 }
261 }
262
263
264 public String getConnectionType()
265 {
266 return _type;
267 }
268
269
270
271
272
273
274
275 public String getId()
276 {
277 return _id;
278 }
279
280
281 public boolean hasMessages()
282 {
283 return _queue.size() > 0;
284 }
285
286
287 public boolean hasNonLazyMessages()
288 {
289 synchronized(this)
290 {
291 for (int i=_queue.size(); i-- > 0;)
292 {
293 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
294 return true;
295 }
296 }
297 return false;
298 }
299
300
301 public boolean isLocal()
302 {
303 return true;
304 }
305
306
307
308
309
310
311
312 public void disconnect()
313 {
314 synchronized(this)
315 {
316 if (_bayeux.hasClient(_id))
317 remove(false);
318 }
319 }
320
321
322
323
324
325
326
327 public void remove(boolean timeout)
328 {
329 _isExpired=timeout;
330 Client client=_bayeux.removeClient(_id);
331
332 if (client != null && _bayeux.isLogInfo())
333 _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
334
335 final String browser_id;
336 final RemoveListener[] listeners;
337 synchronized(this)
338 {
339 browser_id=_browserId;
340 _browserId=null;
341 listeners=_rListeners;
342 }
343
344 if (browser_id != null)
345 _bayeux.clientOffBrowser(browser_id,_id);
346 if (listeners != null)
347 for (RemoveListener l : listeners)
348 l.removed(_id,timeout);
349
350 resume();
351 }
352
353
354 public boolean isExpired()
355 {
356 return _isExpired;
357 }
358
359
360 public int responded()
361 {
362 synchronized(this)
363 {
364 return _responsesPending--;
365 }
366 }
367
368
369 public int responsePending()
370 {
371 synchronized(this)
372 {
373 return ++_responsesPending;
374 }
375 }
376
377
378
379
380
381 public void lazyResume()
382 {
383 }
384
385
386
387
388
389 public void resume()
390 {
391 }
392
393
394
395
396
397 public int getMessages()
398 {
399 return _queue.size();
400 }
401
402
403 public List<Message> takeMessages()
404 {
405 synchronized(this)
406 {
407 ArrayList<Message> list=new ArrayList<Message>(_queue);
408 _queue.clear();
409 return list;
410 }
411 }
412
413
414 public void returnMessages(List<Message> messages)
415 {
416 synchronized(this)
417 {
418 _queue.addAll(0,messages);
419 }
420 }
421
422
423 @Override
424 public String toString()
425 {
426 return _id;
427 }
428
429
430 protected void addSubscription(ChannelImpl channel)
431 {
432 synchronized(this)
433 {
434 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
435 }
436 }
437
438
439 protected void removeSubscription(ChannelImpl channel)
440 {
441 synchronized(this)
442 {
443 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
444 }
445 }
446
447
448 protected void setConnectionType(String type)
449 {
450 synchronized(this)
451 {
452 _type=type;
453 }
454 }
455
456
457 protected void setId(String id)
458 {
459 synchronized(this)
460 {
461 _id=id;
462 }
463 }
464
465
466 public void unsubscribeAll()
467 {
468 ChannelImpl[] subscriptions;
469 synchronized(this)
470 {
471 subscriptions=_subscriptions;
472 _subscriptions=new ChannelImpl[0];
473 }
474 for (ChannelImpl channel : subscriptions)
475 channel.unsubscribe(this);
476
477 }
478
479
480 public void setBrowserId(String id)
481 {
482 if (_browserId != null && !_browserId.equals(id))
483 _bayeux.clientOffBrowser(_browserId,_id);
484 _browserId=id;
485 if (_browserId != null)
486 _bayeux.clientOnBrowser(_browserId,_id);
487 }
488
489
490 public String getBrowserId()
491 {
492 return _browserId;
493 }
494
495
496 @Override
497 public boolean equals(Object o)
498 {
499 if (!(o instanceof Client))
500 return false;
501 return getId().equals(((Client)o).getId());
502 }
503
504
505
506
507
508
509
510 public JSON.Literal getAdvice()
511 {
512 return _advice;
513 }
514
515
516
517
518
519
520 public void setAdvice(JSON.Literal advice)
521 {
522 _advice=advice;
523 }
524
525
526 public void addListener(ClientListener listener)
527 {
528 synchronized(this)
529 {
530 if (listener instanceof MessageListener)
531 {
532 if (listener instanceof MessageListener.Synchronous)
533 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
534 else
535 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
536 }
537
538 if (listener instanceof RemoveListener)
539 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
540
541 if (listener instanceof QueueListener)
542 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
543
544 if (listener instanceof DeliverListener)
545 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
546 }
547 }
548
549
550 public void removeListener(ClientListener listener)
551 {
552 synchronized(this)
553 {
554 if (listener instanceof MessageListener)
555 {
556 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
557 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
558 }
559
560 if (listener instanceof RemoveListener)
561 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
562
563 if (listener instanceof QueueListener)
564 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
565 }
566 }
567
568
569 public long getInterval()
570 {
571 return _interval;
572 }
573
574
575
576
577
578
579
580
581
582 public void setInterval(long intervalMS)
583 {
584 _interval=intervalMS;
585 }
586
587
588 public long getTimeout()
589 {
590 return _timeout;
591 }
592
593
594
595
596
597
598
599
600
601 public void setTimeout(long timeoutMS)
602 {
603 _timeout=timeoutMS;
604 }
605
606
607 public void setMaxQueue(int maxQueue)
608 {
609 _maxQueue=maxQueue;
610 }
611
612
613 public int getMaxQueue()
614 {
615 return _maxQueue;
616 }
617
618
619 public Queue<Message> getQueue()
620 {
621 return _queue;
622 }
623
624
625
626
627
628
629
630 public int getLag()
631 {
632 return _lag;
633 }
634
635
636
637
638
639
640
641 public void setLag(int lag)
642 {
643 _lag=lag;
644 }
645
646
647
648
649
650
651
652 public Channel[] getSubscriptions()
653 {
654 ChannelImpl[] subscriptions=_subscriptions;
655 if (subscriptions == null)
656 return null;
657 Channel[] channels=new Channel[subscriptions.length];
658 System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
659 return channels;
660 }
661
662 }