View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.io.IOException;
18  import java.security.SecureRandom;
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import javax.servlet.ServletContext;
31  import javax.servlet.http.HttpServletRequest;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.BayeuxListener;
35  import org.cometd.Channel;
36  import org.cometd.ChannelBayeuxListener;
37  import org.cometd.Client;
38  import org.cometd.ClientBayeuxListener;
39  import org.cometd.Extension;
40  import org.cometd.Message;
41  import org.cometd.SecurityPolicy;
42  import org.mortbay.util.LazyList;
43  import org.mortbay.util.ajax.JSON;
44  
45  /* ------------------------------------------------------------ */
46  /**
47   * @author gregw
48   * @author aabeling: added JSONP transport
49   *
50   */
51  public abstract class AbstractBayeux extends MessagePool implements Bayeux
52  {
    // Pre-built ChannelId instances for the well-known meta channel names.
    public static final ChannelId META_ID=new ChannelId(META);
    public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
    public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
    public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
    public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
    public static final ChannelId META_PING_ID=new ChannelId(META_PING);
    public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
    public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
    public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);

    // Meta channel handlers keyed by channel name; populated in the constructor.
    private HashMap<String,Handler> _handlers=new HashMap<String,Handler>();

    // Root ("/") channel; channel lookup, creation and delivery go through it.
    private ChannelImpl _root=new ChannelImpl("/",this);
    // Registered clients keyed by client id.
    private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
    protected SecurityPolicy _securityPolicy=new DefaultPolicy();
    // Current advice literal; replaced by setAdvice().
    protected JSON.Literal _advice;
    // Advice derived from _advice in setAdvice() with "multiple-clients" set.
    protected JSON.Literal _multiFrameAdvice;
    // Incremented on every setAdvice() call.
    protected int _adviceVersion=0;
    protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
    // 0=none, 1=info, 2=debug
    protected int _logLevel;
    // Emitted as "timeout" in the advice built by generateAdvice().
    protected long _timeout=30000;
    // Emitted as "interval" in the advice built by generateAdvice().
    protected long _interval=0;
    // Maximum time in ms to wait between polls before timing out a client.
    protected long _maxInterval=10000;
    // Set to true by initialize(ServletContext).
    protected boolean _initialized;
    // Client ids grouped by browser id; maintained by clientOnBrowser()/clientOffBrowser().
    protected ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
    // Reconnect interval advised for multi-frame clients; values <= 0 make the advice "reconnect":"none".
    protected int _multiFrameInterval=-1;

    protected boolean _requestAvailable;
    // Per-thread request holder read by getCurrentRequest() and written by setCurrentRequest().
    protected ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();

    // The three transient fields below are only assigned in initialize(ServletContext).
    transient ServletContext _context;
    transient Random _random;
    transient ConcurrentHashMap<String,ChannelId> _channelIdCache;
    // Handler used by handle() for channels that are not meta channels.
    protected Handler _publishHandler;
    // Handler invoked by handle() after any message on a meta channel.
    protected Handler _metaPublishHandler;
    protected int _maxClientQueue=-1;

    // Bayeux-wide extensions; null until addExtension() is first called.
    protected Extension[] _extensions;
    protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");
    protected JSON.Literal _replyExt=new JSON.Literal("{\"ack\":\"true\"}");
    protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
    protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();

    // Maximum ms that a lazy message will wait before resuming a waiting client.
    protected int _maxLazyLatency=5000;
