1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.cometd;
15
16 import java.lang.reflect.Method;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19
20 import org.cometd.Bayeux;
21 import org.cometd.Channel;
22 import org.cometd.Client;
23 import org.cometd.Listener;
24 import org.cometd.Message;
25 import org.cometd.MessageListener;
26 import org.mortbay.component.LifeCycle;
27 import org.mortbay.log.Log;
28 import org.mortbay.thread.QueuedThreadPool;
29 import org.mortbay.thread.ThreadPool;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public abstract class BayeuxService
61 {
62 private String _name;
63 private Bayeux _bayeux;
64 private Client _client;
65 private Map<String,Method> _methods=new ConcurrentHashMap<String,Method>();
66 private ThreadPool _threadPool;
67 private MessageListener _listener;
68 private boolean _seeOwn=false;
69
70
71
72
73
74
75
76
77
78
79
80 public BayeuxService(Bayeux bayeux, String name)
81 {
82 this(bayeux,name,0,false);
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public BayeuxService(Bayeux bayeux, String name, int maxThreads)
98 {
99 this(bayeux,name,maxThreads,false);
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous)
117 {
118 if (maxThreads > 0)
119 setThreadPool(new QueuedThreadPool(maxThreads));
120 _name=name;
121 _bayeux=bayeux;
122 _client=_bayeux.newClient(name);
123 _listener=(synchronous)?new SyncListen():new AsyncListen();
124 _client.addListener(_listener);
125
126 }
127
128
129 public Bayeux getBayeux()
130 {
131 return _bayeux;
132 }
133
134
135 public Client getClient()
136 {
137 return _client;
138 }
139
140
141 public ThreadPool getThreadPool()
142 {
143 return _threadPool;
144 }
145
146
147
148
149
150
151
152
153 public void setThreadPool(ThreadPool pool)
154 {
155 try
156 {
157 if (pool instanceof LifeCycle)
158 if (!((LifeCycle)pool).isStarted())
159 ((LifeCycle)pool).start();
160 }
161 catch(Exception e)
162 {
163 throw new IllegalStateException(e);
164 }
165 _threadPool=pool;
166 }
167
168
169 public boolean isSeeOwnPublishes()
170 {
171 return _seeOwn;
172 }
173
174
175 public void setSeeOwnPublishes(boolean own)
176 {
177 _seeOwn=own;
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 protected void subscribe(String channelId, String methodName)
219 {
220 Method method=null;
221
222 Class<?> c=this.getClass();
223 while(c != null && c != Object.class)
224 {
225 Method[] methods=c.getDeclaredMethods();
226 for (int i=methods.length; i-- > 0;)
227 {
228 if (methodName.equals(methods[i].getName()))
229 {
230 if (method != null)
231 throw new IllegalArgumentException("Multiple methods called '" + methodName + "'");
232 method=methods[i];
233 }
234 }
235 c=c.getSuperclass();
236 }
237
238 if (method == null)
239 throw new NoSuchMethodError(methodName);
240 int params=method.getParameterTypes().length;
241 if (params < 2 || params > 4)
242 throw new IllegalArgumentException("Method '" + methodName + "' does not have 2or3 parameters");
243 if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
244 throw new IllegalArgumentException("Method '" + methodName + "' does not have Client as first parameter");
245
246 Channel channel=_bayeux.getChannel(channelId,true);
247
248 if (((ChannelImpl)channel).getChannelId().isWild())
249 {
250 final Method m=method;
251 Client wild_client=_bayeux.newClient(_name + "-wild");
252 wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
253 channel.subscribe(wild_client);
254 }
255 else
256 {
257 _methods.put(channelId,method);
258 channel.subscribe(_client);
259 }
260 }
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 protected void send(Client toClient, String onChannel, Object data, String id)
285 {
286 toClient.deliver(getClient(),onChannel,data,id);
287 }
288
289
290
291
292
293
294
295
296
297
298
299 protected void exception(Client fromClient, Client toClient, Map<String,Object> msg, Throwable th)
300 {
301 System.err.println(msg);
302 th.printStackTrace();
303 }
304
305
306 private void invoke(final Method method, final Client fromClient, final Client toClient, final Message msg)
307 {
308 if (_threadPool == null)
309 doInvoke(method,fromClient,toClient,msg);
310 else
311 {
312 _threadPool.dispatch(new Runnable()
313 {
314 public void run()
315 {
316 try
317 {
318 ((MessageImpl)msg).incRef();
319 doInvoke(method,fromClient,toClient,msg);
320 }
321 finally
322 {
323 ((MessageImpl)msg).decRef();
324 }
325 }
326 });
327 }
328 }
329
330
331 private void doInvoke(Method method, Client fromClient, Client toClient, Message msg)
332 {
333 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
334 Object data=msg.get(Bayeux.DATA_FIELD);
335 String id=msg.getId();
336
337 if (method != null)
338 {
339 try
340 {
341 Class<?>[] args=method.getParameterTypes();
342 Object arg=Message.class.isAssignableFrom(args[1])?msg:data;
343
344 Object reply=null;
345 switch(method.getParameterTypes().length)
346 {
347 case 2:
348 reply=method.invoke(this,fromClient,arg);
349 break;
350 case 3:
351 reply=method.invoke(this,fromClient,arg,id);
352 break;
353 case 4:
354 reply=method.invoke(this,fromClient,channel,arg,id);
355 break;
356 }
357
358 if (reply != null)
359 send(fromClient,channel,reply,id);
360 }
361 catch(Exception e)
362 {
363 Log.debug("method",method);
364 exception(fromClient,toClient,msg,e);
365 }
366 catch(Error e)
367 {
368 Log.debug("method",method);
369 exception(fromClient,toClient,msg,e);
370 }
371 }
372 }
373
374
375
376 private class AsyncListen implements MessageListener, MessageListener.Asynchronous
377 {
378 public void deliver(Client fromClient, Client toClient, Message msg)
379 {
380 if (!_seeOwn && fromClient == getClient())
381 return;
382 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
383 Method method=_methods.get(channel);
384 invoke(method,fromClient,toClient,msg);
385 }
386 }
387
388
389
390 private class SyncListen implements MessageListener, MessageListener.Synchronous
391 {
392 public void deliver(Client fromClient, Client toClient, Message msg)
393 {
394 if (!_seeOwn && fromClient == getClient())
395 return;
396 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
397 Method method=_methods.get(channel);
398 invoke(method,fromClient,toClient,msg);
399 }
400 }
401
402
403
404 private class SyncWildListen implements MessageListener, MessageListener.Synchronous
405 {
406 Client _client;
407 Method _method;
408
409 public SyncWildListen(Client client, Method method)
410 {
411 _client=client;
412 _method=method;
413 }
414
415 public void deliver(Client fromClient, Client toClient, Message msg)
416 {
417 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
418 return;
419 invoke(_method,fromClient,toClient,msg);
420 }
421 };
422
423
424
425 private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
426 {
427 Client _client;
428 Method _method;
429
430 public AsyncWildListen(Client client, Method method)
431 {
432 _client=client;
433 _method=method;
434 }
435
436 public void deliver(Client fromClient, Client toClient, Message msg)
437 {
438 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
439 return;
440 invoke(_method,fromClient,toClient,msg);
441 }
442 };
443
444 }