View Javadoc

1   // ========================================================================
2   // Copyright 2006-20078 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd.client;
16  
17  import java.io.IOException;
18  import java.net.URLEncoder;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Queue;
24  import java.util.Timer;
25  import java.util.TimerTask;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  import javax.servlet.http.Cookie;
29  
30  import org.cometd.Bayeux;
31  import org.cometd.Client;
32  import org.cometd.ClientListener;
33  import org.cometd.Extension;
34  import org.cometd.Message;
35  import org.cometd.MessageListener;
36  import org.cometd.RemoveListener;
37  import org.mortbay.cometd.MessageImpl;
38  import org.mortbay.cometd.MessagePool;
39  import org.mortbay.component.AbstractLifeCycle;
40  import org.mortbay.io.Buffer;
41  import org.mortbay.io.ByteArrayBuffer;
42  import org.mortbay.jetty.HttpHeaders;
43  import org.mortbay.jetty.HttpSchemes;
44  import org.mortbay.jetty.HttpURI;
45  import org.mortbay.jetty.client.Address;
46  import org.mortbay.jetty.client.ContentExchange;
47  import org.mortbay.jetty.client.HttpClient;
48  import org.mortbay.jetty.client.HttpExchange;
49  import org.mortbay.log.Log;
50  import org.mortbay.util.ArrayQueue;
51  import org.mortbay.util.LazyList;
52  import org.mortbay.util.QuotedStringTokenizer;
53  import org.mortbay.util.ajax.JSON;
54  
55  /* ------------------------------------------------------------ */
56  /**
57   * Bayeux protocol Client.
58   * <p>
59   * Implements a Bayeux Ajax Push client as part of the cometd project.
60   * <p>
61   * The HttpClient attributes are used to share a Timer and MessagePool instance
62   * between all Bayeux clients sharing the same HttpClient.
63   * 
64   * @see http://cometd.org
65   * @author gregw
66   * 
67   */
68  public class BayeuxClient extends AbstractLifeCycle implements Client
69  {
70      private final static String __TIMER="org.mortbay.cometd.client.Timer";
71      private final static String __JSON="org.mortbay.cometd.client.JSON";
72      private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
73      protected HttpClient _httpClient;
74  
75      protected MessagePool _msgPool;
76      private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();  // queue of incoming messages 
77      private ArrayQueue<Message> _outQ = new ArrayQueue<Message>(); // queue of outgoing messages
78      protected Address _cometdAddress;
79      private Exchange _pull;
80      private Exchange _push;
81      private String _path = "/cometd";
82      private boolean _initialized = false;
83      private boolean _disconnecting = false;
84      private boolean _handshook = false;
85      private String _clientId;
86      private org.cometd.Listener _listener;
87      private List<RemoveListener> _rListeners;
88      private List<MessageListener> _mListeners;
89      private int _batch;
90      private boolean _formEncoded;
91      private Map<String, Cookie> _cookies = new ConcurrentHashMap<String, Cookie>();
92      private Advice _advice;
93      private Timer _timer;
94      private int _backoffInterval = 0;
95      private int _backoffIncrement = 1000;
96      private int _backoffMaxInterval = 60000;
97      private Buffer _scheme;
98      protected Extension[] _extensions;
99      protected JSON _jsonOut;
100     
101     /* ------------------------------------------------------------ */
102     public BayeuxClient(HttpClient client, String url)
103     {
104         this(client,url,null);
105     }
106     
107     /* ------------------------------------------------------------ */
108     public BayeuxClient(HttpClient client, String url, Timer timer)
109     {
110         HttpURI uri = new HttpURI(url);
111         _httpClient = client;
112         _cometdAddress = new Address(uri.getHost(),uri.getPort());
113         _path=uri.getPath();
114         _timer = timer;
115         _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
116     }
117     
118     /* ------------------------------------------------------------ */
119     public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
120     {
121         _httpClient = client;
122         _cometdAddress = address;
123         _path = path;
124         _timer = timer;
125     }
126 
127     /* ------------------------------------------------------------ */
128     public BayeuxClient(HttpClient client, Address address, String uri)
129     {
130         this(client,address,uri,null);
131     }
132 
133     /* ------------------------------------------------------------ */
134     public void addExtension(Extension ext)
135     {
136         _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
137     }
138 
139     /* ------------------------------------------------------------ */
140     Extension[] getExtensions()
141     {
142         return _extensions;
143     }
144     
145     /* ------------------------------------------------------------ */
146     /**
147      * If unable to connect/handshake etc, even if following the interval in the
148      * advice, wait for this interval initially, and try again.
149      * 
150      * @param interval 
151      */
152     public void setBackOffInterval(int interval)
153     {
154         _backoffInterval = interval;
155     }
156 
157     /* ------------------------------------------------------------ */
158     /**
159      * @return the backoff interval to wait before retrying an unsuccessful
160      * or failed message 
161      */
162     public int getBackoffInterval()
163     {
164         return _backoffInterval;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /**
169      * @deprecated We retry an infinite number of times. 
170      * use {@link #getBackoffIncrement()} to set limits
171      */
172     public void setBackoffMaxRetries(int retries)
173     {
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @deprecated
179      */
180     public int getBackoffMaxRetries()
181     {
182         return -1;
183     }
184 
185     /* ------------------------------------------------------------ */
186     /**
187      . Each retry will increment by this 
188      * intervel, until we reach _backoffMaxInterval
189      * 
190     */
191     public void setBackoffIncrement(int interval)
192     {
193         _backoffIncrement = interval;
194     }
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @return the backoff interval used to increase the backoff time when 
199      * retrying an unsuccessful or failed message. 
200      */
201     public int getBackoffIncrement()
202     {
203         return _backoffIncrement;
204     }
205 
206     /* ------------------------------------------------------------ */
207     public void setBackoffMaxInterval(int interval)
208     {
209         _backoffMaxInterval = interval;
210     }
211     
212     public int getBackoffMaxInterval()
213     {
214         return _backoffMaxInterval;
215     }
216 
217     /* ------------------------------------------------------------ */
218     /*
219      * (non-Javadoc) Returns the clientId
220      * 
221      * @see dojox.cometd.Client#getId()
222      */
223     public String getId()
224     {
225         return _clientId;
226     }
227 
228     /* ------------------------------------------------------------ */
229     protected void doStart() throws Exception
230     {
231         if (!_httpClient.isStarted())
232             throw new IllegalStateException("!HttpClient.isStarted()");
233             
234         synchronized (_httpClient)
235         {
236             if (_jsonOut == null)
237             {
238                 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
239                 if (_jsonOut==null)
240                 {
241                     _jsonOut = new JSON();
242                     _httpClient.setAttribute(__JSON,_jsonOut);
243                 }
244             }
245             
246             if (_timer == null)
247             {
248                 _timer = (Timer)_httpClient.getAttribute(__TIMER);
249                 if (_timer==null)
250                 {
251                     _timer = new Timer(__TIMER+"@"+hashCode(),true);
252                     _httpClient.setAttribute(__TIMER,_timer);
253                 }
254             }
255             
256             if (_msgPool == null)
257             {
258                 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
259                 if (_msgPool==null)
260                 {
261                     _msgPool = new MessagePool();
262                     _httpClient.setAttribute(__MSGPOOL,_msgPool);
263                 }
264             }
265         }
266         _disconnecting=false;
267         _pull=null;
268         _push=null;
269         super.doStart();
270         synchronized (_outQ)
271         {
272             if (!_initialized && _pull == null)
273             {
274                 _pull = new Handshake();
275                 send((Exchange)_pull,false);
276             }
277         }
278     }
279 
280     /* ------------------------------------------------------------ */
281     protected void doStop() throws Exception
282     {
283         if (!_disconnecting)
284             disconnect();
285         super.doStop();
286     }
287 
288     /* ------------------------------------------------------------ */
289     public boolean isPolling()
290     {
291         synchronized (_outQ)
292         {
293             return isRunning() && (_pull != null);
294         }
295     }
296 
297     /* ------------------------------------------------------------ */
298     /**
299      * (non-Javadoc)
300      */
301     public void deliver(Client from, Message message)
302     {
303         if (!isRunning())
304             throw new IllegalStateException("Not running");
305 
306         synchronized (_inQ)
307         {
308             if (_mListeners == null)
309                 _inQ.add(message);
310             else
311             {
312                 for (MessageListener l : _mListeners)
313                     l.deliver(from,this,message);
314             }
315         }
316     }
317 
318     /* ------------------------------------------------------------ */
319     /*
320      * (non-Javadoc)
321      * 
322      * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String,
323      * java.lang.Object, java.lang.String)
324      */
325     public void deliver(Client from, String toChannel, Object data, String id)
326     {
327         if (!isRunning())
328             throw new IllegalStateException("Not running");
329 
330         MessageImpl message = _msgPool.newMessage();
331 
332         message.put(Bayeux.CHANNEL_FIELD,toChannel);
333         message.put(Bayeux.DATA_FIELD,data);
334         if (id != null)
335             message.put(Bayeux.ID_FIELD,id);
336 
337         synchronized (_inQ)
338         {
339             if (_mListeners == null)
340             {
341                 message.incRef();
342                 _inQ.add(message);
343             }
344             else
345             {
346                 for (MessageListener l : _mListeners)
347                     if (l instanceof MessageListener.Synchronous)
348                         l.deliver(from,this,message);
349             }
350         }
351         if (_mListeners !=null)
352             for (MessageListener l : _mListeners)
353                 if (!(l instanceof MessageListener.Synchronous))
354                 l.deliver(from,this,message);
355         message.decRef();
356     }
357 
358     /* ------------------------------------------------------------ */
359     /**
360      * @deprecated
361      */
362     public org.cometd.Listener getListener()
363     {
364         synchronized (_inQ)
365         {
366             return _listener;
367         }
368     }
369 
370     /* ------------------------------------------------------------ */
371     /*
372      * (non-Javadoc)
373      * 
374      * @see dojox.cometd.Client#hasMessages()
375      */
376     public boolean hasMessages()
377     {
378         synchronized (_inQ)
379         {
380             return _inQ.size() > 0;
381         }
382     }
383 
384     /* ------------------------------------------------------------ */
385     /*
386      * (non-Javadoc)
387      * 
388      * @see dojox.cometd.Client#isLocal()
389      */
390     public boolean isLocal()
391     {
392         return false;
393     }
394 
395     /* ------------------------------------------------------------ */
396     /*
397      * (non-Javadoc)
398      * 
399      * @see dojox.cometd.Client#subscribe(java.lang.String)
400      */
401     private void publish(MessageImpl msg)
402     {
403         msg.incRef();
404         synchronized (_outQ)
405         {
406             _outQ.add(msg);
407 
408             if (_batch == 0 && _initialized && _push == null)
409             {
410                 _push = new Publish();
411                 try
412                 {
413                     send(_push);
414                 }
415                 catch (IOException e)
416                 {
417                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
418                 }
419                 catch (IllegalStateException e)
420                 {
421                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
422                 }
423             }
424         }
425     }
426 
427     /* ------------------------------------------------------------ */
428     /*
429      * (non-Javadoc)
430      * 
431      * @see dojox.cometd.Client#publish(java.lang.String, java.lang.Object,
432      * java.lang.String)
433      */
434     public void publish(String toChannel, Object data, String msgId)
435     {
436         if (!isRunning() || _disconnecting)
437             throw new IllegalStateException("Not running");
438 
439         MessageImpl msg = _msgPool.newMessage();
440         msg.put(Bayeux.CHANNEL_FIELD,toChannel);
441         msg.put(Bayeux.DATA_FIELD,data);
442         if (msgId != null)
443             msg.put(Bayeux.ID_FIELD,msgId);
444         publish(msg);
445         msg.decRef();
446     }
447 
448     /* ------------------------------------------------------------ */
449     /*
450      * (non-Javadoc)
451      * 
452      * @see dojox.cometd.Client#subscribe(java.lang.String)
453      */
454     public void subscribe(String toChannel)
455     {
456         if (!isRunning() || _disconnecting)
457             throw new IllegalStateException("Not running");
458 
459         MessageImpl msg = _msgPool.newMessage();
460         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
461         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
462         publish(msg);
463         msg.decRef();
464     }
465 
466     /* ------------------------------------------------------------ */
467     /*
468      * (non-Javadoc)
469      * 
470      * @see dojox.cometd.Client#unsubscribe(java.lang.String)
471      */
472     public void unsubscribe(String toChannel)
473     {
474         if (!isRunning() || _disconnecting)
475             throw new IllegalStateException("Not running");
476 
477         MessageImpl msg = _msgPool.newMessage();
478         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
479         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
480         publish(msg);
481         msg.decRef();
482     }
483 
484     /* ------------------------------------------------------------ */
485     /**
486      * Disconnect this client.
487      * @deprecated use {@link #disconnect()}
488      */
489     public void remove()
490     {
491         disconnect();
492     }
493     
494     /* ------------------------------------------------------------ */
495     /**
496      * Disconnect this client.
497      */
498     public void disconnect()
499     {
500         if (isStopped() || _disconnecting)
501             throw new IllegalStateException("Not running");
502 
503         MessageImpl msg = _msgPool.newMessage();
504         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
505 
506         synchronized (_outQ)
507         {
508             _outQ.add(msg);
509             _disconnecting = true;
510             if (_batch == 0 && _initialized && _push == null)
511             {
512                 _push = new Publish();
513                 try
514                 {
515                     send(_push);
516                 }
517                 catch (IOException e)
518                 {
519                     Log.warn(e.toString());
520                     Log.debug(e);
521                     send(_push,true);
522                 }
523             }
524             _initialized = false;
525         }
526     }
527 
528     /* ------------------------------------------------------------ */
529     /**
530      * @deprecated
531      */
532     public void setListener(org.cometd.Listener listener)
533     {
534         synchronized (_inQ)
535         {
536             if (_listener != null)
537                 removeListener(_listener);
538             _listener = listener;
539             if (_listener != null)
540                 addListener(_listener);
541         }
542     }
543 
544     /* ------------------------------------------------------------ */
545     /*
546      * (non-Javadoc) Removes all available messages from the inbound queue. If a
547      * listener is set then messages are not queued.
548      * 
549      * @see dojox.cometd.Client#takeMessages()
550      */
551     public List<Message> takeMessages()
552     {
553         final LinkedList<Message> list;
554         synchronized (_inQ)
555         {
556             list = new LinkedList<Message>(_inQ);
557             _inQ.clear();
558         }
559         for (Message m : list)
560             if (m instanceof MessageImpl)
561                 ((MessageImpl)m).decRef();
562         return list;
563     }
564 
565     /* ------------------------------------------------------------ */
566     /*
567      * (non-Javadoc)
568      * 
569      * @see dojox.cometd.Client#endBatch()
570      */
571     public void endBatch()
572     {
573         synchronized (_outQ)
574         {
575             if (--_batch <= 0)
576             {
577                 _batch = 0;
578                 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
579                 {
580                     _push = new Publish();
581                     try
582                     {
583                         send(_push);
584                     }
585                     catch (IOException e)
586                     {
587                         metaPublishFail(e,((Publish)_push).getOutboundMessages());
588                     }
589                 }
590             }
591         }
592     }
593 
594     /* ------------------------------------------------------------ */
595     /*
596      * (non-Javadoc)
597      * 
598      * @see dojox.cometd.Client#startBatch()
599      */
600     public void startBatch()
601     {
602         if (isStopped())
603             throw new IllegalStateException("Not running");
604 
605         synchronized (_outQ)
606         {
607             _batch++;
608         }
609     }
610 
611     /* ------------------------------------------------------------ */
612     /**
613      * Customize an Exchange. Called when an exchange is about to be sent to
614      * allow Cookies and Credentials to be customized. Default implementation
615      * sets any cookies
616      */
617     protected void customize(HttpExchange exchange)
618     {
619         StringBuilder buf = null;
620         for (Cookie cookie : _cookies.values())
621         {
622             if (buf == null)
623                 buf = new StringBuilder();
624             else
625                 buf.append("; ");
626             buf.append(cookie.getName()); // TODO quotes
627             buf.append("=");
628             buf.append(cookie.getValue()); // TODO quotes
629         }
630         if (buf != null)
631             exchange.setRequestHeader(HttpHeaders.COOKIE,buf.toString());
632       
633         if (_scheme!=null)
634             exchange.setScheme(_scheme);
635     }
636 
637     /* ------------------------------------------------------------ */
638     public void setCookie(Cookie cookie)
639     {
640         _cookies.put(cookie.getName(),cookie);
641     }
642 
643     /* ------------------------------------------------------------ */
644     /* ------------------------------------------------------------ */
645     /* ------------------------------------------------------------ */
646     /**
647      * The base class for all bayeux exchanges.
648      */
649     protected class Exchange extends ContentExchange
650     {
651         Message[] _responses;
652         int _connectFailures;
653         int _backoff = _backoffInterval;
654         String _json;
655 
656         /* ------------------------------------------------------------ */
657         Exchange(String info)
658         {
659             setMethod("POST");
660             setScheme(HttpSchemes.HTTP_BUFFER);
661             setAddress(_cometdAddress);
662             setURI(_path + "/" + info);
663             setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
664         }
665 
666         /* ------------------------------------------------------------ */
667         public int getBackoff()
668         {
669             return _backoff;
670         }
671 
672         /* ------------------------------------------------------------ */
673         public void incBackoff()
674         {
675             _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
676         }
677 
678         /* ------------------------------------------------------------ */
679         protected void setMessage(String message)
680         {
681             message=extendOut(message);
682             setJson(message);
683         }
684 
685         /* ------------------------------------------------------------ */
686         protected void setJson(String json)
687         {
688             try
689             {
690                 _json = json;
691 
692                 if (_formEncoded)
693                     setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
694                 else
695                     setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
696             }
697             catch (Exception e)
698             {
699                 Log.ignore(e);
700                 setRequestContent(new ByteArrayBuffer(_json));
701             }
702         }
703 
704         /* ------------------------------------------------------------ */
705         protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
706         {
707             super.onResponseStatus(version,status,reason);
708         }
709 
710         /* ------------------------------------------------------------ */
711         protected void onResponseHeader(Buffer name, Buffer value) throws IOException
712         {
713             super.onResponseHeader(name,value);
714             if (!isRunning())
715                 return;
716 
717             if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
718             {
719                 String cname = null;
720                 String cvalue = null;
721 
722                 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
723                 tok.setSingle(false);
724 
725                 if (tok.hasMoreElements())
726                     cname = tok.nextToken();
727                 if (tok.hasMoreElements())
728                     cvalue = tok.nextToken();
729 
730                 Cookie cookie = new Cookie(cname,cvalue);
731 
732                 while (tok.hasMoreTokens())
733                 {
734                     String token = tok.nextToken();
735                     if ("Version".equalsIgnoreCase(token))
736                         cookie.setVersion(Integer.parseInt(tok.nextToken()));
737                     else if ("Comment".equalsIgnoreCase(token))
738                         cookie.setComment(tok.nextToken());
739                     else if ("Path".equalsIgnoreCase(token))
740                         cookie.setPath(tok.nextToken());
741                     else if ("Domain".equalsIgnoreCase(token))
742                         cookie.setDomain(tok.nextToken());
743                     else if ("Expires".equalsIgnoreCase(token))
744                     {
745                         tok.nextToken();
746                         // TODO
747                     }
748                     else if ("Max-Age".equalsIgnoreCase(token))
749                     {
750                         tok.nextToken();
751                         // TODO
752                     }
753                     else if ("Secure".equalsIgnoreCase(token))
754                         cookie.setSecure(true);
755                 }
756 
757                 BayeuxClient.this.setCookie(cookie);
758             }
759         }
760 
761         /* ------------------------------------------------------------ */
762         protected void onResponseComplete() throws IOException
763         {
764             if (!isRunning())
765                 return;
766 
767             super.onResponseComplete();
768 
769             if (getResponseStatus() == 200)
770             {
771                 String content = getResponseContent();
772                 // TODO
773                 if (content == null || content.length() == 0)
774                     throw new IllegalStateException();
775                 _responses = _msgPool.parse(content);
776                 
777                 if (_responses!=null)
778                     for (int i=0;i<_responses.length;i++)
779                         extendIn(_responses[i]);
780             }
781         }
782 
783         /* ------------------------------------------------------------ */
784         protected void resend(boolean backoff)
785         {
786             if (!isRunning())
787                 return;
788 
789             final boolean disconnecting;
790             synchronized (_outQ)
791             {
792                 disconnecting=_disconnecting;
793             }
794             if (disconnecting)
795             {
796                 try{stop();}catch(Exception e){Log.ignore(e);}
797                 return;
798             }
799             
800             setJson(_json);
801             if (!send(this,backoff))
802                 Log.warn("Retries exhausted"); // giving up
803         }
804         
805         /* ------------------------------------------------------------ */
806         protected void recycle()
807         {
808             if (_responses!=null)
809                 for (Message msg:_responses)
810                     if (msg instanceof MessageImpl)
811                         ((MessageImpl)msg).decRef();
812             _responses=null;
813         }
814     }
815 
816     /* ------------------------------------------------------------ */
817     /**
818      * The Bayeux handshake exchange. Negotiates a client Id and initializes the
819      * protocol.
820      * 
821      */
822     protected class Handshake extends Exchange
823     {
824         public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
825 
826         Handshake()
827         {
828             super("handshake");
829             setMessage(__HANDSHAKE);
830         }
831 
832         /* ------------------------------------------------------------ */
833         /*
834          * (non-Javadoc)
835          * 
836          * @see
837          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
838          */
839         protected void onResponseComplete() throws IOException
840         {
841             super.onResponseComplete();
842 
843             if (!isRunning())
844                 return;    
845             
846             if (_disconnecting)
847             {
848                 Message error=_msgPool.newMessage();
849                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
850                 error.put("failure","expired");
851                 metaHandshake(false,false,error);
852                 try{stop();}catch(Exception e){Log.ignore(e);}
853                 return;
854             }
855 
856             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
857             {
858                 MessageImpl response = (MessageImpl)_responses[0];
859                 boolean successful = response.isSuccessful();
860 
861                 // Get advice if there is any
862                 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
863                 if (adviceField != null)
864                     _advice = new Advice(adviceField);
865 
866                 if (successful)
867                 {
868                     _handshook = true;
869                     if (Log.isDebugEnabled())
870                         Log.debug("Successful handshake, sending connect");
871                     _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
872 
873                     metaHandshake(true,_handshook,response);
874                     _pull = new Connect();
875                     send(_pull,false);
876                 }
877                 else
878                 {
879                     metaHandshake(false,false,response);
880                     _handshook = false;
881                     if (_advice != null && _advice.isReconnectNone())
882                         throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
883                     else if (_advice != null && _advice.isReconnectHandshake())
884                     {
885                         _pull = new Handshake();
886                         if (!send(_pull,true))
887                             throw new IOException("Handshake, retries exhausted");
888                     }
889                     else
890                     // assume retry = reconnect?
891                     {
892                         _pull = new Connect();
893                         if (!send(_pull,true))
894                             throw new IOException("Connect after handshake, retries exhausted");
895                     }
896                 }
897             }
898             else
899             {
900                 Message error=_msgPool.newMessage();
901                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
902                 error.put("status",new Integer(getResponseStatus()));
903                 error.put("content",getResponseContent());
904                 
905                 metaHandshake(false,false,error);
906                 resend(true);
907             }
908             
909             recycle();
910         }
911 
912         /* ------------------------------------------------------------ */
913         protected void onExpire()
914         {
915             // super.onExpire();
916             Message error=_msgPool.newMessage();
917             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
918             error.put("failure","expired");
919             metaHandshake(false,false,error);
920             resend(true);
921         }
922 
923         /* ------------------------------------------------------------ */
924         protected void onConnectionFailed(Throwable ex)
925         {
926             // super.onConnectionFailed(ex);
927             Message error=_msgPool.newMessage();
928             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
929             error.put("failure",ex.toString());
930             error.put("exception",ex);
931             ex.printStackTrace();
932             metaHandshake(false,false,error);
933             resend(true);
934         }
935 
936         /* ------------------------------------------------------------ */
937         protected void onException(Throwable ex)
938         {
939             // super.onException(ex);
940             Message error=_msgPool.newMessage();
941             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
942             error.put("failure",ex.toString());
943             error.put("exception",ex);
944             metaHandshake(false,false,error);
945             resend(true);
946         }
947     }
948 
949     /* ------------------------------------------------------------ */
950     /**
951      * The Bayeux Connect exchange. Connect exchanges implement the long poll
952      * for Bayeux.
953      */
954     protected class Connect extends Exchange
955     {
956         String _connectString;
957 
958         Connect()
959         {
960             super("connect");
961             _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}";
962             setMessage(_connectString);
963         }
964 
965         protected void onResponseComplete() throws IOException
966         {
967             super.onResponseComplete();
968             if (!isRunning())
969                 return;
970 
971             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
972             {
973                 try
974                 {
975                     startBatch();
976 
977                     for (int i = 0; i < _responses.length; i++)
978                     {
979                         Message msg = _responses[i];
980 
981                         // get advice if there is any
982                         Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
983                         if (adviceField != null)
984                             _advice = new Advice(adviceField);
985 
986                         if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
987                         {
988                             Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
989                             if (successful != null && successful.booleanValue())
990                             {                               
991                                 metaConnect(true,msg);
992 
993                                 if (!isRunning())
994                                     break;
995                                 
996                                 synchronized (_outQ)
997                                 {
998                                     if (_disconnecting)
999                                         continue;
1000                                     
1001                                     if (!isInitialized())
1002                                     {
1003                                         setInitialized(true);
1004                                         {
1005                                             if (_outQ.size() > 0)
1006                                             {
1007                                                 _push = new Publish();
1008                                                 send(_push);
1009                                             }
1010                                         }
1011                                     }
1012 
1013                                 }
1014                                 // send a Connect (ie longpoll) possibly with
1015                                 // delay according to interval advice                                
1016                                 _pull = new Connect();
1017                                 send(_pull,false);
1018                             }
1019                             else
1020                             {
1021                                 // received a failure to our connect message,
1022                                 // check the advice to see what to do:
1023                                 // reconnect: none = hard error
1024                                 // reconnect: handshake = send a handshake
1025                                 // message
1026                                 // reconnect: retry = send another connect,
1027                                 // possibly using interval
1028 
1029                                 setInitialized(false);
1030                                 metaConnect(false,msg);
1031                                 
1032                                 synchronized(_outQ)
1033                                 {
1034                                     if (!isRunning()||_disconnecting)
1035                                         break;
1036                                 }
1037                                 
1038                                 if (_advice != null && _advice.isReconnectNone())
1039                                     throw new IOException("Connect failed, advice reconnect=none");
1040                                 else if (_advice != null && _advice.isReconnectHandshake())
1041                                 {
1042                                     if (Log.isDebugEnabled())
1043                                         Log.debug("connect received success=false, advice is to rehandshake");
1044                                     _pull = new Handshake();
1045                                     send(_pull,true);
1046                                 }
1047                                 else
1048                                 {
1049                                     // assume retry = reconnect
1050                                     if (Log.isDebugEnabled())
1051                                         Log.debug("Assuming retry=reconnect");
1052                                     resend(true);
1053                                 }
1054                             }
1055                         }
1056                         deliver(null,msg);
1057                     }
1058                 }
1059                 finally
1060                 {
1061                     endBatch();
1062                 }
1063             }
1064             else
1065             {
1066                 Message error=_msgPool.newMessage();
1067                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1068                 error.put("status",getResponseStatus());
1069                 error.put("content",getResponseContent());
1070                 metaConnect(false,error);
1071                 resend(true);
1072             }
1073 
1074             recycle();
1075         }
1076 
1077         /* ------------------------------------------------------------ */
1078         protected void onExpire()
1079         {
1080             // super.onExpire();
1081             setInitialized(false);
1082             Message error=_msgPool.newMessage();
1083             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1084             error.put("failure","expired");
1085             metaConnect(false,error);
1086             resend(true);
1087         }
1088 
1089         /* ------------------------------------------------------------ */
1090         protected void onConnectionFailed(Throwable ex)
1091         {
1092             // super.onConnectionFailed(ex);
1093             setInitialized(false);
1094             Message error=_msgPool.newMessage();
1095             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1096             error.put("failure",ex.toString());
1097             error.put("exception",ex);
1098             metaConnect(false,error);
1099             resend(true);
1100         }
1101 
1102         /* ------------------------------------------------------------ */
1103         protected void onException(Throwable ex)
1104         {
1105             // super.onException(ex);
1106             setInitialized(false);
1107             Message error=_msgPool.newMessage();
1108             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1109             error.put("failure",ex.toString());
1110             error.put("exception",ex);
1111             metaConnect(false,error);
1112             resend(true);
1113         }
1114     }
1115 
1116     /* ------------------------------------------------------------ */
1117     /**
1118      * Publish message exchange. Sends messages to bayeux server and handles any
1119      * messages received as a result.
1120      */
1121     protected class Publish extends Exchange
1122     {
1123         Publish()
1124         {
1125             super("publish");
1126             
1127             StringBuffer json = new StringBuffer(256);
1128             synchronized (json)
1129             {
1130                 synchronized (_outQ)
1131                 {
1132                     int s=_outQ.size();
1133                     if (s == 0)
1134                         return;
1135 
1136                     for (int i=0;i<s;i++)
1137                     {
1138                         Message message = _outQ.getUnsafe(i);
1139                         message.put(Bayeux.CLIENT_FIELD,_clientId);
1140                         extendOut(message);
1141 
1142                         json.append(i==0?'[':',');
1143                         _jsonOut.append(json,message);
1144 
1145                         if (message instanceof MessageImpl)
1146                             ((MessageImpl)message).decRef();
1147                     }
1148                     json.append(']');
1149                     _outQ.clear();
1150                     setJson(json.toString());
1151                 }
1152             }
1153         }
1154 
1155         protected Message[] getOutboundMessages()
1156         {
1157             try
1158             {
1159                 return _msgPool.parse(_json);
1160             }
1161             catch (IOException e)
1162             {
1163                 Log.warn("Error converting outbound messages");
1164                 if (Log.isDebugEnabled())
1165                     Log.debug(e);
1166                 return null;
1167             }
1168         }
1169 
1170         /* ------------------------------------------------------------ */
1171         /*
1172          * (non-Javadoc)
1173          * 
1174          * @see
1175          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
1176          */
1177         protected void onResponseComplete() throws IOException
1178         {
1179             if (!isRunning())
1180                 return;
1181             
1182             super.onResponseComplete();
1183             try
1184             {
1185                 synchronized (_outQ)
1186                 {
1187                     startBatch();
1188                     _push = null;
1189                 }
1190 
1191                 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1192                 {
1193                     for (int i = 0; i < _responses.length; i++)
1194                     {
1195                         MessageImpl msg = (MessageImpl)_responses[i];
1196                             
1197                         deliver(null,msg);
1198                         if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
1199                         {
1200                             if (isStarted())
1201                             {
1202                                 try{stop();}catch(Exception e){Log.ignore(e);}
1203                             }
1204                             break;
1205                         }
1206                     }
1207                 }
1208                 else
1209                 {
1210                     Log.warn("Publish, error=" + getResponseStatus());
1211                 }
1212             }
1213             finally
1214             {
1215                 endBatch();
1216             }
1217             recycle();
1218         }
1219 
1220         /* ------------------------------------------------------------ */
1221         protected void onExpire()
1222         {
1223             super.onExpire();
1224             metaPublishFail(null,this.getOutboundMessages());
1225             if (_disconnecting)
1226             {
1227                 try{stop();}catch(Exception e){Log.ignore(e);}
1228             }
1229         }
1230 
1231         /* ------------------------------------------------------------ */
1232         protected void onConnectionFailed(Throwable ex)
1233         {
1234             super.onConnectionFailed(ex);
1235             metaPublishFail(ex,this.getOutboundMessages());
1236             if (_disconnecting)
1237             {
1238                 try{stop();}catch(Exception e){Log.ignore(e);}
1239             }
1240         }
1241 
1242         /* ------------------------------------------------------------ */
1243         protected void onException(Throwable ex)
1244         {
1245             super.onException(ex);
1246             metaPublishFail(ex,this.getOutboundMessages());
1247             if (_disconnecting)
1248             {
1249                 try{stop();}catch(Exception e){Log.ignore(e);}
1250             }
1251         }
1252     }
1253 
1254     /* ------------------------------------------------------------ */
1255     public void addListener(ClientListener listener)
1256     {
1257         synchronized (_inQ)
1258         {
1259             boolean added=false;
1260             if (listener instanceof MessageListener)
1261             {
1262                 added=true;
1263                 if (_mListeners == null)
1264                     _mListeners = new ArrayList<MessageListener>();
1265                 _mListeners.add((MessageListener)listener);
1266             }
1267             if (listener instanceof RemoveListener)
1268             {
1269                 added=true;
1270                 if (_rListeners == null)
1271                     _rListeners = new ArrayList<RemoveListener>();
1272                 _rListeners.add((RemoveListener)listener);
1273             }
1274             
1275             if (!added)
1276                 throw new IllegalArgumentException();
1277         }
1278     }
1279 
1280     /* ------------------------------------------------------------ */
1281     public void removeListener(ClientListener listener)
1282     {
1283         synchronized (_inQ)
1284         {
1285             if (listener instanceof MessageListener)
1286             {
1287                 if (_mListeners != null)
1288                     _mListeners.remove((MessageListener)listener);
1289             }
1290             if (listener instanceof RemoveListener)
1291             {
1292                 if (_rListeners != null)
1293                     _rListeners.remove((RemoveListener)listener);
1294             }
1295         }
1296     }
1297 
1298     /* ------------------------------------------------------------ */
1299     public int getMaxQueue()
1300     {
1301         return -1;
1302     }
1303 
1304     /* ------------------------------------------------------------ */
1305     public Queue<Message> getQueue()
1306     {
1307         return _inQ;
1308     }
1309 
1310     /* ------------------------------------------------------------ */
1311     public void setMaxQueue(int max)
1312     {
1313         if (max != -1)
1314             throw new UnsupportedOperationException();
1315     }
1316 
1317     /* ------------------------------------------------------------ */
1318     /**
1319      * Send the exchange, possibly using a backoff.
1320      * 
1321      * @param exchange
1322      * @param backoff
1323      *            if true, use backoff algorithm to send
1324      * @return
1325      */
1326     protected boolean send(final Exchange exchange, final boolean backoff)
1327     {
1328         long interval = (_advice != null?_advice.getInterval():0);
1329 
1330         if (backoff)
1331         {
1332             int backoffInterval = exchange.getBackoff();
1333             if (Log.isDebugEnabled())
1334                 Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);
1335 
1336             exchange.incBackoff();
1337 
1338             interval += backoffInterval;
1339         }
1340 
1341         if (interval > 0)
1342         {
1343             TimerTask task = new TimerTask()
1344             {
1345                 public void run()
1346                 {
1347                     try
1348                     {
1349                         send(exchange);
1350                     }
1351                     catch (IOException e)
1352                     {
1353                         Log.warn("Delayed send, retry: "+e);
1354                         Log.debug(e);
1355                         send(exchange,true);
1356                     }
1357                     catch (IllegalStateException e)
1358                     {
1359                         Log.warn("Delayed send, retry: "+e);
1360                         Log.debug(e);
1361                         send(exchange,true);
1362                     }
1363                 }
1364             };
1365             if (Log.isDebugEnabled())
1366                 Log.debug("Delay " + interval + " send of " + exchange);
1367             _timer.schedule(task,interval);
1368         }
1369         else
1370         {
1371             try
1372             {
1373                 send(exchange);
1374             }
1375             catch (IOException e)
1376             {
1377                 Log.warn("Send, retry on fail: "+e);
1378                 Log.debug(e);
1379                 return send(exchange,true);
1380             }
1381             catch (IllegalStateException e)
1382             {
1383                 Log.warn("Send, retry on fail: "+e);
1384                 Log.debug(e);
1385                 return send(exchange,true);
1386             }
1387         }
1388         return true;
1389 
1390     }
1391 
1392     /* ------------------------------------------------------------ */
1393     /**
1394      * Send the exchange.
1395      * 
1396      * @param exchange
1397      * @throws IOException
1398      */
1399     protected void send(HttpExchange exchange) throws IOException
1400     {
1401         exchange.reset(); // ensure at start state
1402         customize(exchange);
1403         if (Log.isDebugEnabled())
1404             Log.debug("Send: using any connection=" + exchange);
1405         _httpClient.send(exchange); // use any connection
1406     }
1407 
1408     /* ------------------------------------------------------------ */
1409     /**
1410      * False when we have received a success=false message in response to a
1411      * Connect, or we have had an exception when sending or receiving a Connect.
1412      * 
1413      * True when handshake and then connect has happened.
1414      * 
1415      * @param b
1416      */
1417     protected void setInitialized(boolean b)
1418     {
1419         synchronized (_outQ)
1420         {
1421             _initialized = b;
1422         }
1423     }
1424 
1425     /* ------------------------------------------------------------ */
1426     protected boolean isInitialized()
1427     {
1428         return _initialized;
1429     }
1430 
1431     /* ------------------------------------------------------------ */
1432     /**
1433      * Called with the results of a /meta/connect message
1434      * @param success connect was returned with this status
1435      */
1436     protected void metaConnect(boolean success, Message message)
1437     {
1438         if (!success)
1439             Log.warn(this.toString()+" "+message.toString());
1440     }
1441 
1442     /* ------------------------------------------------------------ */
1443     /**
1444      * Called with the results of a /meta/handshake message
1445      * @param success connect was returned with this status
1446      * @param reestablish the client was previously connected.
1447      */
1448     protected void metaHandshake(boolean success, boolean reestablish, Message message)
1449     {
1450         if (!success)
1451             Log.warn(this.toString()+" "+message.toString());
1452     }
1453 
1454     /* ------------------------------------------------------------ */
1455     /**
1456      * Called with the results of a failed publish
1457      */
1458     protected void metaPublishFail(Throwable e, Message[] messages)
1459     {
1460         Log.warn(this.toString()+": "+e);
1461         Log.debug(e);
1462     }
1463 
1464     /* ------------------------------------------------------------ */
1465     /** Called to extend outbound string messages.
1466      * Some messages are sent as preformatted JSON strings (eg handshake
1467      * and connect messages).  This extendOut method is a variation of the
1468      * {@link #extendOut(Message)} method to efficiently cater for these
1469      * preformatted strings.
1470      * <p>
1471      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1472      * 
1473      * @param msg
1474      * @return the extended message
1475      */
1476     protected String extendOut(String msg)
1477     {
1478         if (_extensions==null)
1479             return msg;
1480         
1481         try
1482         {
1483             Message[] messages = _msgPool.parse(msg);
1484             for (int i=0; i<messages.length; i++)
1485                 extendOut(messages[i]);
1486             if (messages.length==1 && msg.charAt(0)=='{')
1487                 return _msgPool.getMsgJSON().toJSON(messages[0]);
1488             return _msgPool.getMsgJSON().toJSON(messages);
1489         }
1490         catch(IOException e)
1491         {
1492             Log.warn(e);
1493             return msg;
1494         }
1495     }
1496 
1497     /* ------------------------------------------------------------ */
1498     /** Called to extend outbound messages
1499      * <p>
1500      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1501      * 
1502      */
1503     protected void extendOut(Message message)
1504     {
1505         if (_extensions!=null)
1506         {
1507             Message m = message;
1508             if (m.getChannel().startsWith(Bayeux.META_SLASH))
1509                 for (int i=0;m!=null && i<_extensions.length;i++)
1510                     m=_extensions[i].sendMeta(this,m);
1511             else
1512                 for (int i=0;m!=null && i<_extensions.length;i++)
1513                     m=_extensions[i].send(this,m);
1514                 
1515             if (message!=m)
1516             {
1517                 message.clear();
1518                 if (m!=null)
1519                     for (Map.Entry<String,Object> entry:m.entrySet())
1520                         message.put(entry.getKey(),entry.getValue());
1521             }
1522         }
1523     }
1524 
1525     /* ------------------------------------------------------------ */
1526     /** Called to extend inbound messages
1527      * <p>
1528      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1529      * 
1530      */
1531     protected void extendIn(Message message)
1532     {
1533         if (_extensions!=null)
1534         {
1535             Message m = message;
1536             if (m.getChannel().startsWith(Bayeux.META_SLASH))
1537                 for (int i=_extensions.length;m!=null && i-->0;)
1538                     m=_extensions[i].rcvMeta(this,m);
1539             else
1540                 for (int i=_extensions.length;m!=null && i-->0;)
1541                     m=_extensions[i].rcv(this,m);
1542                 
1543             if (message!=m)
1544             {
1545                 message.clear();
1546                 if (m!=null)
1547                     for (Map.Entry<String,Object> entry:m.entrySet())
1548                         message.put(entry.getKey(),entry.getValue());
1549             }
1550         }
1551     }
1552 
1553     
1554 }