1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.continuation;
16
17 import java.io.IOException;
18 import java.nio.ByteBuffer;
19 import javax.servlet.ServletException;
20 import javax.servlet.http.HttpServletRequest;
21 import javax.servlet.http.HttpServletResponse;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Client;
25 import org.cometd.Extension;
26 import org.cometd.Message;
27 import org.mortbay.cometd.AbstractBayeux;
28 import org.mortbay.cometd.AbstractCometdServlet;
29 import org.mortbay.cometd.ClientImpl;
30 import org.mortbay.cometd.JSONTransport;
31 import org.mortbay.cometd.MessageImpl;
32 import org.mortbay.cometd.Transport;
33 import org.mortbay.util.ArrayQueue;
34 import org.mortbay.util.StringUtil;
35 import org.mortbay.util.ajax.Continuation;
36 import org.mortbay.util.ajax.ContinuationSupport;
37
38 public class ContinuationCometdServlet extends AbstractCometdServlet
39 {
40
41 @Override
42 protected AbstractBayeux newBayeux()
43 {
44 return new ContinuationBayeux();
45 }
46
47
48 @Override
49 protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
50 {
51
52 Object clientObj=request.getAttribute(CLIENT_ATTR);
53 Transport transport=null;
54 int received=-1;
55 boolean metaConnectDeliveryOnly=false;
56 boolean pendingResponse=false;
57 boolean metaConnect=false;
58
59
60 ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
61 if (client != null)
62 {
63
64 transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
65 transport.setResponse(response);
66 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
67 metaConnect=true;
68 }
69 else
70 {
71 Message[] messages=getMessages(request);
72 received=messages.length;
73
74
75 String jsonpParam=request.getParameter("jsonp");
76
77
78 try
79 {
80 for (Message message : messages)
81 {
82 if (jsonpParam != null)
83 message.put("jsonp",jsonpParam);
84
85 if (client == null)
86 {
87 client=(ContinuationClient)_bayeux.getClient((String)message.get(AbstractBayeux.CLIENT_FIELD));
88
89
90
91 if (client == null)
92 {
93
94 String browser_id=findBrowserId(request);
95 if (browser_id == null)
96 browser_id=setBrowserId(request,response);
97
98 if (transport == null)
99 {
100 transport=_bayeux.newTransport(client,message);
101 transport.setResponse(response);
102 metaConnectDeliveryOnly=transport.isMetaConnectDeliveryOnly();
103 }
104 _bayeux.handle(null,transport,message);
105 message=null;
106 continue;
107 }
108 }
109
110 String browser_id=findBrowserId(request);
111 if (browser_id != null && (client.getBrowserId() == null || !client.getBrowserId().equals(browser_id)))
112 client.setBrowserId(browser_id);
113
114
115 if (transport == null)
116 {
117 transport=_bayeux.newTransport(client,message);
118 transport.setResponse(response);
119 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
120 }
121
122
123
124 if (!metaConnectDeliveryOnly && !pendingResponse)
125 {
126 pendingResponse=true;
127 client.responsePending();
128 }
129
130 if (Bayeux.META_CONNECT.equals(message.getChannel()))
131 metaConnect=true;
132
133 _bayeux.handle(client,transport,message);
134 }
135 }
136 finally
137 {
138 for (Message message : messages)
139 ((MessageImpl)message).decRef();
140 if (pendingResponse)
141 {
142 client.responded();
143 }
144 }
145 }
146
147 Message metaConnectReply=null;
148
149
150 if (transport != null)
151 {
152 metaConnectReply=transport.getMetaConnectReply();
153 if (metaConnectReply != null)
154 {
155 long timeout=client.getTimeout();
156 if (timeout == 0)
157 timeout=_bayeux.getTimeout();
158
159 Continuation continuation=ContinuationSupport.getContinuation(request,client);
160
161
162 synchronized(client)
163 {
164 if (!client.hasNonLazyMessages() && !continuation.isPending() && received <= 1)
165 {
166
167 client.setContinuation(continuation);
168 request.setAttribute(CLIENT_ATTR,client);
169 request.setAttribute(TRANSPORT_ATTR,transport);
170 continuation.suspend(timeout);
171 }
172
173 if (!continuation.isPending())
174 client.access();
175
176 continuation.reset();
177 }
178
179 client.setContinuation(null);
180 transport.setMetaConnectReply(null);
181 }
182 else if (client != null)
183 {
184 client.access();
185 }
186 }
187
188 if (client != null)
189 {
190 if (metaConnectDeliveryOnly && !metaConnect)
191 {
192
193 client.resume();
194 }
195 else
196 {
197
198 synchronized(client)
199 {
200 client.doDeliverListeners();
201
202 final ArrayQueue<Message> messages=(ArrayQueue)client.getQueue();
203 final int size=messages.size();
204
205 try
206 {
207 for (int i=0; i < size; i++)
208 {
209 final Message message=messages.getUnsafe(i);
210 final MessageImpl mesgImpl=(message instanceof MessageImpl)?(MessageImpl)message:null;
211
212
213 if (i == 0 && size == 1 && mesgImpl != null && _refsThreshold > 0 && metaConnectReply != null && transport instanceof JSONTransport)
214 {
215
216 ByteBuffer buffer=mesgImpl.getBuffer();
217 if (buffer != null)
218 {
219
220 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
221 if (metaConnectReply instanceof MessageImpl)
222 ((MessageImpl)metaConnectReply).decRef();
223 metaConnectReply=null;
224 transport=null;
225 mesgImpl.decRef();
226 continue;
227 }
228 else if (mesgImpl.getRefs() >= _refsThreshold)
229 {
230
231 byte[] contentBytes=("[" + mesgImpl.getJSON() + ",{\"" + Bayeux.SUCCESSFUL_FIELD + "\":true,\"" + Bayeux.CHANNEL_FIELD
232 + "\":\"" + Bayeux.META_CONNECT + "\"}]").getBytes(StringUtil.__UTF8);
233 int contentLength=contentBytes.length;
234
235 String headerString="HTTP/1.1 200 OK\r\n" + "Content-Type: text/json; charset=utf-8\r\n" + "Content-Length: "
236 + contentLength + "\r\n" + "\r\n";
237
238 byte[] headerBytes=headerString.getBytes(StringUtil.__UTF8);
239
240 buffer=ByteBuffer.allocateDirect(headerBytes.length + contentLength);
241 buffer.put(headerBytes);
242 buffer.put(contentBytes);
243 buffer.flip();
244
245 mesgImpl.setBuffer(buffer);
246 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
247 metaConnectReply=null;
248 if (metaConnectReply instanceof MessageImpl)
249 ((MessageImpl)metaConnectReply).decRef();
250 transport=null;
251 mesgImpl.decRef();
252 continue;
253 }
254 }
255
256 if (message != null)
257 transport.send(message);
258 if (mesgImpl != null)
259 mesgImpl.decRef();
260 }
261 }
262 finally
263 {
264 messages.clear();
265 }
266 }
267
268 if (metaConnectReply != null)
269 {
270 metaConnectReply=_bayeux.extendSendMeta(client,metaConnectReply);
271 transport.send(metaConnectReply);
272 if (metaConnectReply instanceof MessageImpl)
273 ((MessageImpl)metaConnectReply).decRef();
274 }
275 }
276 }
277
278 if (transport != null)
279 transport.complete();
280 }
281
282 public void destroy()
283 {
284 if (_bayeux != null)
285 ((ContinuationBayeux)_bayeux).destroy();
286 }
287 }