1 package org.cometd.oort;
2
3 import java.io.IOException;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.Map;
7 import java.util.Set;
8
9 import org.cometd.Channel;
10 import org.cometd.Client;
11 import org.cometd.Message;
12 import org.cometd.MessageListener;
13 import org.mortbay.cometd.MessageImpl;
14 import org.mortbay.cometd.client.BayeuxClient;
15
16
17
18
19
20
21
22
23 public class OortComet extends BayeuxClient
24 {
25 protected Oort _oort;
26 protected String _cometUrl;
27 protected String _cometSecret;
28 protected boolean _connected;
29 protected boolean _handshook;
30
31 OortComet(Oort oort,String cometUrl)
32 {
33 super(oort._httpClient,cometUrl,oort._timer);
34 _cometUrl=cometUrl;
35 _oort=oort;
36 addListener(new OortCometListener());
37 }
38
39 public boolean isConnected()
40 {
41 return _connected;
42 }
43
44 public boolean isHandshook()
45 {
46 return _handshook;
47 }
48
49 @Override
50 protected String extendOut(String message)
51 {
52 if (message==BayeuxClient.Handshake.__HANDSHAKE)
53 {
54 try
55 {
56 Message[] msg = _msgPool.parse(message);
57
58 Map<String,Object> oort = new HashMap<String,Object>();
59 oort.put("oort",_oort.getURL());
60 oort.put("oortSecret",_oort.getSecret());
61 oort.put("comet",_cometUrl);
62 Map<String,Object> ext = msg[0].getExt(true);
63 ext.put("oort",oort);
64
65 super.extendOut(msg[0]);
66 message= _msgPool.getJSON().toJSON(msg);
67
68 for (Message m:msg)
69 if (m instanceof MessageImpl)
70 ((MessageImpl)m).decRef();
71
72 }
73 catch (IOException e)
74 {
75 throw new IllegalArgumentException(e);
76 }
77
78 }
79 else
80 message=super.extendOut(message);
81
82 System.err.println(_oort.getURL()+" ==> "+message);
83 return message;
84 }
85
86 @Override
87 protected void metaConnect(boolean success, Message message)
88 {
89 _connected=success;
90 super.metaConnect(success,message);
91 }
92
93 @Override
94 protected void metaHandshake(boolean success, boolean reestablish, Message message)
95 {
96 synchronized (_oort)
97 {
98 _handshook=success;
99 super.metaHandshake(success,reestablish,message);
100 if (success)
101 {
102 Map<String,Object> ext = (Map<String,Object>)message.get("ext");
103 if (ext!=null)
104 {
105 Map<String,Object> oort = (Map<String,Object>)ext.get("oort");
106 if (oort!=null)
107 {
108 _cometSecret=(String)oort.get("cometSecret");
109
110 startBatch();
111 subscribe("/oort/cloud");
112 for (String channel : _oort._channels)
113 subscribe(channel);
114 publish("/oort/cloud",_oort.getKnownComets(),_cometSecret);
115 endBatch();
116 }
117 }
118 System.err.println(_oort.getURL()+" <== "+ext);
119 }
120 }
121 }
122
123 @Override
124 protected void metaPublishFail(Throwable e, Message[] messages)
125 {
126
127 super.metaPublishFail(e,messages);
128 }
129
130
131 protected class OortCometListener implements MessageListener
132 {
133 public void deliver(Client fromClient, Client toClient, Message msg)
134 {
135 String channelId = msg.getChannel();
136 if (msg.getData()!=null)
137 {
138 if (channelId.startsWith("/oort/"))
139 {
140 if (channelId.equals("/oort/cloud"))
141 {
142 Object[] data = (Object[])msg.getData();
143 Set<String> comets = new HashSet<String>();
144 for (Object o:data)
145 comets.add(o.toString());
146 _oort.observedComets(comets);
147 }
148
149 synchronized (_oort)
150 {
151 for( MessageListener listener : _oort._oortMessageListeners)
152 listener.deliver(fromClient,toClient,msg);
153 }
154 }
155 else
156 {
157 Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false);
158 if (channel!=null)
159 channel.publish(_oort._oortClient,msg.getData(),msg.getId());
160 }
161 }
162 }
163 }
164 }