View Javadoc

1   // ========================================================================
2   // Copyright 2006-20078 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd.client;
16  
17  import java.io.IOException;
18  import java.net.URLEncoder;
19  import java.text.ParseException;
20  import java.text.SimpleDateFormat;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Queue;
27  import java.util.Timer;
28  import java.util.TimerTask;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.TimeUnit;
31  import javax.servlet.http.Cookie;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.Client;
35  import org.cometd.ClientListener;
36  import org.cometd.Extension;
37  import org.cometd.Message;
38  import org.cometd.MessageListener;
39  import org.cometd.RemoveListener;
40  import org.mortbay.cometd.MessageImpl;
41  import org.mortbay.cometd.MessagePool;
42  import org.mortbay.component.AbstractLifeCycle;
43  import org.mortbay.io.Buffer;
44  import org.mortbay.io.ByteArrayBuffer;
45  import org.mortbay.jetty.HttpHeaders;
46  import org.mortbay.jetty.HttpSchemes;
47  import org.mortbay.jetty.HttpURI;
48  import org.mortbay.jetty.client.Address;
49  import org.mortbay.jetty.client.ContentExchange;
50  import org.mortbay.jetty.client.HttpClient;
51  import org.mortbay.jetty.client.HttpExchange;
52  import org.mortbay.log.Log;
53  import org.mortbay.util.ArrayQueue;
54  import org.mortbay.util.LazyList;
55  import org.mortbay.util.QuotedStringTokenizer;
56  import org.mortbay.util.ajax.JSON;
57  
58  /* ------------------------------------------------------------ */
59  /**
60   * Bayeux protocol Client.
61   * <p>
62   * Implements a Bayeux Ajax Push client as part of the cometd project.
63   * <p>
64   * The HttpClient attributes are used to share a Timer and MessagePool instance
65   * between all Bayeux clients sharing the same HttpClient.
66   *
67   * @see http://cometd.org
68   * @author gregw
69   *
70   */
71  public class BayeuxClient extends AbstractLifeCycle implements Client
72  {
73      private final static String __TIMER="org.mortbay.cometd.client.Timer";
74      private final static String __JSON="org.mortbay.cometd.client.JSON";
75      private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
76      protected HttpClient _httpClient;
77  
78      protected MessagePool _msgPool;
79      private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();  // queue of incoming messages
80      private ArrayQueue<Message> _outQ = new ArrayQueue<Message>(); // queue of outgoing messages
81      protected Address _cometdAddress;
82      private Exchange _pull;
83      private Exchange _push;
84      private String _path = "/cometd";
85      private boolean _initialized = false;
86      private boolean _disconnecting = false;
87      private boolean _handshook = false;
88      private String _clientId;
89      private org.cometd.Listener _listener;
90      private List<RemoveListener> _rListeners;
91      private List<MessageListener> _mListeners;
92      private int _batch;
93      private boolean _formEncoded;
94      private Map<String, ExpirableCookie> _cookies = new ConcurrentHashMap<String, ExpirableCookie>();
95      private Advice _advice;
96      private Timer _timer;
97      private int _backoffInterval = 0;
98      private int _backoffIncrement = 1000;
99      private int _backoffMaxInterval = 60000;
100     private Buffer _scheme;
101     protected Extension[] _extensions;
102     protected JSON _jsonOut;
103 
104     /* ------------------------------------------------------------ */
105     public BayeuxClient(HttpClient client, String url)
106     {
107         this(client,url,null);
108     }
109 
110     /* ------------------------------------------------------------ */
111     public BayeuxClient(HttpClient client, String url, Timer timer)
112     {
113         HttpURI uri = new HttpURI(url);
114         _httpClient = client;
115         _cometdAddress = new Address(uri.getHost(),uri.getPort());
116         _path=uri.getPath();
117         _timer = timer;
118         _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
119     }
120 
121     /* ------------------------------------------------------------ */
122     public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
123     {
124         _httpClient = client;
125         _cometdAddress = address;
126         _path = path;
127         _timer = timer;
128     }
129 
130     /* ------------------------------------------------------------ */
131     public BayeuxClient(HttpClient client, Address address, String uri)
132     {
133         this(client,address,uri,null);
134     }
135 
136     /* ------------------------------------------------------------ */
137     public void addExtension(Extension ext)
138     {
139         _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
140     }
141 
142     /* ------------------------------------------------------------ */
143     Extension[] getExtensions()
144     {
145         return _extensions;
146     }
147 
148     /* ------------------------------------------------------------ */
149     /**
150      * If unable to connect/handshake etc, even if following the interval in the
151      * advice, wait for this interval initially, and try again.
152      *
153      * @param interval
154      */
155     public void setBackOffInterval(int interval)
156     {
157         _backoffInterval = interval;
158     }
159 
160     /* ------------------------------------------------------------ */
161     /**
162      * @return the backoff interval to wait before retrying an unsuccessful
163      * or failed message
164      */
165     public int getBackoffInterval()
166     {
167         return _backoffInterval;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /**
172      * @deprecated We retry an infinite number of times.
173      * use {@link #getBackoffIncrement()} to set limits
174      */
175     public void setBackoffMaxRetries(int retries)
176     {
177     }
178 
179     /* ------------------------------------------------------------ */
180     /**
181      * @deprecated
182      */
183     public int getBackoffMaxRetries()
184     {
185         return -1;
186     }
187 
188     /* ------------------------------------------------------------ */
189     /**
190      . Each retry will increment by this
191      * intervel, until we reach _backoffMaxInterval
192      *
193     */
194     public void setBackoffIncrement(int interval)
195     {
196         _backoffIncrement = interval;
197     }
198 
199     /* ------------------------------------------------------------ */
200     /**
201      * @return the backoff interval used to increase the backoff time when
202      * retrying an unsuccessful or failed message.
203      */
204     public int getBackoffIncrement()
205     {
206         return _backoffIncrement;
207     }
208 
209     /* ------------------------------------------------------------ */
210     public void setBackoffMaxInterval(int interval)
211     {
212         _backoffMaxInterval = interval;
213     }
214 
215     public int getBackoffMaxInterval()
216     {
217         return _backoffMaxInterval;
218     }
219 
220     /* ------------------------------------------------------------ */
221     /*
222      * (non-Javadoc) Returns the clientId
223      *
224      * @see dojox.cometd.Client#getId()
225      */
226     public String getId()
227     {
228         return _clientId;
229     }
230 
231     /* ------------------------------------------------------------ */
232     protected void doStart() throws Exception
233     {
234         if (!_httpClient.isStarted())
235             throw new IllegalStateException("!HttpClient.isStarted()");
236 
237         synchronized (_httpClient)
238         {
239             if (_jsonOut == null)
240             {
241                 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
242                 if (_jsonOut==null)
243                 {
244                     _jsonOut = new JSON();
245                     _httpClient.setAttribute(__JSON,_jsonOut);
246                 }
247             }
248 
249             if (_timer == null)
250             {
251                 _timer = (Timer)_httpClient.getAttribute(__TIMER);
252                 if (_timer==null)
253                 {
254                     _timer = new Timer(__TIMER+"@"+hashCode(),true);
255                     _httpClient.setAttribute(__TIMER,_timer);
256                 }
257             }
258 
259             if (_msgPool == null)
260             {
261                 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
262                 if (_msgPool==null)
263                 {
264                     _msgPool = new MessagePool();
265                     _httpClient.setAttribute(__MSGPOOL,_msgPool);
266                 }
267             }
268         }
269         _disconnecting=false;
270         _pull=null;
271         _push=null;
272         super.doStart();
273         synchronized (_outQ)
274         {
275             if (!_initialized && _pull == null)
276             {
277                 _pull = new Handshake();
278                 send((Exchange)_pull,false);
279             }
280         }
281     }
282 
283     /* ------------------------------------------------------------ */
284     protected void doStop() throws Exception
285     {
286         if (!_disconnecting)
287             disconnect();
288         super.doStop();
289     }
290 
291     /* ------------------------------------------------------------ */
292     public boolean isPolling()
293     {
294         synchronized (_outQ)
295         {
296             return isRunning() && (_pull != null);
297         }
298     }
299 
300     /* ------------------------------------------------------------ */
301     /**
302      * (non-Javadoc)
303      */
304     public void deliver(Client from, Message message)
305     {
306         if (!isRunning())
307             throw new IllegalStateException("Not running");
308 
309         synchronized (_inQ)
310         {
311             if (_mListeners == null)
312                 _inQ.add(message);
313             else
314             {
315                 for (MessageListener l : _mListeners)
316                     notifyMessageListener(l, from, message);
317             }
318         }
319     }
320 
321     private void notifyMessageListener(MessageListener listener, Client from, Message message)
322     {
323         try
324         {
325             listener.deliver(from, this, message);
326         }
327         catch (Throwable x)
328         {
329             Log.debug(x);
330         }
331     }
332 
333     /* ------------------------------------------------------------ */
334     /*
335      * (non-Javadoc)
336      *
337      * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String,
338      * java.lang.Object, java.lang.String)
339      */
340     public void deliver(Client from, String toChannel, Object data, String id)
341     {
342         if (!isRunning())
343             throw new IllegalStateException("Not running");
344 
345         MessageImpl message = _msgPool.newMessage();
346 
347         message.put(Bayeux.CHANNEL_FIELD,toChannel);
348         message.put(Bayeux.DATA_FIELD,data);
349         if (id != null)
350             message.put(Bayeux.ID_FIELD,id);
351 
352         synchronized (_inQ)
353         {
354             if (_mListeners == null)
355             {
356                 message.incRef();
357                 _inQ.add(message);
358             }
359             else
360             {
361                 for (MessageListener l : _mListeners)
362                     if (l instanceof MessageListener.Synchronous)
363                         notifyMessageListener(l, from, message);
364             }
365         }
366 
367         if (_mListeners !=null)
368             for (MessageListener l : _mListeners)
369                 if (!(l instanceof MessageListener.Synchronous))
370                     notifyMessageListener(l, from, message);
371 
372         message.decRef();
373     }
374 
375     /* ------------------------------------------------------------ */
376     /**
377      * @deprecated
378      */
379     public org.cometd.Listener getListener()
380     {
381         synchronized (_inQ)
382         {
383             return _listener;
384         }
385     }
386 
387     /* ------------------------------------------------------------ */
388     /*
389      * (non-Javadoc)
390      *
391      * @see dojox.cometd.Client#hasMessages()
392      */
393     public boolean hasMessages()
394     {
395         synchronized (_inQ)
396         {
397             return _inQ.size() > 0;
398         }
399     }
400 
401     /* ------------------------------------------------------------ */
402     /*
403      * (non-Javadoc)
404      *
405      * @see dojox.cometd.Client#isLocal()
406      */
407     public boolean isLocal()
408     {
409         return false;
410     }
411 
412     /* ------------------------------------------------------------ */
413     /*
414      * (non-Javadoc)
415      *
416      * @see dojox.cometd.Client#subscribe(java.lang.String)
417      */
418     private void publish(MessageImpl msg)
419     {
420         msg.incRef();
421         synchronized (_outQ)
422         {
423             _outQ.add(msg);
424 
425             if (_batch == 0 && _initialized && _push == null)
426             {
427                 _push = new Publish();
428                 try
429                 {
430                     send(_push);
431                 }
432                 catch (IOException e)
433                 {
434                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
435                 }
436                 catch (IllegalStateException e)
437                 {
438                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
439                 }
440             }
441         }
442     }
443 
444     /* ------------------------------------------------------------ */
445     /*
446      * (non-Javadoc)
447      *
448      * @see dojox.cometd.Client#publish(java.lang.String, java.lang.Object,
449      * java.lang.String)
450      */
451     public void publish(String toChannel, Object data, String msgId)
452     {
453         if (!isRunning() || _disconnecting)
454             throw new IllegalStateException("Not running");
455 
456         MessageImpl msg = _msgPool.newMessage();
457         msg.put(Bayeux.CHANNEL_FIELD,toChannel);
458         msg.put(Bayeux.DATA_FIELD,data);
459         if (msgId != null)
460             msg.put(Bayeux.ID_FIELD,msgId);
461         publish(msg);
462         msg.decRef();
463     }
464 
465     /* ------------------------------------------------------------ */
466     /*
467      * (non-Javadoc)
468      *
469      * @see dojox.cometd.Client#subscribe(java.lang.String)
470      */
471     public void subscribe(String toChannel)
472     {
473         if (!isRunning() || _disconnecting)
474             throw new IllegalStateException("Not running");
475 
476         MessageImpl msg = _msgPool.newMessage();
477         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
478         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
479         publish(msg);
480         msg.decRef();
481     }
482 
483     /* ------------------------------------------------------------ */
484     /*
485      * (non-Javadoc)
486      *
487      * @see dojox.cometd.Client#unsubscribe(java.lang.String)
488      */
489     public void unsubscribe(String toChannel)
490     {
491         if (!isRunning() || _disconnecting)
492             throw new IllegalStateException("Not running");
493 
494         MessageImpl msg = _msgPool.newMessage();
495         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
496         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
497         publish(msg);
498         msg.decRef();
499     }
500 
501     /* ------------------------------------------------------------ */
502     /**
503      * Disconnect this client.
504      * @deprecated use {@link #disconnect()}
505      */
506     public void remove()
507     {
508         disconnect();
509     }
510 
511     /* ------------------------------------------------------------ */
512     /**
513      * Disconnect this client.
514      */
515     public void disconnect()
516     {
517         if (isStopped() || _disconnecting)
518             throw new IllegalStateException("Not running");
519 
520         MessageImpl msg = _msgPool.newMessage();
521         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
522 
523         synchronized (_outQ)
524         {
525             _outQ.add(msg);
526             _disconnecting = true;
527             if (_batch == 0 && _initialized && _push == null)
528             {
529                 _push = new Publish();
530                 try
531                 {
532                     send(_push);
533                 }
534                 catch (IOException e)
535                 {
536                     Log.warn(e.toString());
537                     Log.debug(e);
538                     send(_push,true);
539                 }
540             }
541             _initialized = false;
542         }
543     }
544 
545     /* ------------------------------------------------------------ */
546     /**
547      * @deprecated
548      */
549     public void setListener(org.cometd.Listener listener)
550     {
551         synchronized (_inQ)
552         {
553             if (_listener != null)
554                 removeListener(_listener);
555             _listener = listener;
556             if (_listener != null)
557                 addListener(_listener);
558         }
559     }
560 
561     /* ------------------------------------------------------------ */
562     /*
563      * (non-Javadoc) Removes all available messages from the inbound queue. If a
564      * listener is set then messages are not queued.
565      *
566      * @see dojox.cometd.Client#takeMessages()
567      */
568     public List<Message> takeMessages()
569     {
570         final LinkedList<Message> list;
571         synchronized (_inQ)
572         {
573             list = new LinkedList<Message>(_inQ);
574             _inQ.clear();
575         }
576         for (Message m : list)
577             if (m instanceof MessageImpl)
578                 ((MessageImpl)m).decRef();
579         return list;
580     }
581 
582     /* ------------------------------------------------------------ */
583     /*
584      * (non-Javadoc)
585      *
586      * @see dojox.cometd.Client#endBatch()
587      */
588     public void endBatch()
589     {
590         synchronized (_outQ)
591         {
592             if (--_batch <= 0)
593             {
594                 _batch = 0;
595                 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
596                 {
597                     _push = new Publish();
598                     try
599                     {
600                         send(_push);
601                     }
602                     catch (IOException e)
603                     {
604                         metaPublishFail(e,((Publish)_push).getOutboundMessages());
605                     }
606                 }
607             }
608         }
609     }
610 
611     /* ------------------------------------------------------------ */
612     /*
613      * (non-Javadoc)
614      *
615      * @see dojox.cometd.Client#startBatch()
616      */
617     public void startBatch()
618     {
619         if (isStopped())
620             throw new IllegalStateException("Not running");
621 
622         synchronized (_outQ)
623         {
624             _batch++;
625         }
626     }
627 
628     /* ------------------------------------------------------------ */
629     /**
630      * Customize an Exchange. Called when an exchange is about to be sent to
631      * allow Cookies and Credentials to be customized. Default implementation
632      * sets any cookies
633      */
634     protected void customize(HttpExchange exchange)
635     {
636         StringBuilder builder = null;
637         for (String cookieName : _cookies.keySet())
638         {
639             if (builder == null)
640                 builder = new StringBuilder();
641             else
642                 builder.append("; ");
643 
644             // Expiration is handled by getCookie()
645             Cookie cookie = getCookie(cookieName);
646             if (cookie != null)
647             {
648                 builder.append(cookie.getName()); // TODO quotes
649                 builder.append("=");
650                 builder.append(cookie.getValue()); // TODO quotes
651             }
652         }
653 
654         if (builder != null)
655             exchange.setRequestHeader(HttpHeaders.COOKIE,builder.toString());
656 
657         if (_scheme!=null)
658             exchange.setScheme(_scheme);
659     }
660 
661     /* ------------------------------------------------------------ */
662     public void setCookie(Cookie cookie)
663     {
664         long expirationTime = System.currentTimeMillis();
665         int maxAge = cookie.getMaxAge();
666         if (maxAge < 0)
667             expirationTime = -1L;
668         else
669             expirationTime += maxAge * 1000;
670 
671         ExpirableCookie expirableCookie = new ExpirableCookie(cookie, expirationTime);
672         _cookies.put(cookie.getName(), expirableCookie);
673     }
674 
675     public Cookie getCookie(String name)
676     {
677         ExpirableCookie cookie = _cookies.get(name);
678         if (cookie != null)
679         {
680             if (cookie.isExpired())
681             {
682                 _cookies.remove(name);
683                 cookie = null;
684             }
685         }
686         return cookie == null ? null : cookie.cookie;
687     }
688 
689     /* ------------------------------------------------------------ */
690     /* ------------------------------------------------------------ */
691     /* ------------------------------------------------------------ */
692     /**
693      * The base class for all bayeux exchanges.
694      */
695     protected class Exchange extends ContentExchange
696     {
697         Message[] _responses;
698         int _connectFailures;
699         int _backoff = _backoffInterval;
700         String _json;
701 
702         /* ------------------------------------------------------------ */
703         Exchange(String info)
704         {
705             setMethod("POST");
706             setScheme(HttpSchemes.HTTP_BUFFER);
707             setAddress(_cometdAddress);
708             setURI(_path + "/" + info);
709             setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
710         }
711 
712         /* ------------------------------------------------------------ */
713         public int getBackoff()
714         {
715             return _backoff;
716         }
717 
718         /* ------------------------------------------------------------ */
719         public void incBackoff()
720         {
721             _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
722         }
723 
724         /* ------------------------------------------------------------ */
725         protected void setMessage(String message)
726         {
727             message=extendOut(message);
728             setJson(message);
729         }
730 
731         /* ------------------------------------------------------------ */
732         protected void setJson(String json)
733         {
734             try
735             {
736                 _json = json;
737 
738                 if (_formEncoded)
739                     setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
740                 else
741                     setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
742             }
743             catch (Exception e)
744             {
745                 Log.ignore(e);
746                 setRequestContent(new ByteArrayBuffer(_json));
747             }
748         }
749 
750         /* ------------------------------------------------------------ */
751         protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
752         {
753             super.onResponseStatus(version,status,reason);
754         }
755 
756         /* ------------------------------------------------------------ */
757         protected void onResponseHeader(Buffer name, Buffer value) throws IOException
758         {
759             super.onResponseHeader(name,value);
760             if (!isRunning())
761                 return;
762 
763             if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
764             {
765                 String cname = null;
766                 String cvalue = null;
767 
768                 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
769                 tok.setSingle(false);
770 
771                 if (tok.hasMoreElements())
772                     cname = tok.nextToken();
773                 if (tok.hasMoreElements())
774                     cvalue = tok.nextToken();
775 
776                 Cookie cookie = new Cookie(cname,cvalue);
777 
778                 while (tok.hasMoreTokens())
779                 {
780                     String token = tok.nextToken();
781                     if ("Version".equalsIgnoreCase(token))
782                         cookie.setVersion(Integer.parseInt(tok.nextToken()));
783                     else if ("Comment".equalsIgnoreCase(token))
784                         cookie.setComment(tok.nextToken());
785                     else if ("Path".equalsIgnoreCase(token))
786                         cookie.setPath(tok.nextToken());
787                     else if ("Domain".equalsIgnoreCase(token))
788                         cookie.setDomain(tok.nextToken());
789                     else if ("Expires".equalsIgnoreCase(token))
790                     {
791                         try
792                         {
793                             Date date = new SimpleDateFormat("EEE, dd-MMM-yy HH:mm:ss 'GMT'").parse(tok.nextToken());
794                             Long maxAge = TimeUnit.MILLISECONDS.toSeconds(date.getTime() - System.currentTimeMillis());
795                             cookie.setMaxAge(maxAge > 0 ? maxAge.intValue() : 0);
796                         }
797                         catch (ParseException ignored)
798                         {
799                         }
800                     }
801                     else if ("Max-Age".equalsIgnoreCase(token))
802                     {
803                         try
804                         {
805                             int maxAge = Integer.parseInt(tok.nextToken());
806                             cookie.setMaxAge(maxAge);
807                         }
808                         catch (NumberFormatException ignored)
809                         {
810                         }
811                     }
812                     else if ("Secure".equalsIgnoreCase(token))
813                         cookie.setSecure(true);
814                 }
815 
816                 BayeuxClient.this.setCookie(cookie);
817             }
818         }
819 
820         /* ------------------------------------------------------------ */
821         protected void onResponseComplete() throws IOException
822         {
823             if (!isRunning())
824                 return;
825 
826             super.onResponseComplete();
827 
828             if (getResponseStatus() == 200)
829             {
830                 String content = getResponseContent();
831                 // TODO
832                 if (content == null || content.length() == 0)
833                     throw new IllegalStateException();
834                 _responses = _msgPool.parse(content);
835 
836                 if (_responses!=null)
837                     for (int i=0;i<_responses.length;i++)
838                         extendIn(_responses[i]);
839             }
840         }
841 
842         /* ------------------------------------------------------------ */
843         protected void resend(boolean backoff)
844         {
845             if (!isRunning())
846                 return;
847 
848             final boolean disconnecting;
849             synchronized (_outQ)
850             {
851                 disconnecting=_disconnecting;
852             }
853             if (disconnecting)
854             {
855                 try{stop();}catch(Exception e){Log.ignore(e);}
856                 return;
857             }
858 
859             setJson(_json);
860             if (!send(this,backoff))
861                 Log.warn("Retries exhausted"); // giving up
862         }
863 
864         /* ------------------------------------------------------------ */
865         protected void recycle()
866         {
867             if (_responses!=null)
868                 for (Message msg:_responses)
869                     if (msg instanceof MessageImpl)
870                         ((MessageImpl)msg).decRef();
871             _responses=null;
872         }
873     }
874 
875     /* ------------------------------------------------------------ */
876     /**
877      * The Bayeux handshake exchange. Negotiates a client Id and initializes the
878      * protocol.
879      *
880      */
881     protected class Handshake extends Exchange
882     {
883         public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
884 
885         Handshake()
886         {
887             super("handshake");
888             setMessage(__HANDSHAKE);
889         }
890 
891         /* ------------------------------------------------------------ */
892         /*
893          * (non-Javadoc)
894          *
895          * @see
896          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
897          */
898         protected void onResponseComplete() throws IOException
899         {
900             super.onResponseComplete();
901 
902             if (!isRunning())
903                 return;
904 
905             if (_disconnecting)
906             {
907                 Message error=_msgPool.newMessage();
908                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
909                 error.put("failure","expired");
910                 metaHandshake(false,false,error);
911                 try{stop();}catch(Exception e){Log.ignore(e);}
912                 return;
913             }
914 
915             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
916             {
917                 MessageImpl response = (MessageImpl)_responses[0];
918                 boolean successful = response.isSuccessful();
919 
920                 // Get advice if there is any
921                 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
922                 if (adviceField != null)
923                     _advice = new Advice(adviceField);
924 
925                 if (successful)
926                 {
927                     _handshook = true;
928                     if (Log.isDebugEnabled())
929                         Log.debug("Successful handshake, sending connect");
930                     _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
931 
932                     metaHandshake(true,_handshook,response);
933                     _pull = new Connect();
934                     send(_pull,false);
935                 }
936                 else
937                 {
938                     metaHandshake(false,false,response);
939                     _handshook = false;
940                     if (_advice != null && _advice.isReconnectNone())
941                         throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
942                     else if (_advice != null && _advice.isReconnectHandshake())
943                     {
944                         _pull = new Handshake();
945                         if (!send(_pull,true))
946                             throw new IOException("Handshake, retries exhausted");
947                     }
948                     else
949                     // assume retry = reconnect?
950                     {
951                         _pull = new Connect();
952                         if (!send(_pull,true))
953                             throw new IOException("Connect after handshake, retries exhausted");
954                     }
955                 }
956             }
957             else
958             {
959                 Message error=_msgPool.newMessage();
960                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
961                 error.put("status",new Integer(getResponseStatus()));
962                 error.put("content",getResponseContent());
963 
964                 metaHandshake(false,false,error);
965                 resend(true);
966             }
967 
968             recycle();
969         }
970 
971         /* ------------------------------------------------------------ */
972         protected void onExpire()
973         {
974             // super.onExpire();
975             Message error=_msgPool.newMessage();
976             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
977             error.put("failure","expired");
978             metaHandshake(false,false,error);
979             resend(true);
980         }
981 
982         /* ------------------------------------------------------------ */
983         protected void onConnectionFailed(Throwable ex)
984         {
985             // super.onConnectionFailed(ex);
986             Message error=_msgPool.newMessage();
987             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
988             error.put("failure",ex.toString());
989             error.put("exception",ex);
990             ex.printStackTrace();
991             metaHandshake(false,false,error);
992             resend(true);
993         }
994 
995         /* ------------------------------------------------------------ */
996         protected void onException(Throwable ex)
997         {
998             // super.onException(ex);
999             Message error=_msgPool.newMessage();
1000             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1001             error.put("failure",ex.toString());
1002             error.put("exception",ex);
1003             metaHandshake(false,false,error);
1004             resend(true);
1005         }
1006     }
1007 
1008     /* ------------------------------------------------------------ */
1009     /**
1010      * The Bayeux Connect exchange. Connect exchanges implement the long poll
1011      * for Bayeux.
1012      */
1013     protected class Connect extends Exchange
1014     {
1015         String _connectString;
1016 
1017         Connect()
1018         {
1019             super("connect");
1020             _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}";
1021             setMessage(_connectString);
1022         }
1023 
1024         protected void onResponseComplete() throws IOException
1025         {
1026             super.onResponseComplete();
1027             if (!isRunning())
1028                 return;
1029 
1030             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1031             {
1032                 try
1033                 {
1034                     startBatch();
1035 
1036                     for (int i = 0; i < _responses.length; i++)
1037                     {
1038                         Message msg = _responses[i];
1039 
1040                         // get advice if there is any
1041                         Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
1042                         if (adviceField != null)
1043                             _advice = new Advice(adviceField);
1044 
1045                         if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
1046                         {
1047                             Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
1048                             if (successful != null && successful.booleanValue())
1049                             {
1050                                 metaConnect(true,msg);
1051 
1052                                 if (!isRunning())
1053                                     break;
1054 
1055                                 synchronized (_outQ)
1056                                 {
1057                                     if (_disconnecting)
1058                                         continue;
1059 
1060                                     if (!isInitialized())
1061                                     {
1062                                         setInitialized(true);
1063                                         {
1064                                             if (_outQ.size() > 0)
1065                                             {
1066                                                 _push = new Publish();
1067                                                 send(_push);
1068                                             }
1069                                         }
1070                                     }
1071 
1072                                 }
1073                                 // send a Connect (ie longpoll) possibly with
1074                                 // delay according to interval advice
1075                                 _pull = new Connect();
1076                                 send(_pull,false);
1077                             }
1078                             else
1079                             {
1080                                 // received a failure to our connect message,
1081                                 // check the advice to see what to do:
1082                                 // reconnect: none = hard error
1083                                 // reconnect: handshake = send a handshake
1084                                 // message
1085                                 // reconnect: retry = send another connect,
1086                                 // possibly using interval
1087 
1088                                 setInitialized(false);
1089                                 metaConnect(false,msg);
1090 
1091                                 synchronized(_outQ)
1092                                 {
1093                                     if (!isRunning()||_disconnecting)
1094                                         break;
1095                                 }
1096 
1097                                 if (_advice != null && _advice.isReconnectNone())
1098                                     throw new IOException("Connect failed, advice reconnect=none");
1099                                 else if (_advice != null && _advice.isReconnectHandshake())
1100                                 {
1101                                     if (Log.isDebugEnabled())
1102                                         Log.debug("connect received success=false, advice is to rehandshake");
1103                                     _pull = new Handshake();
1104                                     send(_pull,true);
1105                                 }
1106                                 else
1107                                 {
1108                                     // assume retry = reconnect
1109                                     if (Log.isDebugEnabled())
1110                                         Log.debug("Assuming retry=reconnect");
1111                                     resend(true);
1112                                 }
1113                             }
1114                         }
1115                         deliver(null,msg);
1116                     }
1117                 }
1118                 finally
1119                 {
1120                     endBatch();
1121                 }
1122             }
1123             else
1124             {
1125                 Message error=_msgPool.newMessage();
1126                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1127                 error.put("status",getResponseStatus());
1128                 error.put("content",getResponseContent());
1129                 metaConnect(false,error);
1130                 resend(true);
1131             }
1132 
1133             recycle();
1134         }
1135 
1136         /* ------------------------------------------------------------ */
1137         protected void onExpire()
1138         {
1139             // super.onExpire();
1140             setInitialized(false);
1141             Message error=_msgPool.newMessage();
1142             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1143             error.put("failure","expired");
1144             metaConnect(false,error);
1145             resend(true);
1146         }
1147 
1148         /* ------------------------------------------------------------ */
1149         protected void onConnectionFailed(Throwable ex)
1150         {
1151             // super.onConnectionFailed(ex);
1152             setInitialized(false);
1153             Message error=_msgPool.newMessage();
1154             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1155             error.put("failure",ex.toString());
1156             error.put("exception",ex);
1157             metaConnect(false,error);
1158             resend(true);
1159         }
1160 
1161         /* ------------------------------------------------------------ */
1162         protected void onException(Throwable ex)
1163         {
1164             // super.onException(ex);
1165             setInitialized(false);
1166             Message error=_msgPool.newMessage();
1167             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1168             error.put("failure",ex.toString());
1169             error.put("exception",ex);
1170             metaConnect(false,error);
1171             resend(true);
1172         }
1173     }
1174 
1175     /* ------------------------------------------------------------ */
1176     /**
1177      * Publish message exchange. Sends messages to bayeux server and handles any
1178      * messages received as a result.
1179      */
1180     protected class Publish extends Exchange
1181     {
1182         Publish()
1183         {
1184             super("publish");
1185 
1186             StringBuffer json = new StringBuffer(256);
1187             synchronized (json)
1188             {
1189                 synchronized (_outQ)
1190                 {
1191                     int s=_outQ.size();
1192                     if (s == 0)
1193                         return;
1194 
1195                     for (int i=0;i<s;i++)
1196                     {
1197                         Message message = _outQ.getUnsafe(i);
1198                         message.put(Bayeux.CLIENT_FIELD,_clientId);
1199                         extendOut(message);
1200 
1201                         json.append(i==0?'[':',');
1202                         _jsonOut.append(json,message);
1203 
1204                         if (message instanceof MessageImpl)
1205                             ((MessageImpl)message).decRef();
1206                     }
1207                     json.append(']');
1208                     _outQ.clear();
1209                     setJson(json.toString());
1210                 }
1211             }
1212         }
1213 
1214         protected Message[] getOutboundMessages()
1215         {
1216             try
1217             {
1218                 return _msgPool.parse(_json);
1219             }
1220             catch (IOException e)
1221             {
1222                 Log.warn("Error converting outbound messages");
1223                 if (Log.isDebugEnabled())
1224                     Log.debug(e);
1225                 return null;
1226             }
1227         }
1228 
1229         /* ------------------------------------------------------------ */
1230         /*
1231          * (non-Javadoc)
1232          *
1233          * @see
1234          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
1235          */
1236         protected void onResponseComplete() throws IOException
1237         {
1238             if (!isRunning())
1239                 return;
1240 
1241             super.onResponseComplete();
1242             try
1243             {
1244                 synchronized (_outQ)
1245                 {
1246                     startBatch();
1247                     _push = null;
1248                 }
1249 
1250                 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1251                 {
1252                     for (int i = 0; i < _responses.length; i++)
1253                     {
1254                         MessageImpl msg = (MessageImpl)_responses[i];
1255 
1256                         deliver(null,msg);
1257                         if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
1258                         {
1259                             if (isStarted())
1260                             {
1261                                 try{stop();}catch(Exception e){Log.ignore(e);}
1262                             }
1263                             break;
1264                         }
1265                     }
1266                 }
1267                 else
1268                 {
1269                     Log.warn("Publish, error=" + getResponseStatus());
1270                 }
1271             }
1272             finally
1273             {
1274                 endBatch();
1275             }
1276             recycle();
1277         }
1278 
1279         /* ------------------------------------------------------------ */
1280         protected void onExpire()
1281         {
1282             super.onExpire();
1283             metaPublishFail(null,this.getOutboundMessages());
1284             if (_disconnecting)
1285             {
1286                 try{stop();}catch(Exception e){Log.ignore(e);}
1287             }
1288         }
1289 
1290         /* ------------------------------------------------------------ */
1291         protected void onConnectionFailed(Throwable ex)
1292         {
1293             super.onConnectionFailed(ex);
1294             metaPublishFail(ex,this.getOutboundMessages());
1295             if (_disconnecting)
1296             {
1297                 try{stop();}catch(Exception e){Log.ignore(e);}
1298             }
1299         }
1300 
1301         /* ------------------------------------------------------------ */
1302         protected void onException(Throwable ex)
1303         {
1304             super.onException(ex);
1305             metaPublishFail(ex,this.getOutboundMessages());
1306             if (_disconnecting)
1307             {
1308                 try{stop();}catch(Exception e){Log.ignore(e);}
1309             }
1310         }
1311     }
1312 
1313     /* ------------------------------------------------------------ */
1314     public void addListener(ClientListener listener)
1315     {
1316         synchronized (_inQ)
1317         {
1318             boolean added=false;
1319             if (listener instanceof MessageListener)
1320             {
1321                 added=true;
1322                 if (_mListeners == null)
1323                     _mListeners = new ArrayList<MessageListener>();
1324                 _mListeners.add((MessageListener)listener);
1325             }
1326             if (listener instanceof RemoveListener)
1327             {
1328                 added=true;
1329                 if (_rListeners == null)
1330                     _rListeners = new ArrayList<RemoveListener>();
1331                 _rListeners.add((RemoveListener)listener);
1332             }
1333 
1334             if (!added)
1335                 throw new IllegalArgumentException();
1336         }
1337     }
1338 
1339     /* ------------------------------------------------------------ */
1340     public void removeListener(ClientListener listener)
1341     {
1342         synchronized (_inQ)
1343         {
1344             if (listener instanceof MessageListener)
1345             {
1346                 if (_mListeners != null)
1347                     _mListeners.remove((MessageListener)listener);
1348             }
1349             if (listener instanceof RemoveListener)
1350             {
1351                 if (_rListeners != null)
1352                     _rListeners.remove((RemoveListener)listener);
1353             }
1354         }
1355     }
1356 
1357     /* ------------------------------------------------------------ */
1358     public int getMaxQueue()
1359     {
1360         return -1;
1361     }
1362 
1363     /* ------------------------------------------------------------ */
1364     public Queue<Message> getQueue()
1365     {
1366         return _inQ;
1367     }
1368 
1369     /* ------------------------------------------------------------ */
1370     public void setMaxQueue(int max)
1371     {
1372         if (max != -1)
1373             throw new UnsupportedOperationException();
1374     }
1375 
1376     /* ------------------------------------------------------------ */
1377     /**
1378      * Send the exchange, possibly using a backoff.
1379      *
     * @param exchange the exchange to send
     * @param backoff
     *            if true, use backoff algorithm to send
     * @return true once the exchange has been sent or scheduled for sending
1384      */
1385     protected boolean send(final Exchange exchange, final boolean backoff)
1386     {
1387         long interval = (_advice != null?_advice.getInterval():0);
1388 
1389         if (backoff)
1390         {
1391             int backoffInterval = exchange.getBackoff();
1392             if (Log.isDebugEnabled())
1393                 Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);
1394 
1395             exchange.incBackoff();
1396 
1397             interval += backoffInterval;
1398         }
1399 
1400         if (interval > 0)
1401         {
1402             TimerTask task = new TimerTask()
1403             {
1404                 public void run()
1405                 {
1406                     try
1407                     {
1408                         send(exchange);
1409                     }
1410                     catch (IOException e)
1411                     {
1412                         Log.warn("Delayed send, retry: "+e);
1413                         Log.debug(e);
1414                         send(exchange,true);
1415                     }
1416                     catch (IllegalStateException e)
1417                     {
1418                         Log.warn("Delayed send, retry: "+e);
1419                         Log.debug(e);
1420                         send(exchange,true);
1421                     }
1422                 }
1423             };
1424             if (Log.isDebugEnabled())
1425                 Log.debug("Delay " + interval + " send of " + exchange);
1426             _timer.schedule(task,interval);
1427         }
1428         else
1429         {
1430             try
1431             {
1432                 send(exchange);
1433             }
1434             catch (IOException e)
1435             {
1436                 Log.warn("Send, retry on fail: "+e);
1437                 Log.debug(e);
1438                 return send(exchange,true);
1439             }
1440             catch (IllegalStateException e)
1441             {
1442                 Log.warn("Send, retry on fail: "+e);
1443                 Log.debug(e);
1444                 return send(exchange,true);
1445             }
1446         }
1447         return true;
1448 
1449     }
1450 
1451     /* ------------------------------------------------------------ */
1452     /**
1453      * Send the exchange.
1454      *
1455      * @param exchange
1456      * @throws IOException
1457      */
1458     protected void send(HttpExchange exchange) throws IOException
1459     {
1460         exchange.reset(); // ensure at start state
1461         customize(exchange);
1462         if (Log.isDebugEnabled())
1463             Log.debug("Send: using any connection=" + exchange);
1464         _httpClient.send(exchange); // use any connection
1465     }
1466 
1467     /* ------------------------------------------------------------ */
1468     /**
1469      * False when we have received a success=false message in response to a
1470      * Connect, or we have had an exception when sending or receiving a Connect.
1471      *
1472      * True when handshake and then connect has happened.
1473      *
1474      * @param b
1475      */
1476     protected void setInitialized(boolean b)
1477     {
1478         synchronized (_outQ)
1479         {
1480             _initialized = b;
1481         }
1482     }
1483 
1484     /* ------------------------------------------------------------ */
1485     protected boolean isInitialized()
1486     {
1487         return _initialized;
1488     }
1489 
1490     /* ------------------------------------------------------------ */
1491     /**
1492      * Called with the results of a /meta/connect message
1493      * @param success connect was returned with this status
1494      */
1495     protected void metaConnect(boolean success, Message message)
1496     {
1497         if (!success)
1498             Log.warn(this.toString()+" "+message.toString());
1499     }
1500 
1501     /* ------------------------------------------------------------ */
1502     /**
1503      * Called with the results of a /meta/handshake message
     * @param success handshake was returned with this status
1505      * @param reestablish the client was previously connected.
1506      */
1507     protected void metaHandshake(boolean success, boolean reestablish, Message message)
1508     {
1509         if (!success)
1510             Log.warn(this.toString()+" "+message.toString());
1511     }
1512 
1513     /* ------------------------------------------------------------ */
1514     /**
1515      * Called with the results of a failed publish
1516      */
1517     protected void metaPublishFail(Throwable e, Message[] messages)
1518     {
1519         Log.warn(this.toString()+": "+e);
1520         Log.debug(e);
1521     }
1522 
1523     /* ------------------------------------------------------------ */
1524     /** Called to extend outbound string messages.
1525      * Some messages are sent as preformatted JSON strings (eg handshake
1526      * and connect messages).  This extendOut method is a variation of the
1527      * {@link #extendOut(Message)} method to efficiently cater for these
1528      * preformatted strings.
1529      * <p>
1530      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1531      *
1532      * @param msg
1533      * @return the extended message
1534      */
1535     protected String extendOut(String msg)
1536     {
1537         if (_extensions==null)
1538             return msg;
1539 
1540         try
1541         {
1542             Message[] messages = _msgPool.parse(msg);
1543             for (int i=0; i<messages.length; i++)
1544                 extendOut(messages[i]);
1545             if (messages.length==1 && msg.charAt(0)=='{')
1546                 return _msgPool.getMsgJSON().toJSON(messages[0]);
1547             return _msgPool.getMsgJSON().toJSON(messages);
1548         }
1549         catch(IOException e)
1550         {
1551             Log.warn(e);
1552             return msg;
1553         }
1554     }
1555 
1556     /* ------------------------------------------------------------ */
1557     /** Called to extend outbound messages
1558      * <p>
1559      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1560      *
1561      */
1562     protected void extendOut(Message message)
1563     {
1564         if (_extensions!=null)
1565         {
1566             Message m = message;
1567             if (m.getChannel().startsWith(Bayeux.META_SLASH))
1568                 for (int i=0;m!=null && i<_extensions.length;i++)
1569                     m=_extensions[i].sendMeta(this,m);
1570             else
1571                 for (int i=0;m!=null && i<_extensions.length;i++)
1572                     m=_extensions[i].send(this,m);
1573 
1574             if (message!=m)
1575             {
1576                 message.clear();
1577                 if (m!=null)
1578                     for (Map.Entry<String,Object> entry:m.entrySet())
1579                         message.put(entry.getKey(),entry.getValue());
1580             }
1581         }
1582     }
1583 
1584     /* ------------------------------------------------------------ */
1585     /** Called to extend inbound messages
1586      * <p>
1587      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1588      *
1589      */
1590     protected void extendIn(Message message)
1591     {
1592         if (_extensions!=null)
1593         {
1594             Message m = message;
1595             if (m.getChannel().startsWith(Bayeux.META_SLASH))
1596                 for (int i=_extensions.length;m!=null && i-->0;)
1597                     m=_extensions[i].rcvMeta(this,m);
1598             else
1599                 for (int i=_extensions.length;m!=null && i-->0;)
1600                     m=_extensions[i].rcv(this,m);
1601 
1602             if (message!=m)
1603             {
1604                 message.clear();
1605                 if (m!=null)
1606                     for (Map.Entry<String,Object> entry:m.entrySet())
1607                         message.put(entry.getKey(),entry.getValue());
1608             }
1609         }
1610     }
1611 
1612     private static class ExpirableCookie
1613     {
1614         private final Cookie cookie;
1615         private final long expirationTime;
1616 
1617         private ExpirableCookie(Cookie cookie, long expirationTime)
1618         {
1619             this.cookie = cookie;
1620             this.expirationTime = expirationTime;
1621         }
1622 
1623         private boolean isExpired()
1624         {
1625             if (expirationTime < 0) return false;
1626             return System.currentTimeMillis() >= expirationTime;
1627         }
1628     }
1629 }