View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.ClosedChannelException;
19  import java.nio.channels.SelectableChannel;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.SocketChannel;
22  
23  import org.mortbay.io.Buffer;
24  import org.mortbay.io.Connection;
25  import org.mortbay.io.nio.SelectorManager.SelectSet;
26  import org.mortbay.jetty.EofException;
27  import org.mortbay.jetty.HttpException;
28  import org.mortbay.log.Log;
29  import org.mortbay.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   * 
35   * @author gregw
36   *
37   */
38  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable
39  {
40      protected SelectorManager _manager;
41      protected SelectorManager.SelectSet _selectSet;
42      protected boolean _dispatched = false;
43      protected boolean _writable = true; 
44      protected SelectionKey _key;
45      protected int _interestOps;
46      protected boolean _readBlocked;
47      protected boolean _writeBlocked;
48      protected Connection _connection;
49  
50      private Timeout.Task _timeoutTask = new IdleTask();
51  
52      /* ------------------------------------------------------------ */
53      public Connection getConnection()
54      {
55          return _connection;
56      }
57      
58      /* ------------------------------------------------------------ */
59      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
60      {
61          super(channel);
62  
63          _manager = selectSet.getManager();
64          _selectSet = selectSet;
65          _connection = _manager.newConnection(channel,this);
66          
67          _manager.endPointOpened(this); // TODO not here!
68          
69          _key = key;
70      }
71  
72      /* ------------------------------------------------------------ */
73      void dispatch() throws IOException
74      {
75          boolean dispatch_done = true;
76          try
77          {
78              if (dispatch(_manager.isDelaySelectKeyUpdate()))
79              {
80                  dispatch_done= false;
81                  dispatch_done = _manager.dispatch((Runnable)this);
82              }
83          }
84          finally
85          {
86              if (!dispatch_done)
87              {
88                  Log.warn("dispatch failed!");
89                  undispatch();
90              }
91          }
92      }
93      
94      /* ------------------------------------------------------------ */
95      /**
96       * Put the endpoint into the dispatched state.
97       * A blocked thread may be woken up by this call, or the endpoint placed in a state ready
98       * for a dispatch to a threadpool.
99       * @param assumeShortDispatch If true, the interested ops are not modified.
100      * @return True if the endpoint should be dispatched to a thread pool.
101      * @throws IOException
102      */
103     public boolean dispatch(boolean assumeShortDispatch) throws IOException
104     {
105         // If threads are blocked on this
106         synchronized (this)
107         {
108             if (_key == null || !_key.isValid())
109             {
110                 _readBlocked=false;
111                 _writeBlocked=false;
112                 this.notifyAll();
113                 return false;
114             }
115             
116             if (_readBlocked || _writeBlocked)
117             {
118                 if (_readBlocked && _key.isReadable())
119                     _readBlocked=false;
120                 if (_writeBlocked && _key.isWritable())
121                     _writeBlocked=false;
122 
123                 // wake them up is as good as a dispatched.
124                 this.notifyAll();
125                 
126                 // we are not interested in further selecting
127                 _key.interestOps(0);
128                 return false;
129             }
130 
131             if (!assumeShortDispatch)
132                 _key.interestOps(0);
133 
134             // Otherwise if we are still dispatched
135             if (_dispatched)
136             {
137                 // we are not interested in further selecting
138                 _key.interestOps(0);
139                 return false;
140             }
141 
142             // Remove writeable op
143             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
144             {
145                 // Remove writeable op
146                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
147                 _key.interestOps(_interestOps);
148                 _writable = true; // Once writable is in ops, only removed with dispatch.
149             }
150 
151             _dispatched = true;
152         }
153         return true;
154     }
155 
156     /* ------------------------------------------------------------ */
157     public void scheduleIdle()
158     {
159         _selectSet.scheduleIdle(_timeoutTask);
160     }
161 
162     /* ------------------------------------------------------------ */
163     public void cancelIdle()
164     {
165         _selectSet.cancelIdle(_timeoutTask);
166     }
167 
168 
169     /* ------------------------------------------------------------ */
170     protected void idleExpired()
171     {
172         try
173         {
174             close();
175         }
176         catch (IOException e)
177         {
178             Log.ignore(e);
179         }
180     }
181     
182     /* ------------------------------------------------------------ */
183     /**
184      * Called when a dispatched thread is no longer handling the endpoint. The selection key
185      * operations are updated.
186      */
187     public void undispatch()
188     {
189         synchronized (this)
190         {
191             try
192             {
193                 _dispatched = false;
194                 updateKey();
195             }
196             catch (Exception e)
197             {
198                 // TODO investigate if this actually is a problem?
199                 Log.ignore(e);
200                 _interestOps = -1;
201                 _selectSet.addChange(this);
202             }
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     /*
208      */
209     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
210     {
211         int l = super.flush(header, buffer, trailer);
212         _writable = l > 0;
213         return l;
214     }
215 
216     /* ------------------------------------------------------------ */
217     /*
218      */
219     public int flush(Buffer buffer) throws IOException
220     {
221         int l = super.flush(buffer);
222         _writable = l > 0;
223         return l;
224     }
225 
226     /* ------------------------------------------------------------ */
227     /*
228      * Allows thread to block waiting for further events.
229      */
230     public boolean blockReadable(long timeoutMs) throws IOException
231     {
232         synchronized (this)
233         {
234             long start=_selectSet.getNow();
235             try
236             {   
237                 _readBlocked=true;
238                 while (isOpen() && _readBlocked)
239                 {
240                     try
241                     {
242                         updateKey();
243                         this.wait(timeoutMs);
244 
245                         if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
246                             return false;
247                     }
248                     catch (InterruptedException e)
249                     {
250                         Log.warn(e);
251                     }
252                 }
253             }
254             finally
255             {
256                 _readBlocked=false;
257             }
258         }
259         return true;
260     }
261 
262     /* ------------------------------------------------------------ */
263     /*
264      * Allows thread to block waiting for further events.
265      */
266     public boolean blockWritable(long timeoutMs) throws IOException
267     {
268         synchronized (this)
269         {
270             long start=_selectSet.getNow();
271             try
272             {   
273                 _writeBlocked=true;
274                 while (isOpen() && _writeBlocked)
275                 {
276                     try
277                     {
278                         updateKey();
279                         this.wait(timeoutMs);
280 
281                         if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
282                             return false;
283                     }
284                     catch (InterruptedException e)
285                     {
286                         Log.warn(e);
287                     }
288                 }
289             }
290             finally
291             {
292                 _writeBlocked=false;
293             }
294         }
295         return true;
296     }
297 
298     /* ------------------------------------------------------------ */
299     public void setWritable(boolean writable)
300     {
301         _writable=writable;
302     }
303     
304     /* ------------------------------------------------------------ */
305     public void scheduleWrite()
306     {
307         _writable=false;
308         updateKey();
309     }
310     
311     /* ------------------------------------------------------------ */
312     /**
313      * Updates selection key. Adds operations types to the selection key as needed. No operations
314      * are removed as this is only done during dispatch. This method records the new key and
315      * schedules a call to doUpdateKey to do the keyChange
316      */
317     private void updateKey()
318     {
319         synchronized (this)
320         {
321             int ops=-1;
322             if (getChannel().isOpen())
323             {
324                 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
325                 _interestOps = 
326                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
327                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
328             }
329             if(_interestOps == ops && getChannel().isOpen())
330                 return;
331             
332         }
333         _selectSet.addChange(this);
334         _selectSet.wakeup();
335     }
336     
337     /* ------------------------------------------------------------ */
338     /**
339      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
340      */
341     void doUpdateKey()
342     {
343         synchronized (this)
344         {
345             if (getChannel().isOpen())
346             {
347                 if (_interestOps>0)
348                 {
349                     if (_key==null || !_key.isValid())
350                     {
351                         SelectableChannel sc = (SelectableChannel)getChannel();
352                         if (sc.isRegistered())
353                         {
354                             updateKey();   
355                         }
356                         else
357                         {
358                             try
359                             {
360                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
361                             }
362                             catch (Exception e)
363                             {
364                                 Log.ignore(e);
365                                 if (_key!=null && _key.isValid())
366                                 {
367                                     _key.cancel();
368                                 }
369                                 cancelIdle();
370                                 _manager.endPointClosed(this);
371                                 _key = null;
372                             }
373                         }
374                     }
375                     else
376                     {
377                         _key.interestOps(_interestOps);
378                     }
379                 }
380                 else
381                 {
382                     if (_key.isValid())
383                         _key.interestOps(0);
384                     else
385                         _key=null;
386                 }
387             }
388             else    
389             {
390                 if (_key!=null && _key.isValid())
391                 {
392                     _key.interestOps(0);
393                     _key.cancel(); 
394                 }
395                 cancelIdle();
396                 _manager.endPointClosed(this);
397                 _key = null;
398             }
399         }
400     }
401 
402     /* ------------------------------------------------------------ */
403     /* 
404      */
405     public void run()
406     {
407         try
408         {
409             _connection.handle();
410         }
411         catch (ClosedChannelException e)
412         {
413             Log.ignore(e);
414         }
415         catch (EofException e)
416         {
417             Log.debug("EOF", e);
418             try{close();}
419             catch(IOException e2){Log.ignore(e2);}
420         }
421         catch (HttpException e)
422         {
423             Log.debug("BAD", e);
424             try{close();}
425             catch(IOException e2){Log.ignore(e2);}
426         }
427         catch (Throwable e)
428         {
429             Log.warn("handle failed", e);
430             try{close();}
431             catch(IOException e2){Log.ignore(e2);}
432         }
433         finally
434         {
435             undispatch();
436         }
437     }
438 
439     /* ------------------------------------------------------------ */
440     /*
441      * @see org.mortbay.io.nio.ChannelEndPoint#close()
442      */
443     public void close() throws IOException
444     {
445         try
446         {
447             super.close();
448         }
449         catch (IOException e)
450         {
451             Log.ignore(e);
452         }   
453         finally
454         {
455             updateKey();
456         }
457     }
458     
459     /* ------------------------------------------------------------ */
460     public String toString()
461     {
462         return "SCEP@" + hashCode() + "[d=" + _dispatched + ",io=" + _interestOps + ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
463     }
464 
465     /* ------------------------------------------------------------ */
466     public Timeout.Task getTimeoutTask()
467     {
468         return _timeoutTask;
469     }
470 
471     /* ------------------------------------------------------------ */
472     public SelectSet getSelectSet()
473     {
474         return _selectSet;
475     }
476 
477     /* ------------------------------------------------------------ */
478     /* ------------------------------------------------------------ */
479     /* ------------------------------------------------------------ */
480     public class IdleTask extends Timeout.Task 
481     {
482         /* ------------------------------------------------------------ */
483         /*
484          * @see org.mortbay.thread.Timeout.Task#expire()
485          */
486         public void expire()
487         {
488             idleExpired();
489         }
490 
491         public String toString()
492         {
493             return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
494         }
495 
496     }
497 
498 }