98      /* ------------------------------------------------------------ */
99      /**
100      * @param context The logLevel init parameter is used to set the logging to: 0=none, 1=info, 2=debug
101      */
102     protected AbstractBayeux()
103     {
104         _publishHandler=new PublishHandler();
105         _metaPublishHandler=new MetaPublishHandler();
106         _handlers.put(META_HANDSHAKE,new HandshakeHandler());
107         _handlers.put(META_CONNECT,new ConnectHandler());
108         _handlers.put(META_DISCONNECT,new DisconnectHandler());
109         _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
110         _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
111         _handlers.put(META_PING,new PingHandler());
112 
113         setTimeout(getTimeout());
114     }
115 
116     /* ------------------------------------------------------------ */
117     public void addExtension(Extension ext)
118     {
119         _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
120     }
121 
122     /* ------------------------------------------------------------ */
123     /**
124      * @param id
125      * @return
126      */
127     public ChannelImpl getChannel(ChannelId id)
128     {
129         return _root.getChild(id);
130     }
131 
132     /* ------------------------------------------------------------ */
133     public ChannelImpl getChannel(String id)
134     {
135         ChannelId cid=getChannelId(id);
136         if (cid.depth() == 0)
137             return null;
138         return _root.getChild(cid);
139     }
140 
141     /* ------------------------------------------------------------ */
142     public Channel getChannel(String id, boolean create)
143     {
144         synchronized(this)
145         {
146             ChannelImpl channel=getChannel(id);
147 
148             if (channel == null && create)
149             {
150                 channel=new ChannelImpl(id,this);
151                 _root.addChild(channel);
152 
153                 if (isLogInfo())
154                     logInfo("newChannel: " + channel);
155             }
156             return channel;
157         }
158     }
159 
160     /* ------------------------------------------------------------ */
161     public ChannelId getChannelId(String id)
162     {
163         ChannelId cid=_channelIdCache.get(id);
164         if (cid == null)
165         {
166             // TODO shrink cache!
167             cid=new ChannelId(id);
168             _channelIdCache.put(id,cid);
169         }
170         return cid;
171     }
172 
173     /* ------------------------------------------------------------ */
174     /*
175      * (non-Javadoc)
176      * 
177      * @see org.mortbay.cometd.Bx#getClient(java.lang.String)
178      */
179     public Client getClient(String client_id)
180     {
181         synchronized(this)
182         {
183             if (client_id == null)
184                 return null;
185             Client client=_clients.get(client_id);
186             return client;
187         }
188     }
189 
190     /* ------------------------------------------------------------ */
191     public Set<String> getClientIDs()
192     {
193         return _clients.keySet();
194     }
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @return The maximum time in ms to wait between polls before timing out a
199      *         client
200      */
201     public long getMaxInterval()
202     {
203         return _maxInterval;
204     }
205 
206     /* ------------------------------------------------------------ */
207     /**
208      * @return the logLevel. 0=none, 1=info, 2=debug
209      */
210     public int getLogLevel()
211     {
212         return _logLevel;
213     }
214 
215     /* ------------------------------------------------------------ */
216     /*
217      * (non-Javadoc)
218      * 
219      * @see org.mortbay.cometd.Bx#getSecurityPolicy()
220      */
221     public SecurityPolicy getSecurityPolicy()
222     {
223         return _securityPolicy;
224     }
225 
226     /* ------------------------------------------------------------ */
227     public long getTimeout()
228     {
229         return _timeout;
230     }
231 
232     /* ------------------------------------------------------------ */
233     public long getInterval()
234     {
235         return _interval;
236     }
237 
238     /* ------------------------------------------------------------ */
239     /**
240      * @return true if published messages are directly delivered to subscribers.
241      *         False if a new message is to be created that holds only supported
242      *         fields.
243      */
244     public boolean isDirectDeliver()
245     {
246         return false;
247     }
248 
249     /* ------------------------------------------------------------ */
250     /**
251      * @deprecated
252      * @param directDeliver
253      *            true if published messages are directly delivered to
254      *            subscribers. False if a new message is to be created that
255      *            holds only supported fields.
256      */
257     public void setDirectDeliver(boolean directDeliver)
258     {
259         _context.log("directDeliver is deprecated");
260     }
261 
262     /* ------------------------------------------------------------ */
263     /**
264      * Handle a Bayeux message. This is normally only called by the bayeux
265      * servlet or a test harness.
266      *
267      * @param client
268      *            The client if known
269      * @param transport
270      *            The transport to use for the message
271      * @param message
272      *            The bayeux message.
273      */
274     public String handle(ClientImpl client, Transport transport, Message message) throws IOException
275     {
276         String channel_id=message.getChannel();
277 
278         Handler handler=(Handler)_handlers.get(channel_id);
279         if (handler != null)
280         {
281             message=extendRcvMeta(client,message);
282             handler.handle(client,transport,message);
283             _metaPublishHandler.handle(client,transport,message);
284         }
285         else if (channel_id.startsWith(META_SLASH))
286         {
287             message=extendRcvMeta(client,message);
288             _metaPublishHandler.handle(client,transport,message);
289         }
290         else
291         {
292             // non meta channel
293             handler=_publishHandler;
294             message=extendRcv(client,message);
295             handler.handle(client,transport,message);
296         }
297 
298         return channel_id;
299     }
300 
301     /* ------------------------------------------------------------ */
302     public boolean hasChannel(String id)
303     {
304         ChannelId cid=getChannelId(id);
305         return _root.getChild(cid) != null;
306     }
307 
308     /* ------------------------------------------------------------ */
309     public boolean isInitialized()
310     {
311         return _initialized;
312     }
313 
314     /* ------------------------------------------------------------ */
315     /**
316      * @return the commented
317      * @deprecated
318      */
319     public boolean isJSONCommented()
320     {
321         return false;
322     }
323 
324     /* ------------------------------------------------------------ */
325     public boolean isLogDebug()
326     {
327         return _logLevel > 1;
328     }
329 
330     /* ------------------------------------------------------------ */
331     public boolean isLogInfo()
332     {
333         return _logLevel > 0;
334     }
335 
336     /* ------------------------------------------------------------ */
337     public void logDebug(String message)
338     {
339         if (_logLevel > 1)
340             _context.log(message);
341     }
342 
343     /* ------------------------------------------------------------ */
344     public void logDebug(String message, Throwable th)
345     {
346         if (_logLevel > 1)
347             _context.log(message,th);
348     }
349 
350     /* ------------------------------------------------------------ */
351     public void logWarn(String message, Throwable th)
352     {
353         _context.log(message + ": " + th.toString());
354     }
355 
356     /* ------------------------------------------------------------ */
357     public void logWarn(String message)
358     {
359         _context.log(message);
360     }
361 
362     /* ------------------------------------------------------------ */
363     public void logInfo(String message)
364     {
365         if (_logLevel > 0)
366             _context.log(message);
367     }
368 
369     /* ------------------------------------------------------------ */
370     public Client newClient(String idPrefix)
371     {
372         ClientImpl client=new ClientImpl(this,idPrefix);
373         return client;
374     }
375 
376     /* ------------------------------------------------------------ */
    /**
     * Factory method supplied by concrete subclasses.
     * NOTE(review): judging by the name this creates the ClientImpl that
     * represents a remote client; confirm against the implementations.
     *
     * @return a new client instance
     */
    public abstract ClientImpl newRemoteClient();
