View Javadoc

1   package org.cometd.oort;
2   
3   import java.io.IOException;
4   import java.util.HashMap;
5   import java.util.HashSet;
6   import java.util.Map;
7   import java.util.Set;
8   
9   import org.cometd.Channel;
10  import org.cometd.Client;
11  import org.cometd.Message;
12  import org.cometd.MessageListener;
13  import org.mortbay.cometd.MessageImpl;
14  import org.mortbay.cometd.client.BayeuxClient;
15  
16  /**
17   * Oort Comet client.
18   * <p>
19   * A BayeuxClient that connects the local Oort comet server to 
20   * a remote Oort comet server.
21   *
22   */
23  public class OortComet extends BayeuxClient
24  {
25      protected Oort _oort;
26      protected String _cometUrl;
27      protected String _cometSecret;
28      protected boolean _connected;
29      protected boolean _handshook;
30  
31      OortComet(Oort oort,String cometUrl)
32      {
33          super(oort._httpClient,cometUrl,oort._timer);
34          _cometUrl=cometUrl;
35          _oort=oort;
36          addListener(new OortCometListener());
37      }
38  
39      public boolean isConnected()
40      {
41          return _connected;
42      }
43  
44      public boolean isHandshook()
45      {
46          return _handshook;
47      }
48  
49      @Override
50      protected String extendOut(String message)
51      {
52          if (message==BayeuxClient.Handshake.__HANDSHAKE)
53          {
54              try
55              {
56                  Message[] msg = _msgPool.parse(message);
57  
58                  Map<String,Object> oort = new HashMap<String,Object>();
59                  oort.put("oort",_oort.getURL());
60                  oort.put("oortSecret",_oort.getSecret());
61                  oort.put("comet",_cometUrl);
62                  Map<String,Object> ext = msg[0].getExt(true);
63                  ext.put("oort",oort);
64                  
65                  super.extendOut(msg[0]);
66                  message= _msgPool.getJSON().toJSON(msg);
67                  
68                  for (Message m:msg)
69                      if (m instanceof MessageImpl)
70                          ((MessageImpl)m).decRef();
71                  
72              }
73              catch (IOException e)
74              {
75                  throw new IllegalArgumentException(e);
76              } 
77       
78          }
79          else
80              message=super.extendOut(message);
81          
82          System.err.println(_oort.getURL()+" ==> "+message);
83          return message;
84      }
85  
86      @Override
87      protected void metaConnect(boolean success, Message message)
88      {
89          _connected=success;
90          super.metaConnect(success,message);
91      }
92  
93      @Override
94      protected void metaHandshake(boolean success, boolean reestablish, Message message)
95      {
96          synchronized (_oort)
97          {
98              _handshook=success;
99              super.metaHandshake(success,reestablish,message);
100             if (success)
101             {
102                 Map<String,Object> ext = (Map<String,Object>)message.get("ext");
103                 if (ext!=null)
104                 {
105                     Map<String,Object> oort = (Map<String,Object>)ext.get("oort");
106                     if (oort!=null)
107                     {
108                         _cometSecret=(String)oort.get("cometSecret");
109 
110                         startBatch();
111                         subscribe("/oort/cloud");
112                         for (String channel : _oort._channels)
113                             subscribe(channel);
114                         publish("/oort/cloud",_oort.getKnownComets(),_cometSecret);
115                         endBatch();
116                     }
117                 }
118                 System.err.println(_oort.getURL()+" <== "+ext);
119             }
120         }
121     }
122 
123     @Override
124     protected void metaPublishFail(Throwable e, Message[] messages)
125     {
126         // TODO Auto-generated method stub
127         super.metaPublishFail(e,messages);
128     }
129 
130 
131     protected class OortCometListener implements MessageListener
132     {
133         public void deliver(Client fromClient, Client toClient, Message msg)
134         {
135             String channelId = msg.getChannel();
136             if (msg.getData()!=null)
137             {
138                 if (channelId.startsWith("/oort/"))
139                 {
140                     if (channelId.equals("/oort/cloud"))
141                     {
142                         Object[] data = (Object[])msg.getData();
143                         Set<String> comets = new HashSet<String>();
144                         for (Object o:data)
145                             comets.add(o.toString());
146                         _oort.observedComets(comets);
147                     }  
148 
149                     synchronized (_oort)
150                     {
151                         for( MessageListener listener : _oort._oortMessageListeners)
152                             listener.deliver(fromClient,toClient,msg);
153                     }
154                 }
155                 else
156                 {
157                     Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false);
158                     if (channel!=null)
159                         channel.publish(_oort._oortClient,msg.getData(),msg.getId());
160                 }
161             }
162         }
163     }
164 }