1 package org.mortbay.cometd.ext;
2
3 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.Map;
6 import java.util.Queue;
7
8 import javax.servlet.UnavailableException;
9
10 import org.cometd.Bayeux;
11 import org.cometd.Client;
12 import org.cometd.Extension;
13 import org.cometd.Message;
14 import org.mortbay.cometd.MessageImpl;
15 import org.mortbay.util.ArrayQueue;
16
17
18
19
20
21
22
23 public class AcknowledgedMessagesClientExtension implements Extension
24 {
25 private final Client _client;
26 private final ArrayIdQueue<Message> _unackedQueue;
27
28 public AcknowledgedMessagesClientExtension(Client client)
29 {
30 _client=client;
31 _unackedQueue=new ArrayIdQueue<Message>(8,16,client);
32 _unackedQueue.setCurrentId(1);
33 }
34
35 public Message rcv(Client from, Message message)
36 {
37 return message;
38 }
39
40
41
42
43
44
45 public Message rcvMeta(Client from, Message message)
46 {
47 if (message.getChannel().equals(Bayeux.META_CONNECT))
48 {
49 synchronized(_client)
50 {
51 Map<String,Object> ext=message.getExt(false);
52 if (ext != null)
53 {
54 Long acked=(Long)ext.get("ack");
55 if (acked != null)
56 {
57
58
59 final int s=_unackedQueue.size();
60 if (s > 0)
61 {
62 if (_unackedQueue.getAssociatedIdUnsafe(s - 1) <= acked)
63 {
64
65 for (int i=0; i < s; i++)
66 {
67 final Message q=_unackedQueue.getUnsafe(i);
68 if (q instanceof MessageImpl)
69 ((MessageImpl)q).decRef();
70 }
71 _unackedQueue.clear();
72 }
73 else
74 {
75
76
77 for (int i=0; i < s; i++)
78 {
79 if (_unackedQueue.getAssociatedIdUnsafe(0) <= acked)
80 {
81 final Message q=_unackedQueue.remove();
82 if (q instanceof MessageImpl)
83 ((MessageImpl)q).decRef();
84 continue;
85 }
86 break;
87 }
88 }
89 }
90 }
91 }
92
93
94 final ArrayQueue<Message> messages=(ArrayQueue)from.getQueue();
95 final int cid=_unackedQueue.getCurrentId();
96 final int s=_unackedQueue.size();
97 for (int i=0; i < s; i++)
98 {
99 if (_unackedQueue.getAssociatedIdUnsafe(0) < cid)
100 messages.add(i,_unackedQueue.remove());
101 else
102 break;
103 }
104 }
105 }
106
107 return message;
108 }
109
110 public Message send(Client from, Message message)
111 {
112 synchronized(_client)
113 {
114 _unackedQueue.add(message);
115
116 ((MessageImpl)message).incRef();
117 }
118 return message;
119 }
120
121 public Message sendMeta(Client from, Message message)
122 {
123 if (message.getChannel().equals(Bayeux.META_CONNECT))
124 {
125 synchronized(_client)
126 {
127 Map<String,Object> ext=message.getExt(true);
128 ext.put("ack",_unackedQueue.getCurrentId());
129 _unackedQueue.incrementCurrentId();
130 }
131 }
132 return message;
133 }
134 }