378 
379     /* ------------------------------------------------------------ */
380     /**
381      * Create new transport object for a bayeux message
382      *
383      * @param client
384      *            The client
385      * @param message
386      *            the bayeux message
387      * @return the negotiated transport.
388      */
389     public Transport newTransport(ClientImpl client, Map<?,?> message)
390     {
391         if (isLogDebug())
392             logDebug("newTransport: client=" + client + ",message=" + message);
393 
394         Transport result;
395 
396         String type = client == null ? null : client.getConnectionType();
397         if (type == null)
398         {
399             // Check if it is a connect message and we can extract the connection type
400             type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
401         }
402         if (type == null)
403         {
404             // Check if it is an handshake message and we can negotiate the connection type
405             Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
406             if (types != null)
407             {
408                 List supportedTypes;
409                 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
410                 else if (types instanceof List) supportedTypes = (List)types;
411                 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
412                 else supportedTypes = Collections.emptyList();
413 
414                 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
415                 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
416             }
417         }
418         if (type == null)
419         {
420             // A normal message, check if it has the jsonp parameter
421             String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
422             type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
423         }
424 
425         if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
426         {
427             String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
428             if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
429             result = new JSONPTransport(jsonp);
430         }
431         else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
432         {
433             result = new JSONTransport();
434         }
435         else
436         {
437             throw new IllegalArgumentException("Unsupported transport type " + type);
438         }
439 
440         if (isLogDebug())
441             logDebug("newTransport: result="+result);
442 
443         return result;
444     }
445 
446     /* ------------------------------------------------------------ */
447     /**
448      * Publish data to a channel. Creates a message and delivers it to the root
449      * channel.
450      *
451      * @param to
452      * @param from
453      * @param data
454      * @param msgId
455      */
456     protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
457     {
458         final MessageImpl message=newMessage();
459         message.put(CHANNEL_FIELD,to.toString());
460 
461         if (msgId == null)
462         {
463             long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
464             id=id < 0?-id:id;
465             message.put(ID_FIELD,Long.toString(id,36));
466         }
467         else
468             message.put(ID_FIELD,msgId);
469         message.put(DATA_FIELD,data);
470 
471         message.setLazy(lazy);
472 
473         final Message m=extendSendBayeux(from,message);
474 
475         if (m != null)
476             _root.doDelivery(to,from,m);
477         if (m instanceof MessageImpl)
478             ((MessageImpl)m).decRef();
479     }
480 
481     /* ------------------------------------------------------------ */
482     public boolean removeChannel(ChannelImpl channel)
483     {
484         return _root.doRemove(channel,_channelListeners);
485     }
486 
487     /* ------------------------------------------------------------ */
488     public void addChannel(ChannelImpl channel)
489     {
490         for (ChannelBayeuxListener l : _channelListeners)
491             l.channelAdded(channel);
492     }
493 
494     /* ------------------------------------------------------------ */
495     protected String newClientId(long variation, String idPrefix)
496     {
497         if (idPrefix == null)
498             return Long.toString(getRandom(),36) + Long.toString(variation,36);
499         else
500             return idPrefix + "_" + Long.toString(getRandom(),36);
501     }
502 
503     /* ------------------------------------------------------------ */
504     protected void addClient(ClientImpl client, String idPrefix)
505     {
506         while(true)
507         {
508             String id=newClientId(client.hashCode(),idPrefix);
509             client.setId(id);
510 
511             ClientImpl other=_clients.putIfAbsent(id,client);
512             if (other == null)
513             {
514                 for (ClientBayeuxListener l : _clientListeners)
515                     l.clientAdded((Client)client);
516 
517                 return;
518             }
519         }
520     }
521 
522     /* ------------------------------------------------------------ */
523     /*
524      * (non-Javadoc)
525      * 
526      * @see org.mortbay.cometd.Bx#removeClient(java.lang.String)
527      */
528     public Client removeClient(String client_id)
529     {
530         ClientImpl client;
531         synchronized(this)
532         {
533             if (client_id == null)
534                 return null;
535             client=_clients.remove(client_id);
536         }
537         if (client != null)
538         {
539             for (ClientBayeuxListener l : _clientListeners)
540                 l.clientRemoved((Client)client);
541             client.unsubscribeAll();
542         }
543         return client;
544     }
545 
546     /* ------------------------------------------------------------ */
547     /**
548      * @param ms
549      *            The maximum time in ms to wait between polls before timing out
550      *            a client
551      */
552     public void setMaxInterval(long ms)
553     {
554         _maxInterval=ms;
555     }
556 
557     /* ------------------------------------------------------------ */
558     /**
559      * @param commented the commented to set
560      */
561     public void setJSONCommented(boolean commented)
562     {
563         if (commented)
564             _context.log("JSONCommented is deprecated");
565     }
566 
567     /* ------------------------------------------------------------ */
568     /**
569      * @param logLevel
570      *            the logLevel: 0=none, 1=info, 2=debug
571      */
572     public void setLogLevel(int logLevel)
573     {
574         _logLevel=logLevel;
575     }
576 
577     /* ------------------------------------------------------------ */
578     /*
579      * (non-Javadoc)
580      *
581      * @see
582      * org.mortbay.cometd.Bx#setSecurityPolicy(org.mortbay.cometd.SecurityPolicy
583      * )
584      */
585     public void setSecurityPolicy(SecurityPolicy securityPolicy)
586     {
587         _securityPolicy=securityPolicy;
588     }
589 
590     /* ------------------------------------------------------------ */
591     public void setTimeout(long ms)
592     {
593         _timeout=ms;
594         generateAdvice();
595     }
596 
597     /* ------------------------------------------------------------ */
598     public void setInterval(long ms)
599     {
600         _interval=ms;
601         generateAdvice();
602     }
603 
604     /* ------------------------------------------------------------ */
605     /**
606      * The time a client should delay between reconnects when multiple
607      * connections from the same browser are detected. This effectively produces
608      * traditional polling.
609      *
610      * @param multiFrameInterval
611      *            the multiFrameInterval to set
612      */
613     public void setMultiFrameInterval(int multiFrameInterval)
614     {
615         _multiFrameInterval=multiFrameInterval;
616         generateAdvice();
617     }
618 
619     /* ------------------------------------------------------------ */
620     /**
621      * @return the multiFrameInterval in milliseconds
622      */
623     public int getMultiFrameInterval()
624     {
625         return _multiFrameInterval;
626     }
627 
628     /* ------------------------------------------------------------ */
629     void generateAdvice()
630     {
631         setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
632     }
633 
634     /* ------------------------------------------------------------ */
635     public void setAdvice(JSON.Literal advice)
636     {
637         synchronized(this)
638         {
639             _adviceVersion++;
640             _advice=advice;
641             _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
642         }
643     }
644 
645     /* ------------------------------------------------------------ */
646     private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
647     {
648         Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
649         a.put("multiple-clients",Boolean.TRUE);
650         if (_multiFrameInterval > 0)
651         {
652             a.put("reconnect","retry");
653             a.put("interval",_multiFrameInterval);
654         }
655         else
656             a.put("reconnect","none");
657         return a;
658     }
659 
660     /* ------------------------------------------------------------ */
661     public JSON.Literal getAdvice()
662     {
663         return _advice;
664     }
665 
666     /* ------------------------------------------------------------ */
667     /**
668      * @return TRUE if {@link #getCurrentRequest()} will return the current
669      *         request
670      */
671     public boolean isRequestAvailable()
672     {
673         return _requestAvailable;
674     }
675 
676     /* ------------------------------------------------------------ */
677     /**
678      * @param requestAvailable
679      *            TRUE if {@link #getCurrentRequest()} will return the current
680      *            request
681      */
682     public void setRequestAvailable(boolean requestAvailable)
683     {
684         _requestAvailable=requestAvailable;
685     }
686 
687     /* ------------------------------------------------------------ */
688     /**
689      * @return the current request if {@link #isRequestAvailable()} is true,
690      *         else null
691      */
692     public HttpServletRequest getCurrentRequest()
693     {
694         return _request.get();
695     }
696 
697     /* ------------------------------------------------------------ */
698     /**
699      * @return the current request if {@link #isRequestAvailable()} is true,
700      *         else null
701      */
702     void setCurrentRequest(HttpServletRequest request)
703     {
704         _request.set(request);
705     }
706 
707     /* ------------------------------------------------------------ */
708     public Collection<Channel> getChannels()
709     {
710         List<Channel> channels=new ArrayList<Channel>();
711         _root.getChannels(channels);
712         return channels;
713     }
714 
715     /* ------------------------------------------------------------ */
716     /**
717      * @return
718      */
719     public int getChannelCount()
720     {
721         return _root.getChannelCount();
722     }
723 
724     /* ------------------------------------------------------------ */
725     public Collection<Client> getClients()
726     {
727         synchronized(this)
728         {
729             return new ArrayList<Client>(_clients.values());
730         }
731     }
732 
733     /* ------------------------------------------------------------ */
734     /**
735      * @return
736      */
737     public int getClientCount()
738     {
739         synchronized(this)
740         {
741             return _clients.size();
742         }
743     }
744 
745     /* ------------------------------------------------------------ */
746     public boolean hasClient(String clientId)
747     {
748         synchronized(this)
749         {
750             if (clientId == null)
751                 return false;
752             return _clients.containsKey(clientId);
753         }
754     }
755 
756     /* ------------------------------------------------------------ */
757     public Channel removeChannel(String channelId)
758     {
759         Channel channel=getChannel(channelId);
760 
761         boolean removed=false;
762         if (channel != null)
763             removed=channel.remove();
764 
765         if (removed)
766             return channel;
767         else
768             return null;
769     }
770 
771     /* ------------------------------------------------------------ */
772     protected void initialize(ServletContext context)
773     {
774         synchronized(this)
775         {
776             _initialized=true;
777             _context=context;
778             try
779             {
780                 _random=SecureRandom.getInstance("SHA1PRNG");
781             }
782             catch(Exception e)
783             {
784                 context.log("Could not get secure random for ID generation",e);
785                 _random=new Random();
786             }
787             _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
788             _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
789 
790             _root.addChild(new ServiceChannel(Bayeux.SERVICE));
791 
792         }
793     }
794 
795     /* ------------------------------------------------------------ */
796     long getRandom()
797     {
798         long l=_random.nextLong();
799         return l < 0?-l:l;
800     }
801 
802     /* ------------------------------------------------------------ */
803     void clientOnBrowser(String browserId, String clientId)
804     {
805         List<String> clients=_browser2client.get(browserId);
806         if (clients == null)
807         {
808             List<String> new_clients=new CopyOnWriteArrayList<String>();
809             clients=_browser2client.putIfAbsent(browserId,new_clients);
810             if (clients == null)
811                 clients=new_clients;
812         }
813         clients.add(clientId);
814     }
815 
816     /* ------------------------------------------------------------ */
817     void clientOffBrowser(String browserId, String clientId)
818     {
819         List<String> clients=_browser2client.get(browserId);
820 
821         if (clients != null)
822             clients.remove(clientId);
823     }
824 
825     /* ------------------------------------------------------------ */
826     List<String> clientsOnBrowser(String browserId)
827     {
828         List<String> clients=_browser2client.get(browserId);
829 
830         if (clients == null)
831             return Collections.emptyList();
832         return clients;
833     }
834 
835     /* ------------------------------------------------------------ */
836     public void addListener(BayeuxListener listener)
837     {
838         if (listener instanceof ClientBayeuxListener)
839             _clientListeners.add((ClientBayeuxListener)listener);
840         if (listener instanceof ChannelBayeuxListener)
841             _channelListeners.add((ChannelBayeuxListener)listener);
842     }
843 
844     /* ------------------------------------------------------------ */
845     public int getMaxClientQueue()
846     {
847         return _maxClientQueue;
848     }
849 
850     /* ------------------------------------------------------------ */
851     public void setMaxClientQueue(int size)
852     {
853         _maxClientQueue=size;
854     }
855 
856     /* ------------------------------------------------------------ */
857     protected Message extendRcv(ClientImpl from, Message message)
858     {
859         if (_extensions != null)
860         {
861             for (int i=_extensions.length; message != null && i-- > 0;)
862                 message=_extensions[i].rcv(from,message);
863         }
864 
865         if (from != null)
866         {
867             Extension[] client_exs=from.getExtensions();
868             if (client_exs != null)
869             {
870                 for (int i=client_exs.length; message != null && i-- > 0;)
871                     message=client_exs[i].rcv(from,message);
872             }
873         }
874 
875         return message;
876     }
877 
878     /* ------------------------------------------------------------ */
879     protected Message extendRcvMeta(ClientImpl from, Message message)
880     {
881         if (_extensions != null)
882         {
883             for (int i=_extensions.length; message != null && i-- > 0;)
884                 message=_extensions[i].rcvMeta(from,message);
885         }
886 
887         if (from != null)
888         {
889             Extension[] client_exs=from.getExtensions();
890             if (client_exs != null)
891             {
892                 for (int i=client_exs.length; message != null && i-- > 0;)
893                     message=client_exs[i].rcvMeta(from,message);
894             }
895         }
896         return message;
897     }
898 
899     /* ------------------------------------------------------------ */
900     protected Message extendSendBayeux(Client from, Message message)
901     {
902         if (_extensions != null)
903         {
904             for (int i=0; message != null && i < _extensions.length; i++)
905             {
906                 message=_extensions[i].send(from,message);
907             }
908         }
909 
910         return message;
911     }
912 
913     /* ------------------------------------------------------------ */
914     public Message extendSendClient(Client from, ClientImpl to, Message message)
915     {
916         if (to != null)
917         {
918             Extension[] client_exs=to.getExtensions();
919             if (client_exs != null)
920             {
921                 for (int i=0; message != null && i < client_exs.length; i++)
922                     message=client_exs[i].send(from,message);
923             }
924         }
925 
926         return message;
927     }
928 
929     /* ------------------------------------------------------------ */
930     public Message extendSendMeta(ClientImpl from, Message message)
931     {
932         if (_extensions != null)
933         {
934             for (int i=0; message != null && i < _extensions.length; i++)
935                 message=_extensions[i].sendMeta(from,message);
936         }
937 
938         if (from != null)
939         {
940             Extension[] client_exs=from.getExtensions();
941             if (client_exs != null)
942             {
943                 for (int i=0; message != null && i < client_exs.length; i++)
944                     message=client_exs[i].sendMeta(from,message);
945             }
946         }
947 
948         return message;
949     }
950 
951     /* ------------------------------------------------------------ */
952     /**
953      * @return the maximum ms that a lazy message will wait before 
954      * resuming waiting client
955      */
956     public int getMaxLazyLatency()
957     {
958         return _maxLazyLatency;
959     }
960 
961     /* ------------------------------------------------------------ */
962     /**
963      * @param ms the maximum ms that a lazy message will wait before 
964      * resuming waiting client
965      */
966     public void setMaxLazyLatency(int ms)
967     {
968         _maxLazyLatency = ms;
969     }
970 
971     /* ------------------------------------------------------------ */
972     /* ------------------------------------------------------------ */
973     public static class DefaultPolicy implements SecurityPolicy
974     {
975         public boolean canHandshake(Message message)
976         {
977             return true;
978         }
979 
980         public boolean canCreate(Client client, String channel, Message message)
981         {
982             return client != null && !channel.startsWith(Bayeux.META_SLASH);
983         }
984 
985         public boolean canSubscribe(Client client, String channel, Message message)
986         {
987             if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
988                 return false;
989             return client != null && !channel.startsWith(Bayeux.META_SLASH);
990         }
991 
992         public boolean canPublish(Client client, String channel, Message message)
993         {
994             return client != null || client == null && Bayeux.META_HANDSHAKE.equals(channel);
995         }
996 
997     }
998 
999     /* ------------------------------------------------------------ */
1000     /* ------------------------------------------------------------ */
1001     protected abstract class Handler
1002     {
1003         abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
1004 
1005         abstract ChannelId getMetaChannelId();
1006 
1007         void unknownClient(Transport transport, String channel) throws IOException
1008         {
1009             MessageImpl reply=newMessage();
1010 
1011             reply.put(CHANNEL_FIELD,channel);
1012             reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1013             reply.put(ERROR_FIELD,"402::Unknown client");
1014             reply.put("advice",_handshakeAdvice);
1015             transport.send(reply);
1016             reply.decRef();
1017         }
1018 
1019         void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1020         {
1021             reply=extendSendMeta(client,reply);
1022             if (reply != null)
1023             {
1024                 transport.send(reply);
1025                 if (reply instanceof MessageImpl)
1026                     ((MessageImpl)reply).decRef();
1027             }
1028         }
1029     }
1030 
1031     /* ------------------------------------------------------------ */
1032     /* ------------------------------------------------------------ */
1033     protected class ConnectHandler extends Handler
1034     {
1035         protected String _metaChannel=META_CONNECT;
1036 
1037         @Override
1038         ChannelId getMetaChannelId()
1039         {
1040             return META_CONNECT_ID;
1041         }
1042 
1043         @Override
1044         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1045         {
1046             if (client == null)
1047             {
1048                 unknownClient(transport,_metaChannel);
1049                 return;
1050             }
1051 
1052             // is this the first connect message?
1053             String type=client.getConnectionType();
1054             boolean polling=true;
1055             if (type == null)
1056             {
1057                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1058                 client.setConnectionType(type);
1059                 polling=false;
1060             }
1061 
1062             Object advice=message.get(ADVICE_FIELD);
1063             if (advice != null)
1064             {
1065                 Long timeout=(Long)((Map)advice).get("timeout");
1066                 if (timeout != null && timeout.longValue() > 0)
1067                     client.setTimeout(timeout.longValue());
1068                 else
1069                     client.setTimeout(0);
1070 
1071                 Long interval=(Long)((Map)advice).get("interval");
1072                 if (interval != null && interval.longValue() > 0)
1073                     client.setInterval(interval.longValue());
1074                 else
1075                     client.setInterval(0);
1076             }
1077             else
1078             {
1079                 client.setTimeout(0);
1080                 client.setInterval(0);
1081             }
1082 
1083             advice=null;
1084 
1085             // Work out if multiple clients from some browser?
1086             if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1087             {
1088                 List<String> clients=clientsOnBrowser(client.getBrowserId());
1089                 int count=clients.size();
1090                 if (count > 1)
1091                 {
1092                     polling=clients.get(0).equals(client.getId());
1093                     advice=client.getAdvice();
1094                     if (advice == null)
1095                         advice=_multiFrameAdvice;
1096                     else
1097                         // could probably cache this
1098                         advice=multiFrameAdvice((JSON.Literal)advice);
1099                 }
1100             }
1101 
1102             synchronized(this)
1103             {
1104                 if (advice == null)
1105                 {
1106                     if (_adviceVersion != client._adviseVersion)
1107                     {
1108                         advice=_advice;
1109                         client._adviseVersion=_adviceVersion;
1110                     }
1111                 }
1112                 else
1113                     client._adviseVersion=-1; // clear so it is reset after multi state clears
1114             }
1115 
1116             // reply to connect message
1117             String id=message.getId();
1118 
1119             Message reply=newMessage(message);
1120 
1121             reply.put(CHANNEL_FIELD,META_CONNECT);
1122             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1123             if (advice != null)
1124                 reply.put(ADVICE_FIELD,advice);
1125             if (id != null)
1126                 reply.put(ID_FIELD,id);
1127 
1128             if (polling)
1129                 transport.setMetaConnectReply(reply);
1130             else
1131                 sendMetaReply(client,reply,transport);
1132         }
1133     }
1134 
1135     /* ------------------------------------------------------------ */
1136     /* ------------------------------------------------------------ */
1137     protected class DisconnectHandler extends Handler
1138     {
1139         @Override
1140         ChannelId getMetaChannelId()
1141         {
1142             return META_DISCONNECT_ID;
1143         }
1144 
1145         @Override
1146         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1147         {
1148             if (client == null)
1149             {
1150                 unknownClient(transport,META_DISCONNECT);
1151                 return;
1152             }
1153             if (isLogInfo())
1154                 logInfo("Disconnect " + client.getId());
1155 
1156             client.remove(false);
1157 
1158             Message reply=newMessage(message);
1159             reply.put(CHANNEL_FIELD,META_DISCONNECT);
1160             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1161             String id=message.getId();
1162             if (id != null)
1163                 reply.put(ID_FIELD,id);
1164 
1165             reply=extendSendMeta(client,reply);
1166 
1167             Message pollReply=transport.getMetaConnectReply();
1168             if (pollReply != null)
1169             {
1170                 transport.setMetaConnectReply(null);
1171                 sendMetaReply(client,pollReply,transport);
1172             }
1173             sendMetaReply(client,reply,transport);
1174         }
1175     }
1176 
1177     /* ------------------------------------------------------------ */
1178     /* ------------------------------------------------------------ */
1179     protected class HandshakeHandler extends Handler
1180     {
1181         @Override
1182         ChannelId getMetaChannelId()
1183         {
1184             return META_HANDSHAKE_ID;
1185         }
1186 
1187         @Override
1188         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1189         {
1190             if (client != null)
1191                 throw new IllegalStateException();
1192 
1193             if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1194             {
1195                 Message reply=newMessage(message);
1196                 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1197                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1198                 reply.put(ERROR_FIELD,"403::Handshake denied");
1199 
1200                 sendMetaReply(client,reply,transport);
1201                 return;
1202             }
1203 
1204             client=newRemoteClient();
1205 
1206             Message reply=newMessage(message);
1207             reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1208             reply.put(VERSION_FIELD,"1.0");
1209             reply.put(MIN_VERSION_FIELD,"0.9");
1210 
1211             if (client != null)
1212             {
1213                 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1214                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1215                 reply.put(CLIENT_FIELD,client.getId());
1216                 if (_advice != null)
1217                     reply.put(ADVICE_FIELD,_advice);
1218             }
1219             else
1220             {
1221                 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1222                 if (_advice != null)
1223                     reply.put(ADVICE_FIELD,_advice);
1224             }
1225 
1226             if (isLogDebug())
1227                 logDebug("handshake.handle: reply=" + reply);
1228 
1229             String id=message.getId();
1230             if (id != null)
1231                 reply.put(ID_FIELD,id);
1232 
1233             sendMetaReply(client,reply,transport);
1234         }
1235     }
1236 
1237     /* ------------------------------------------------------------ */
1238     /* ------------------------------------------------------------ */
1239     protected class PublishHandler extends Handler
1240     {
1241         @Override
1242         ChannelId getMetaChannelId()
1243         {
1244             return null;
1245         }
1246 
1247         @Override
1248         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1249         {
1250             String channel_id=message.getChannel();
1251 
1252             if (client == null && message.containsKey(CLIENT_FIELD))
1253             {
1254                 unknownClient(transport,channel_id);
1255                 return;
1256             }
1257 
1258             String id=message.getId();
1259 
1260             ChannelId cid=getChannelId(channel_id);
1261             Object data=message.get(Bayeux.DATA_FIELD);
1262 
1263             Message reply=newMessage(message);
1264             reply.put(CHANNEL_FIELD,channel_id);
1265             if (id != null)
1266                 reply.put(ID_FIELD,id);
1267 
1268             if (data == null)
1269             {
1270                 message=null;
1271                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1272                 reply.put(ERROR_FIELD,"403::No data");
1273             }
1274             else if (!_securityPolicy.canPublish(client,channel_id,message))
1275             {
1276                 message=null;
1277                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1278                 reply.put(ERROR_FIELD,"403::Publish denied");
1279             }
1280             else
1281             {
1282                 message.remove(CLIENT_FIELD);
1283                 message=extendSendBayeux(client,message);
1284 
1285                 if (message != null)
1286                 {
1287                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1288                 }
1289                 else
1290                 {
1291                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1292                     reply.put(ERROR_FIELD,"404::Message deleted");
1293                 }
1294             }
1295 
1296             sendMetaReply(client,reply,transport);
1297 
1298             if (message != null)
1299                 _root.doDelivery(cid,client,message);
1300         }
1301     }
1302 
1303     /* ------------------------------------------------------------ */
1304     /* ------------------------------------------------------------ */
1305     protected class MetaPublishHandler extends Handler
1306     {
1307         @Override
1308         ChannelId getMetaChannelId()
1309         {
1310             return null;
1311         }
1312 
1313         @Override
1314         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1315         {
1316             String channel_id=message.getChannel();
1317 
1318             if (client == null && !META_HANDSHAKE.equals(channel_id))
1319             {
1320                 // unknown client
1321                 return;
1322             }
1323 
1324             if (_securityPolicy.canPublish(client,channel_id,message))
1325             {
1326                 _root.doDelivery(getChannelId(channel_id),client,message);
1327             }
1328         }
1329     }
1330 
1331     /* ------------------------------------------------------------ */
1332     /* ------------------------------------------------------------ */
1333     protected class SubscribeHandler extends Handler
1334     {
1335         @Override
1336         ChannelId getMetaChannelId()
1337         {
1338             return META_SUBSCRIBE_ID;
1339         }
1340 
1341         @Override
1342         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1343         {
1344             if (client == null)
1345             {
1346                 unknownClient(transport,META_SUBSCRIBE);
1347                 return;
1348             }
1349 
1350             String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1351 
1352             // select a random channel ID if none specifified
1353             if (subscribe_id == null)
1354             {
1355                 subscribe_id=Long.toString(getRandom(),36);
1356                 while(getChannel(subscribe_id) != null)
1357                     subscribe_id=Long.toString(getRandom(),36);
1358             }
1359 
1360             ChannelId cid=null;
1361             boolean can_subscribe=false;
1362 
1363             if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1364             {
1365                 can_subscribe=true;
1366             }
1367             else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1368             {
1369                 can_subscribe=false;
1370             }
1371             else
1372             {
1373                 cid=getChannelId(subscribe_id);
1374                 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1375             }
1376 
1377             Message reply=newMessage(message);
1378             reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1379             reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1380 
1381             if (can_subscribe)
1382             {
1383                 if (cid != null)
1384                 {
1385                     ChannelImpl channel=getChannel(cid);
1386                     if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1387                         channel=(ChannelImpl)getChannel(subscribe_id,true);
1388 
1389                     if (channel != null)
1390                         channel.subscribe(client);
1391                     else
1392                         can_subscribe=false;
1393                 }
1394 
1395                 if (can_subscribe)
1396                 {
1397                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1398                 }
1399                 else
1400                 {
1401                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1402                     reply.put(ERROR_FIELD,"403::cannot create");
1403                 }
1404             }
1405             else
1406             {
1407                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1408                 reply.put(ERROR_FIELD,"403::cannot subscribe");
1409 
1410             }
1411 
1412             String id=message.getId();
1413             if (id != null)
1414                 reply.put(ID_FIELD,id);
1415 
1416             sendMetaReply(client,reply,transport);
1417         }
1418     }
1419 
1420     /* ------------------------------------------------------------ */
1421     /* ------------------------------------------------------------ */
1422     protected class UnsubscribeHandler extends Handler
1423     {
1424         @Override
1425         ChannelId getMetaChannelId()
1426         {
1427             return META_UNSUBSCRIBE_ID;
1428         }
1429 
1430         @Override
1431         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1432         {
1433             if (client == null)
1434             {
1435                 unknownClient(transport,META_UNSUBSCRIBE);
1436                 return;
1437             }
1438 
1439             String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1440             ChannelImpl channel=getChannel(channel_id);
1441             if (channel != null)
1442                 channel.unsubscribe(client);
1443 
1444             Message reply=newMessage(message);
1445             reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1446             reply.put(SUBSCRIPTION_FIELD,channel_id);
1447             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1448 
1449             String id=message.getId();
1450             if (id != null)
1451                 reply.put(ID_FIELD,id);
1452 
1453             sendMetaReply(client,reply,transport);
1454         }
1455     }
1456 
1457     /* ------------------------------------------------------------ */
1458     /* ------------------------------------------------------------ */
1459     protected class PingHandler extends Handler
1460     {
1461         @Override
1462         ChannelId getMetaChannelId()
1463         {
1464             return META_PING_ID;
1465         }
1466 
1467         @Override
1468         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1469         {
1470             Message reply=newMessage(message);
1471             reply.put(CHANNEL_FIELD,META_PING);
1472             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1473 
1474             String id=message.getId();
1475             if (id != null)
1476                 reply.put(ID_FIELD,id);
1477 
1478             sendMetaReply(client,reply,transport);
1479         }
1480     }
1481 
1482     /* ------------------------------------------------------------ */
1483     /* ------------------------------------------------------------ */
1484     protected class ServiceChannel extends ChannelImpl
1485     {
1486         ServiceChannel(String id)
1487         {
1488             super(id,AbstractBayeux.this);
1489         }
1490 
1491         /* ------------------------------------------------------------ */
1492         /*
1493          * (non-Javadoc)
1494          *
1495          * @see
1496          * org.mortbay.cometd.ChannelImpl#addChild(org.mortbay.cometd.ChannelImpl
1497          * )
1498          */
1499         @Override
1500         public void addChild(ChannelImpl channel)
1501         {
1502             super.addChild(channel);
1503             setPersistent(true);
1504         }
1505 
1506         /* ------------------------------------------------------------ */
1507         @Override
1508         public void subscribe(Client client)
1509         {
1510             if (client.isLocal())
1511                 super.subscribe(client);
1512         }
1513 
1514     }
1515 }