1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.client;
16
17 import java.io.IOException;
18 import java.net.URLEncoder;
19 import java.text.ParseException;
20 import java.text.SimpleDateFormat;
21 import java.util.ArrayList;
22 import java.util.Date;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Queue;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.TimeUnit;
31 import javax.servlet.http.Cookie;
32
33 import org.cometd.Bayeux;
34 import org.cometd.Client;
35 import org.cometd.ClientListener;
36 import org.cometd.Extension;
37 import org.cometd.Message;
38 import org.cometd.MessageListener;
39 import org.cometd.RemoveListener;
40 import org.mortbay.cometd.MessageImpl;
41 import org.mortbay.cometd.MessagePool;
42 import org.mortbay.component.AbstractLifeCycle;
43 import org.mortbay.io.Buffer;
44 import org.mortbay.io.ByteArrayBuffer;
45 import org.mortbay.jetty.HttpHeaders;
46 import org.mortbay.jetty.HttpSchemes;
47 import org.mortbay.jetty.HttpURI;
48 import org.mortbay.jetty.client.Address;
49 import org.mortbay.jetty.client.ContentExchange;
50 import org.mortbay.jetty.client.HttpClient;
51 import org.mortbay.jetty.client.HttpExchange;
52 import org.mortbay.log.Log;
53 import org.mortbay.util.ArrayQueue;
54 import org.mortbay.util.LazyList;
55 import org.mortbay.util.QuotedStringTokenizer;
56 import org.mortbay.util.ajax.JSON;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class BayeuxClient extends AbstractLifeCycle implements Client
72 {
73 private final static String __TIMER="org.mortbay.cometd.client.Timer";
74 private final static String __JSON="org.mortbay.cometd.client.JSON";
75 private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
76 protected HttpClient _httpClient;
77
78 protected MessagePool _msgPool;
79 private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();
80 private ArrayQueue<Message> _outQ = new ArrayQueue<Message>();
81 protected Address _cometdAddress;
82 private Exchange _pull;
83 private Exchange _push;
84 private String _path = "/cometd";
85 private boolean _initialized = false;
86 private boolean _disconnecting = false;
87 private boolean _handshook = false;
88 private String _clientId;
89 private org.cometd.Listener _listener;
90 private List<RemoveListener> _rListeners;
91 private List<MessageListener> _mListeners;
92 private int _batch;
93 private boolean _formEncoded;
94 private Map<String, ExpirableCookie> _cookies = new ConcurrentHashMap<String, ExpirableCookie>();
95 private Advice _advice;
96 private Timer _timer;
97 private int _backoffInterval = 0;
98 private int _backoffIncrement = 1000;
99 private int _backoffMaxInterval = 60000;
100 private Buffer _scheme;
101 protected Extension[] _extensions;
102 protected JSON _jsonOut;
103
104
105 public BayeuxClient(HttpClient client, String url)
106 {
107 this(client,url,null);
108 }
109
110
111 public BayeuxClient(HttpClient client, String url, Timer timer)
112 {
113 HttpURI uri = new HttpURI(url);
114 _httpClient = client;
115 _cometdAddress = new Address(uri.getHost(),uri.getPort());
116 _path=uri.getPath();
117 _timer = timer;
118 _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
119 }
120
121
122 public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
123 {
124 _httpClient = client;
125 _cometdAddress = address;
126 _path = path;
127 _timer = timer;
128 }
129
130
131 public BayeuxClient(HttpClient client, Address address, String uri)
132 {
133 this(client,address,uri,null);
134 }
135
136
137 public void addExtension(Extension ext)
138 {
139 _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
140 }
141
142
143 Extension[] getExtensions()
144 {
145 return _extensions;
146 }
147
148
149
150
151
152
153
154
155 public void setBackOffInterval(int interval)
156 {
157 _backoffInterval = interval;
158 }
159
160
161
162
163
164
165 public int getBackoffInterval()
166 {
167 return _backoffInterval;
168 }
169
170
171
172
173
174
175 public void setBackoffMaxRetries(int retries)
176 {
177 }
178
179
180
181
182
183 public int getBackoffMaxRetries()
184 {
185 return -1;
186 }
187
188
189
190
191
192
193
194 public void setBackoffIncrement(int interval)
195 {
196 _backoffIncrement = interval;
197 }
198
199
200
201
202
203
204 public int getBackoffIncrement()
205 {
206 return _backoffIncrement;
207 }
208
209
210 public void setBackoffMaxInterval(int interval)
211 {
212 _backoffMaxInterval = interval;
213 }
214
215 public int getBackoffMaxInterval()
216 {
217 return _backoffMaxInterval;
218 }
219
220
221
222
223
224
225
226 public String getId()
227 {
228 return _clientId;
229 }
230
231
232 protected void doStart() throws Exception
233 {
234 if (!_httpClient.isStarted())
235 throw new IllegalStateException("!HttpClient.isStarted()");
236
237 synchronized (_httpClient)
238 {
239 if (_jsonOut == null)
240 {
241 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
242 if (_jsonOut==null)
243 {
244 _jsonOut = new JSON();
245 _httpClient.setAttribute(__JSON,_jsonOut);
246 }
247 }
248
249 if (_timer == null)
250 {
251 _timer = (Timer)_httpClient.getAttribute(__TIMER);
252 if (_timer==null)
253 {
254 _timer = new Timer(__TIMER+"@"+hashCode(),true);
255 _httpClient.setAttribute(__TIMER,_timer);
256 }
257 }
258
259 if (_msgPool == null)
260 {
261 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
262 if (_msgPool==null)
263 {
264 _msgPool = new MessagePool();
265 _httpClient.setAttribute(__MSGPOOL,_msgPool);
266 }
267 }
268 }
269 _disconnecting=false;
270 _pull=null;
271 _push=null;
272 super.doStart();
273 synchronized (_outQ)
274 {
275 if (!_initialized && _pull == null)
276 {
277 _pull = new Handshake();
278 send((Exchange)_pull,false);
279 }
280 }
281 }
282
283
284 protected void doStop() throws Exception
285 {
286 if (!_disconnecting)
287 disconnect();
288 super.doStop();
289 }
290
291
292 public boolean isPolling()
293 {
294 synchronized (_outQ)
295 {
296 return isRunning() && (_pull != null);
297 }
298 }
299
300
301
302
303
304 public void deliver(Client from, Message message)
305 {
306 if (!isRunning())
307 throw new IllegalStateException("Not running");
308
309 synchronized (_inQ)
310 {
311 if (_mListeners == null)
312 _inQ.add(message);
313 else
314 {
315 for (MessageListener l : _mListeners)
316 notifyMessageListener(l, from, message);
317 }
318 }
319 }
320
321 private void notifyMessageListener(MessageListener listener, Client from, Message message)
322 {
323 try
324 {
325 listener.deliver(from, this, message);
326 }
327 catch (Throwable x)
328 {
329 Log.debug(x);
330 }
331 }
332
333
334
335
336
337
338
339
340 public void deliver(Client from, String toChannel, Object data, String id)
341 {
342 if (!isRunning())
343 throw new IllegalStateException("Not running");
344
345 MessageImpl message = _msgPool.newMessage();
346
347 message.put(Bayeux.CHANNEL_FIELD,toChannel);
348 message.put(Bayeux.DATA_FIELD,data);
349 if (id != null)
350 message.put(Bayeux.ID_FIELD,id);
351
352 synchronized (_inQ)
353 {
354 if (_mListeners == null)
355 {
356 message.incRef();
357 _inQ.add(message);
358 }
359 else
360 {
361 for (MessageListener l : _mListeners)
362 if (l instanceof MessageListener.Synchronous)
363 notifyMessageListener(l, from, message);
364 }
365 }
366
367 if (_mListeners !=null)
368 for (MessageListener l : _mListeners)
369 if (!(l instanceof MessageListener.Synchronous))
370 notifyMessageListener(l, from, message);
371
372 message.decRef();
373 }
374
375
376
377
378
379 public org.cometd.Listener getListener()
380 {
381 synchronized (_inQ)
382 {
383 return _listener;
384 }
385 }
386
387
388
389
390
391
392
393 public boolean hasMessages()
394 {
395 synchronized (_inQ)
396 {
397 return _inQ.size() > 0;
398 }
399 }
400
401
402
403
404
405
406
407 public boolean isLocal()
408 {
409 return false;
410 }
411
412
413
414
415
416
417
418 private void publish(MessageImpl msg)
419 {
420 msg.incRef();
421 synchronized (_outQ)
422 {
423 _outQ.add(msg);
424
425 if (_batch == 0 && _initialized && _push == null)
426 {
427 _push = new Publish();
428 try
429 {
430 send(_push);
431 }
432 catch (IOException e)
433 {
434 metaPublishFail(e,((Publish)_push).getOutboundMessages());
435 }
436 catch (IllegalStateException e)
437 {
438 metaPublishFail(e,((Publish)_push).getOutboundMessages());
439 }
440 }
441 }
442 }
443
444
445
446
447
448
449
450
451 public void publish(String toChannel, Object data, String msgId)
452 {
453 if (!isRunning() || _disconnecting)
454 throw new IllegalStateException("Not running");
455
456 MessageImpl msg = _msgPool.newMessage();
457 msg.put(Bayeux.CHANNEL_FIELD,toChannel);
458 msg.put(Bayeux.DATA_FIELD,data);
459 if (msgId != null)
460 msg.put(Bayeux.ID_FIELD,msgId);
461 publish(msg);
462 msg.decRef();
463 }
464
465
466
467
468
469
470
471 public void subscribe(String toChannel)
472 {
473 if (!isRunning() || _disconnecting)
474 throw new IllegalStateException("Not running");
475
476 MessageImpl msg = _msgPool.newMessage();
477 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
478 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
479 publish(msg);
480 msg.decRef();
481 }
482
483
484
485
486
487
488
489 public void unsubscribe(String toChannel)
490 {
491 if (!isRunning() || _disconnecting)
492 throw new IllegalStateException("Not running");
493
494 MessageImpl msg = _msgPool.newMessage();
495 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
496 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
497 publish(msg);
498 msg.decRef();
499 }
500
501
502
503
504
505
506 public void remove()
507 {
508 disconnect();
509 }
510
511
512
513
514
515 public void disconnect()
516 {
517 if (isStopped() || _disconnecting)
518 throw new IllegalStateException("Not running");
519
520 MessageImpl msg = _msgPool.newMessage();
521 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
522
523 synchronized (_outQ)
524 {
525 _outQ.add(msg);
526 _disconnecting = true;
527 if (_batch == 0 && _initialized && _push == null)
528 {
529 _push = new Publish();
530 try
531 {
532 send(_push);
533 }
534 catch (IOException e)
535 {
536 Log.warn(e.toString());
537 Log.debug(e);
538 send(_push,true);
539 }
540 }
541 _initialized = false;
542 }
543 }
544
545
546
547
548
549 public void setListener(org.cometd.Listener listener)
550 {
551 synchronized (_inQ)
552 {
553 if (_listener != null)
554 removeListener(_listener);
555 _listener = listener;
556 if (_listener != null)
557 addListener(_listener);
558 }
559 }
560
561
562
563
564
565
566
567
568 public List<Message> takeMessages()
569 {
570 final LinkedList<Message> list;
571 synchronized (_inQ)
572 {
573 list = new LinkedList<Message>(_inQ);
574 _inQ.clear();
575 }
576 for (Message m : list)
577 if (m instanceof MessageImpl)
578 ((MessageImpl)m).decRef();
579 return list;
580 }
581
582
583
584
585
586
587
588 public void endBatch()
589 {
590 synchronized (_outQ)
591 {
592 if (--_batch <= 0)
593 {
594 _batch = 0;
595 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
596 {
597 _push = new Publish();
598 try
599 {
600 send(_push);
601 }
602 catch (IOException e)
603 {
604 metaPublishFail(e,((Publish)_push).getOutboundMessages());
605 }
606 }
607 }
608 }
609 }
610
611
612
613
614
615
616
617 public void startBatch()
618 {
619 if (isStopped())
620 throw new IllegalStateException("Not running");
621
622 synchronized (_outQ)
623 {
624 _batch++;
625 }
626 }
627
628
629
630
631
632
633
634 protected void customize(HttpExchange exchange)
635 {
636 StringBuilder builder = null;
637 for (String cookieName : _cookies.keySet())
638 {
639 if (builder == null)
640 builder = new StringBuilder();
641 else
642 builder.append("; ");
643
644
645 Cookie cookie = getCookie(cookieName);
646 if (cookie != null)
647 {
648 builder.append(cookie.getName());
649 builder.append("=");
650 builder.append(cookie.getValue());
651 }
652 }
653
654 if (builder != null)
655 exchange.setRequestHeader(HttpHeaders.COOKIE,builder.toString());
656
657 if (_scheme!=null)
658 exchange.setScheme(_scheme);
659 }
660
661
662 public void setCookie(Cookie cookie)
663 {
664 long expirationTime = System.currentTimeMillis();
665 int maxAge = cookie.getMaxAge();
666 if (maxAge < 0)
667 expirationTime = -1L;
668 else
669 expirationTime += maxAge * 1000;
670
671 ExpirableCookie expirableCookie = new ExpirableCookie(cookie, expirationTime);
672 _cookies.put(cookie.getName(), expirableCookie);
673 }
674
675 public Cookie getCookie(String name)
676 {
677 ExpirableCookie cookie = _cookies.get(name);
678 if (cookie != null)
679 {
680 if (cookie.isExpired())
681 {
682 _cookies.remove(name);
683 cookie = null;
684 }
685 }
686 return cookie == null ? null : cookie.cookie;
687 }
688
689
690
691
692
693
694
695 protected class Exchange extends ContentExchange
696 {
697 Message[] _responses;
698 int _connectFailures;
699 int _backoff = _backoffInterval;
700 String _json;
701
702
703 Exchange(String info)
704 {
705 setMethod("POST");
706 setScheme(HttpSchemes.HTTP_BUFFER);
707 setAddress(_cometdAddress);
708 setURI(_path + "/" + info);
709 setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
710 }
711
712
713 public int getBackoff()
714 {
715 return _backoff;
716 }
717
718
719 public void incBackoff()
720 {
721 _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
722 }
723
724
725 protected void setMessage(String message)
726 {
727 message=extendOut(message);
728 setJson(message);
729 }
730
731
732 protected void setJson(String json)
733 {
734 try
735 {
736 _json = json;
737
738 if (_formEncoded)
739 setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
740 else
741 setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
742 }
743 catch (Exception e)
744 {
745 Log.ignore(e);
746 setRequestContent(new ByteArrayBuffer(_json));
747 }
748 }
749
750
751 protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
752 {
753 super.onResponseStatus(version,status,reason);
754 }
755
756
757 protected void onResponseHeader(Buffer name, Buffer value) throws IOException
758 {
759 super.onResponseHeader(name,value);
760 if (!isRunning())
761 return;
762
763 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
764 {
765 String cname = null;
766 String cvalue = null;
767
768 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
769 tok.setSingle(false);
770
771 if (tok.hasMoreElements())
772 cname = tok.nextToken();
773 if (tok.hasMoreElements())
774 cvalue = tok.nextToken();
775
776 Cookie cookie = new Cookie(cname,cvalue);
777
778 while (tok.hasMoreTokens())
779 {
780 String token = tok.nextToken();
781 if ("Version".equalsIgnoreCase(token))
782 cookie.setVersion(Integer.parseInt(tok.nextToken()));
783 else if ("Comment".equalsIgnoreCase(token))
784 cookie.setComment(tok.nextToken());
785 else if ("Path".equalsIgnoreCase(token))
786 cookie.setPath(tok.nextToken());
787 else if ("Domain".equalsIgnoreCase(token))
788 cookie.setDomain(tok.nextToken());
789 else if ("Expires".equalsIgnoreCase(token))
790 {
791 try
792 {
793 Date date = new SimpleDateFormat("EEE, dd-MMM-yy HH:mm:ss 'GMT'").parse(tok.nextToken());
794 Long maxAge = TimeUnit.MILLISECONDS.toSeconds(date.getTime() - System.currentTimeMillis());
795 cookie.setMaxAge(maxAge > 0 ? maxAge.intValue() : 0);
796 }
797 catch (ParseException ignored)
798 {
799 }
800 }
801 else if ("Max-Age".equalsIgnoreCase(token))
802 {
803 try
804 {
805 int maxAge = Integer.parseInt(tok.nextToken());
806 cookie.setMaxAge(maxAge);
807 }
808 catch (NumberFormatException ignored)
809 {
810 }
811 }
812 else if ("Secure".equalsIgnoreCase(token))
813 cookie.setSecure(true);
814 }
815
816 BayeuxClient.this.setCookie(cookie);
817 }
818 }
819
820
821 protected void onResponseComplete() throws IOException
822 {
823 if (!isRunning())
824 return;
825
826 super.onResponseComplete();
827
828 if (getResponseStatus() == 200)
829 {
830 String content = getResponseContent();
831
832 if (content == null || content.length() == 0)
833 throw new IllegalStateException();
834 _responses = _msgPool.parse(content);
835
836 if (_responses!=null)
837 for (int i=0;i<_responses.length;i++)
838 extendIn(_responses[i]);
839 }
840 }
841
842
843 protected void resend(boolean backoff)
844 {
845 if (!isRunning())
846 return;
847
848 final boolean disconnecting;
849 synchronized (_outQ)
850 {
851 disconnecting=_disconnecting;
852 }
853 if (disconnecting)
854 {
855 try{stop();}catch(Exception e){Log.ignore(e);}
856 return;
857 }
858
859 setJson(_json);
860 if (!send(this,backoff))
861 Log.warn("Retries exhausted");
862 }
863
864
865 protected void recycle()
866 {
867 if (_responses!=null)
868 for (Message msg:_responses)
869 if (msg instanceof MessageImpl)
870 ((MessageImpl)msg).decRef();
871 _responses=null;
872 }
873 }
874
875
876
877
878
879
880
881 protected class Handshake extends Exchange
882 {
883 public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
884
885 Handshake()
886 {
887 super("handshake");
888 setMessage(__HANDSHAKE);
889 }
890
891
892
893
894
895
896
897
898 protected void onResponseComplete() throws IOException
899 {
900 super.onResponseComplete();
901
902 if (!isRunning())
903 return;
904
905 if (_disconnecting)
906 {
907 Message error=_msgPool.newMessage();
908 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
909 error.put("failure","expired");
910 metaHandshake(false,false,error);
911 try{stop();}catch(Exception e){Log.ignore(e);}
912 return;
913 }
914
915 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
916 {
917 MessageImpl response = (MessageImpl)_responses[0];
918 boolean successful = response.isSuccessful();
919
920
921 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
922 if (adviceField != null)
923 _advice = new Advice(adviceField);
924
925 if (successful)
926 {
927 _handshook = true;
928 if (Log.isDebugEnabled())
929 Log.debug("Successful handshake, sending connect");
930 _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
931
932 metaHandshake(true,_handshook,response);
933 _pull = new Connect();
934 send(_pull,false);
935 }
936 else
937 {
938 metaHandshake(false,false,response);
939 _handshook = false;
940 if (_advice != null && _advice.isReconnectNone())
941 throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
942 else if (_advice != null && _advice.isReconnectHandshake())
943 {
944 _pull = new Handshake();
945 if (!send(_pull,true))
946 throw new IOException("Handshake, retries exhausted");
947 }
948 else
949
950 {
951 _pull = new Connect();
952 if (!send(_pull,true))
953 throw new IOException("Connect after handshake, retries exhausted");
954 }
955 }
956 }
957 else
958 {
959 Message error=_msgPool.newMessage();
960 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
961 error.put("status",new Integer(getResponseStatus()));
962 error.put("content",getResponseContent());
963
964 metaHandshake(false,false,error);
965 resend(true);
966 }
967
968 recycle();
969 }
970
971
972 protected void onExpire()
973 {
974
975 Message error=_msgPool.newMessage();
976 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
977 error.put("failure","expired");
978 metaHandshake(false,false,error);
979 resend(true);
980 }
981
982
983 protected void onConnectionFailed(Throwable ex)
984 {
985
986 Message error=_msgPool.newMessage();
987 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
988 error.put("failure",ex.toString());
989 error.put("exception",ex);
990 ex.printStackTrace();
991 metaHandshake(false,false,error);
992 resend(true);
993 }
994
995
996 protected void onException(Throwable ex)
997 {
998
999 Message error=_msgPool.newMessage();
1000 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1001 error.put("failure",ex.toString());
1002 error.put("exception",ex);
1003 metaHandshake(false,false,error);
1004 resend(true);
1005 }
1006 }
1007
1008
1009
1010
1011
1012
1013 protected class Connect extends Exchange
1014 {
1015 String _connectString;
1016
1017 Connect()
1018 {
1019 super("connect");
1020 _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}";
1021 setMessage(_connectString);
1022 }
1023
1024 protected void onResponseComplete() throws IOException
1025 {
1026 super.onResponseComplete();
1027 if (!isRunning())
1028 return;
1029
1030 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1031 {
1032 try
1033 {
1034 startBatch();
1035
1036 for (int i = 0; i < _responses.length; i++)
1037 {
1038 Message msg = _responses[i];
1039
1040
1041 Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
1042 if (adviceField != null)
1043 _advice = new Advice(adviceField);
1044
1045 if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
1046 {
1047 Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
1048 if (successful != null && successful.booleanValue())
1049 {
1050 metaConnect(true,msg);
1051
1052 if (!isRunning())
1053 break;
1054
1055 synchronized (_outQ)
1056 {
1057 if (_disconnecting)
1058 continue;
1059
1060 if (!isInitialized())
1061 {
1062 setInitialized(true);
1063 {
1064 if (_outQ.size() > 0)
1065 {
1066 _push = new Publish();
1067 send(_push);
1068 }
1069 }
1070 }
1071
1072 }
1073
1074
1075 _pull = new Connect();
1076 send(_pull,false);
1077 }
1078 else
1079 {
1080
1081
1082
1083
1084
1085
1086
1087
1088 setInitialized(false);
1089 metaConnect(false,msg);
1090
1091 synchronized(_outQ)
1092 {
1093 if (!isRunning()||_disconnecting)
1094 break;
1095 }
1096
1097 if (_advice != null && _advice.isReconnectNone())
1098 throw new IOException("Connect failed, advice reconnect=none");
1099 else if (_advice != null && _advice.isReconnectHandshake())
1100 {
1101 if (Log.isDebugEnabled())
1102 Log.debug("connect received success=false, advice is to rehandshake");
1103 _pull = new Handshake();
1104 send(_pull,true);
1105 }
1106 else
1107 {
1108
1109 if (Log.isDebugEnabled())
1110 Log.debug("Assuming retry=reconnect");
1111 resend(true);
1112 }
1113 }
1114 }
1115 deliver(null,msg);
1116 }
1117 }
1118 finally
1119 {
1120 endBatch();
1121 }
1122 }
1123 else
1124 {
1125 Message error=_msgPool.newMessage();
1126 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1127 error.put("status",getResponseStatus());
1128 error.put("content",getResponseContent());
1129 metaConnect(false,error);
1130 resend(true);
1131 }
1132
1133 recycle();
1134 }
1135
1136
1137 protected void onExpire()
1138 {
1139
1140 setInitialized(false);
1141 Message error=_msgPool.newMessage();
1142 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1143 error.put("failure","expired");
1144 metaConnect(false,error);
1145 resend(true);
1146 }
1147
1148
1149 protected void onConnectionFailed(Throwable ex)
1150 {
1151
1152 setInitialized(false);
1153 Message error=_msgPool.newMessage();
1154 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1155 error.put("failure",ex.toString());
1156 error.put("exception",ex);
1157 metaConnect(false,error);
1158 resend(true);
1159 }
1160
1161
1162 protected void onException(Throwable ex)
1163 {
1164
1165 setInitialized(false);
1166 Message error=_msgPool.newMessage();
1167 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1168 error.put("failure",ex.toString());
1169 error.put("exception",ex);
1170 metaConnect(false,error);
1171 resend(true);
1172 }
1173 }
1174
1175
1176
1177
1178
1179
1180 protected class Publish extends Exchange
1181 {
1182 Publish()
1183 {
1184 super("publish");
1185
1186 StringBuffer json = new StringBuffer(256);
1187 synchronized (json)
1188 {
1189 synchronized (_outQ)
1190 {
1191 int s=_outQ.size();
1192 if (s == 0)
1193 return;
1194
1195 for (int i=0;i<s;i++)
1196 {
1197 Message message = _outQ.getUnsafe(i);
1198 message.put(Bayeux.CLIENT_FIELD,_clientId);
1199 extendOut(message);
1200
1201 json.append(i==0?'[':',');
1202 _jsonOut.append(json,message);
1203
1204 if (message instanceof MessageImpl)
1205 ((MessageImpl)message).decRef();
1206 }
1207 json.append(']');
1208 _outQ.clear();
1209 setJson(json.toString());
1210 }
1211 }
1212 }
1213
1214 protected Message[] getOutboundMessages()
1215 {
1216 try
1217 {
1218 return _msgPool.parse(_json);
1219 }
1220 catch (IOException e)
1221 {
1222 Log.warn("Error converting outbound messages");
1223 if (Log.isDebugEnabled())
1224 Log.debug(e);
1225 return null;
1226 }
1227 }
1228
1229
1230
1231
1232
1233
1234
1235
1236 protected void onResponseComplete() throws IOException
1237 {
1238 if (!isRunning())
1239 return;
1240
1241 super.onResponseComplete();
1242 try
1243 {
1244 synchronized (_outQ)
1245 {
1246 startBatch();
1247 _push = null;
1248 }
1249
1250 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1251 {
1252 for (int i = 0; i < _responses.length; i++)
1253 {
1254 MessageImpl msg = (MessageImpl)_responses[i];
1255
1256 deliver(null,msg);
1257 if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
1258 {
1259 if (isStarted())
1260 {
1261 try{stop();}catch(Exception e){Log.ignore(e);}
1262 }
1263 break;
1264 }
1265 }
1266 }
1267 else
1268 {
1269 Log.warn("Publish, error=" + getResponseStatus());
1270 }
1271 }
1272 finally
1273 {
1274 endBatch();
1275 }
1276 recycle();
1277 }
1278
1279
1280 protected void onExpire()
1281 {
1282 super.onExpire();
1283 metaPublishFail(null,this.getOutboundMessages());
1284 if (_disconnecting)
1285 {
1286 try{stop();}catch(Exception e){Log.ignore(e);}
1287 }
1288 }
1289
1290
1291 protected void onConnectionFailed(Throwable ex)
1292 {
1293 super.onConnectionFailed(ex);
1294 metaPublishFail(ex,this.getOutboundMessages());
1295 if (_disconnecting)
1296 {
1297 try{stop();}catch(Exception e){Log.ignore(e);}
1298 }
1299 }
1300
1301
1302 protected void onException(Throwable ex)
1303 {
1304 super.onException(ex);
1305 metaPublishFail(ex,this.getOutboundMessages());
1306 if (_disconnecting)
1307 {
1308 try{stop();}catch(Exception e){Log.ignore(e);}
1309 }
1310 }
1311 }
1312
1313
1314 public void addListener(ClientListener listener)
1315 {
1316 synchronized (_inQ)
1317 {
1318 boolean added=false;
1319 if (listener instanceof MessageListener)
1320 {
1321 added=true;
1322 if (_mListeners == null)
1323 _mListeners = new ArrayList<MessageListener>();
1324 _mListeners.add((MessageListener)listener);
1325 }
1326 if (listener instanceof RemoveListener)
1327 {
1328 added=true;
1329 if (_rListeners == null)
1330 _rListeners = new ArrayList<RemoveListener>();
1331 _rListeners.add((RemoveListener)listener);
1332 }
1333
1334 if (!added)
1335 throw new IllegalArgumentException();
1336 }
1337 }
1338
1339
1340 public void removeListener(ClientListener listener)
1341 {
1342 synchronized (_inQ)
1343 {
1344 if (listener instanceof MessageListener)
1345 {
1346 if (_mListeners != null)
1347 _mListeners.remove((MessageListener)listener);
1348 }
1349 if (listener instanceof RemoveListener)
1350 {
1351 if (_rListeners != null)
1352 _rListeners.remove((RemoveListener)listener);
1353 }
1354 }
1355 }
1356
1357
1358 public int getMaxQueue()
1359 {
1360 return -1;
1361 }
1362
1363
1364 public Queue<Message> getQueue()
1365 {
1366 return _inQ;
1367 }
1368
1369
1370 public void setMaxQueue(int max)
1371 {
1372 if (max != -1)
1373 throw new UnsupportedOperationException();
1374 }
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385 protected boolean send(final Exchange exchange, final boolean backoff)
1386 {
1387 long interval = (_advice != null?_advice.getInterval():0);
1388
1389 if (backoff)
1390 {
1391 int backoffInterval = exchange.getBackoff();
1392 if (Log.isDebugEnabled())
1393 Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);
1394
1395 exchange.incBackoff();
1396
1397 interval += backoffInterval;
1398 }
1399
1400 if (interval > 0)
1401 {
1402 TimerTask task = new TimerTask()
1403 {
1404 public void run()
1405 {
1406 try
1407 {
1408 send(exchange);
1409 }
1410 catch (IOException e)
1411 {
1412 Log.warn("Delayed send, retry: "+e);
1413 Log.debug(e);
1414 send(exchange,true);
1415 }
1416 catch (IllegalStateException e)
1417 {
1418 Log.warn("Delayed send, retry: "+e);
1419 Log.debug(e);
1420 send(exchange,true);
1421 }
1422 }
1423 };
1424 if (Log.isDebugEnabled())
1425 Log.debug("Delay " + interval + " send of " + exchange);
1426 _timer.schedule(task,interval);
1427 }
1428 else
1429 {
1430 try
1431 {
1432 send(exchange);
1433 }
1434 catch (IOException e)
1435 {
1436 Log.warn("Send, retry on fail: "+e);
1437 Log.debug(e);
1438 return send(exchange,true);
1439 }
1440 catch (IllegalStateException e)
1441 {
1442 Log.warn("Send, retry on fail: "+e);
1443 Log.debug(e);
1444 return send(exchange,true);
1445 }
1446 }
1447 return true;
1448
1449 }
1450
1451
1452
1453
1454
1455
1456
1457
1458 protected void send(HttpExchange exchange) throws IOException
1459 {
1460 exchange.reset();
1461 customize(exchange);
1462 if (Log.isDebugEnabled())
1463 Log.debug("Send: using any connection=" + exchange);
1464 _httpClient.send(exchange);
1465 }
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476 protected void setInitialized(boolean b)
1477 {
1478 synchronized (_outQ)
1479 {
1480 _initialized = b;
1481 }
1482 }
1483
1484
1485 protected boolean isInitialized()
1486 {
1487 return _initialized;
1488 }
1489
1490
1491
1492
1493
1494
1495 protected void metaConnect(boolean success, Message message)
1496 {
1497 if (!success)
1498 Log.warn(this.toString()+" "+message.toString());
1499 }
1500
1501
1502
1503
1504
1505
1506
1507 protected void metaHandshake(boolean success, boolean reestablish, Message message)
1508 {
1509 if (!success)
1510 Log.warn(this.toString()+" "+message.toString());
1511 }
1512
1513
1514
1515
1516
1517 protected void metaPublishFail(Throwable e, Message[] messages)
1518 {
1519 Log.warn(this.toString()+": "+e);
1520 Log.debug(e);
1521 }
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535 protected String extendOut(String msg)
1536 {
1537 if (_extensions==null)
1538 return msg;
1539
1540 try
1541 {
1542 Message[] messages = _msgPool.parse(msg);
1543 for (int i=0; i<messages.length; i++)
1544 extendOut(messages[i]);
1545 if (messages.length==1 && msg.charAt(0)=='{')
1546 return _msgPool.getMsgJSON().toJSON(messages[0]);
1547 return _msgPool.getMsgJSON().toJSON(messages);
1548 }
1549 catch(IOException e)
1550 {
1551 Log.warn(e);
1552 return msg;
1553 }
1554 }
1555
1556
1557
1558
1559
1560
1561
1562 protected void extendOut(Message message)
1563 {
1564 if (_extensions!=null)
1565 {
1566 Message m = message;
1567 if (m.getChannel().startsWith(Bayeux.META_SLASH))
1568 for (int i=0;m!=null && i<_extensions.length;i++)
1569 m=_extensions[i].sendMeta(this,m);
1570 else
1571 for (int i=0;m!=null && i<_extensions.length;i++)
1572 m=_extensions[i].send(this,m);
1573
1574 if (message!=m)
1575 {
1576 message.clear();
1577 if (m!=null)
1578 for (Map.Entry<String,Object> entry:m.entrySet())
1579 message.put(entry.getKey(),entry.getValue());
1580 }
1581 }
1582 }
1583
1584
1585
1586
1587
1588
1589
1590 protected void extendIn(Message message)
1591 {
1592 if (_extensions!=null)
1593 {
1594 Message m = message;
1595 if (m.getChannel().startsWith(Bayeux.META_SLASH))
1596 for (int i=_extensions.length;m!=null && i-->0;)
1597 m=_extensions[i].rcvMeta(this,m);
1598 else
1599 for (int i=_extensions.length;m!=null && i-->0;)
1600 m=_extensions[i].rcv(this,m);
1601
1602 if (message!=m)
1603 {
1604 message.clear();
1605 if (m!=null)
1606 for (Map.Entry<String,Object> entry:m.entrySet())
1607 message.put(entry.getKey(),entry.getValue());
1608 }
1609 }
1610 }
1611
1612 private static class ExpirableCookie
1613 {
1614 private final Cookie cookie;
1615 private final long expirationTime;
1616
1617 private ExpirableCookie(Cookie cookie, long expirationTime)
1618 {
1619 this.cookie = cookie;
1620 this.expirationTime = expirationTime;
1621 }
1622
1623 private boolean isExpired()
1624 {
1625 if (expirationTime < 0) return false;
1626 return System.currentTimeMillis() >= expirationTime;
1627 }
1628 }
1629 }