View Javadoc

1   package org.mortbay.cometd.ext;
2   
3   import java.util.HashMap;
4   import java.util.Iterator;
5   import java.util.Map;
6   import java.util.Queue;
7   
8   import javax.servlet.UnavailableException;
9   
10  import org.cometd.Bayeux;
11  import org.cometd.Client;
12  import org.cometd.Extension;
13  import org.cometd.Message;
14  import org.mortbay.cometd.MessageImpl;
15  import org.mortbay.util.ArrayQueue;
16  
17  /**
18   * Acknowledged Message Client extension.
19   * 
20   * Tracks the batch id of messages sent to a client.
21   * 
22   */
23  public class AcknowledgedMessagesClientExtension implements Extension
24  {
25      private final Client _client;
26      private final ArrayIdQueue<Message> _unackedQueue;
27  
28      public AcknowledgedMessagesClientExtension(Client client)
29      {
30          _client=client;
31          _unackedQueue=new ArrayIdQueue<Message>(8,16,client);
32          _unackedQueue.setCurrentId(1);
33      }
34  
35      public Message rcv(Client from, Message message)
36      {
37          return message;
38      }
39  
40      /**
41       * Handle received meta messages. Looks for meta/connect messages with
42       * ext/ack fields. If present, delete all messages that have been acked and
43       * requeue messages that have not been acked.
44       */
45      public Message rcvMeta(Client from, Message message)
46      {
47          if (message.getChannel().equals(Bayeux.META_CONNECT))
48          {
49              synchronized(_client)
50              {
51                  Map<String,Object> ext=message.getExt(false);
52                  if (ext != null)
53                  {
54                      Long acked=(Long)ext.get("ack");
55                      if (acked != null)
56                      {
57                          // We have received an ack ID, so delete the acked
58                          // messages.
59                          final int s=_unackedQueue.size();
60                          if (s > 0)
61                          {
62                              if (_unackedQueue.getAssociatedIdUnsafe(s - 1) <= acked)
63                              {
64                                  // we can just clear the queue
65                                  for (int i=0; i < s; i++)
66                                  {
67                                      final Message q=_unackedQueue.getUnsafe(i);
68                                      if (q instanceof MessageImpl)
69                                          ((MessageImpl)q).decRef();
70                                  }
71                                  _unackedQueue.clear();
72                              }
73                              else
74                              {
75                                  // we need to remove elements until we see
76                                  // unacked
77                                  for (int i=0; i < s; i++)
78                                  {
79                                      if (_unackedQueue.getAssociatedIdUnsafe(0) <= acked)
80                                      {
81                                          final Message q=_unackedQueue.remove();
82                                          if (q instanceof MessageImpl)
83                                              ((MessageImpl)q).decRef();
84                                          continue;
85                                      }
86                                      break;
87                                  }
88                              }
89                          }
90                      }
91                  }
92  
93                  // requeue all unacked messages.
94                  final ArrayQueue<Message> messages=(ArrayQueue)from.getQueue();
95                  final int cid=_unackedQueue.getCurrentId();
96                  final int s=_unackedQueue.size();
97                  for (int i=0; i < s; i++)
98                  {
99                      if (_unackedQueue.getAssociatedIdUnsafe(0) < cid)
100                         messages.add(i,_unackedQueue.remove());
101                     else
102                         break;
103                 }
104             }
105         }
106 
107         return message;
108     }
109 
110     public Message send(Client from, Message message)
111     {
112         synchronized(_client)
113         {
114             _unackedQueue.add(message);
115             // prevent the message from being erased
116             ((MessageImpl)message).incRef();
117         }
118         return message;
119     }
120 
121     public Message sendMeta(Client from, Message message)
122     {
123         if (message.getChannel().equals(Bayeux.META_CONNECT))
124         {
125             synchronized(_client)
126             {
127                 Map<String,Object> ext=message.getExt(true);
128                 ext.put("ack",_unackedQueue.getCurrentId());
129                 _unackedQueue.incrementCurrentId();
130             }
131         }
132         return message;
133     }
134 }