1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.ByteChannel;
19 import java.nio.channels.CancelledKeyException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.mortbay.component.AbstractLifeCycle;
30 import org.mortbay.component.LifeCycle;
31 import org.mortbay.io.Connection;
32 import org.mortbay.io.EndPoint;
33 import org.mortbay.log.Log;
34 import org.mortbay.thread.Timeout;
35
36
37
38
39
40
41
42
43
44
45 public abstract class SelectorManager extends AbstractLifeCycle
46 {
47 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",128).intValue();
48 private static final int __JVMBUG_THRESHHOLD2=__JVMBUG_THRESHHOLD*2;
49 private static final int __JVMBUG_THRESHHOLD1=(__JVMBUG_THRESHHOLD2+__JVMBUG_THRESHHOLD)/2;
50 private boolean _delaySelectKeyUpdate=true;
51 private long _maxIdleTime;
52 private long _lowResourcesConnections;
53 private long _lowResourcesMaxIdleTime;
54 private transient SelectSet[] _selectSet;
55 private int _selectSets=1;
56 private volatile int _set;
57 private boolean _jvmBug0;
58 private boolean _jvmBug1;
59
60
61
62
63
64
65
66 public void setMaxIdleTime(long maxIdleTime)
67 {
68 _maxIdleTime=maxIdleTime;
69 }
70
71
72
73
74
75 public void setSelectSets(int selectSets)
76 {
77 long lrc = _lowResourcesConnections * _selectSets;
78 _selectSets=selectSets;
79 _lowResourcesConnections=lrc/_selectSets;
80 }
81
82
83
84
85
86 public long getMaxIdleTime()
87 {
88 return _maxIdleTime;
89 }
90
91
92
93
94
95 public int getSelectSets()
96 {
97 return _selectSets;
98 }
99
100
101
102
103
104 public boolean isDelaySelectKeyUpdate()
105 {
106 return _delaySelectKeyUpdate;
107 }
108
109
110
111
112
113
114
115 public void register(SocketChannel channel, Object att) throws IOException
116 {
117 int s=_set++;
118 s=s%_selectSets;
119 SelectSet[] sets=_selectSet;
120 if (sets!=null)
121 {
122 SelectSet set=sets[s];
123 set.addChange(channel,att);
124 set.wakeup();
125 }
126 }
127
128
129
130
131
132
133
134 public void register(ServerSocketChannel acceptChannel) throws IOException
135 {
136 int s=_set++;
137 s=s%_selectSets;
138 SelectSet set=_selectSet[s];
139 set.addChange(acceptChannel);
140 set.wakeup();
141 }
142
143
144
145
146
147 public long getLowResourcesConnections()
148 {
149 return _lowResourcesConnections*_selectSets;
150 }
151
152
153
154
155
156
157
158
159 public void setLowResourcesConnections(long lowResourcesConnections)
160 {
161 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
162 }
163
164
165
166
167
168 public long getLowResourcesMaxIdleTime()
169 {
170 return _lowResourcesMaxIdleTime;
171 }
172
173
174
175
176
177
178 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179 {
180 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
181 }
182
183
184
185
186
187
188 public void doSelect(int acceptorID) throws IOException
189 {
190 SelectSet[] sets= _selectSet;
191 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
192 sets[acceptorID].doSelect();
193 }
194
195
196
197
198
199
200 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201 {
202 _delaySelectKeyUpdate=delaySelectKeyUpdate;
203 }
204
205
206
207
208
209
210
211 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212
213
214 public abstract boolean dispatch(Runnable task) throws IOException;
215
216
217
218
219
220 protected void doStart() throws Exception
221 {
222 _selectSet = new SelectSet[_selectSets];
223 for (int i=0;i<_selectSet.length;i++)
224 _selectSet[i]= new SelectSet(i);
225
226 super.doStart();
227 }
228
229
230
231 protected void doStop() throws Exception
232 {
233 SelectSet[] sets= _selectSet;
234 _selectSet=null;
235 if (sets!=null)
236 for (int i=0;i<sets.length;i++)
237 {
238 SelectSet set = sets[i];
239 if (set!=null)
240 set.stop();
241 }
242 super.doStop();
243 }
244
245
246
247
248
249 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250
251
252
253
254
255 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256
257
258 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259
260
261
262
263
264
265
266
267
268 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269
270
271 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272 {
273 Log.warn(ex);
274 }
275
276
277
278
279 public class SelectSet
280 {
281 private transient int _change;
282 private transient List[] _changes;
283 private transient Timeout _idleTimeout;
284 private transient int _nextSet;
285 private transient Timeout _retryTimeout;
286 private transient Selector _selector;
287 private transient int _setID;
288 private transient int _jvmBug;
289 private volatile boolean _selecting;
290
291
292 SelectSet(int acceptorID) throws Exception
293 {
294 _setID=acceptorID;
295
296 _idleTimeout = new Timeout(this);
297 _idleTimeout.setDuration(getMaxIdleTime());
298 _retryTimeout = new Timeout(this);
299 _retryTimeout.setDuration(0L);
300
301
302 _selector = Selector.open();
303 _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
304 _change=0;
305 }
306
307
308 public void addChange(Object point)
309 {
310 synchronized (_changes)
311 {
312 _changes[_change].add(point);
313 }
314 }
315
316
317 public void addChange(SelectableChannel channel, Object att)
318 {
319 if (att==null)
320 addChange(channel);
321 else if (att instanceof EndPoint)
322 addChange(att);
323 else
324 addChange(new ChangeSelectableChannel(channel,att));
325 }
326
327
328 public void cancelIdle(Timeout.Task task)
329 {
330 synchronized (this)
331 {
332 task.cancel();
333 }
334 }
335
336
337
338
339
340
341
342 public void doSelect() throws IOException
343 {
344 SelectionKey key=null;
345
346 try
347 {
348 List changes;
349 final Selector selector;
350 synchronized (_changes)
351 {
352 changes=_changes[_change];
353 _change=_change==0?1:0;
354 _selecting=true;
355 selector=_selector;
356 }
357
358
359
360 for (int i = 0; i < changes.size(); i++)
361 {
362 try
363 {
364 Object o = changes.get(i);
365
366 if (o instanceof EndPoint)
367 {
368
369 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
370 endpoint.doUpdateKey();
371 }
372 else if (o instanceof Runnable)
373 {
374 dispatch((Runnable)o);
375 }
376 else if (o instanceof ChangeSelectableChannel)
377 {
378
379 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
380 final SelectableChannel channel=asc._channel;
381 final Object att = asc._attachment;
382
383 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
384 {
385 key = channel.register(selector,SelectionKey.OP_READ,att);
386 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
387 key.attach(endpoint);
388 endpoint.dispatch();
389 }
390 else if (channel.isOpen())
391 {
392 channel.register(selector,SelectionKey.OP_CONNECT,att);
393 }
394 }
395 else if (o instanceof SocketChannel)
396 {
397 final SocketChannel channel=(SocketChannel)o;
398
399 if (channel.isConnected())
400 {
401 key = channel.register(selector,SelectionKey.OP_READ,null);
402 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
403 key.attach(endpoint);
404 endpoint.dispatch();
405 }
406 else
407 {
408 channel.register(selector,SelectionKey.OP_CONNECT,null);
409 }
410 }
411 else if (o instanceof ServerSocketChannel)
412 {
413 ServerSocketChannel channel = (ServerSocketChannel)o;
414 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
415 }
416 else if (o instanceof ChangeTask)
417 {
418 ((ChangeTask)o).run();
419 }
420 else
421 throw new IllegalArgumentException(o.toString());
422 }
423 catch (CancelledKeyException e)
424 {
425 if (isRunning())
426 Log.warn(e);
427 else
428 Log.debug(e);
429 }
430 }
431 changes.clear();
432
433 long idle_next = 0;
434 long retry_next = 0;
435 long now=System.currentTimeMillis();
436 synchronized (this)
437 {
438 _idleTimeout.setNow(now);
439 _retryTimeout.setNow(now);
440 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
441 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
442 else
443 _idleTimeout.setDuration(_maxIdleTime);
444 idle_next=_idleTimeout.getTimeToNext();
445 retry_next=_retryTimeout.getTimeToNext();
446 }
447
448
449 long wait = 1000L;
450 if (idle_next >= 0 && wait > idle_next)
451 wait = idle_next;
452 if (wait > 0 && retry_next >= 0 && wait > retry_next)
453 wait = retry_next;
454
455
456 if (wait > 10)
457 {
458 long before=now;
459 int selected=selector.select(wait);
460 now = System.currentTimeMillis();
461 _idleTimeout.setNow(now);
462 _retryTimeout.setNow(now);
463
464
465
466
467 if (__JVMBUG_THRESHHOLD>0 && selected==0 && wait>__JVMBUG_THRESHHOLD && (now-before)<(wait/2) )
468 {
469 _jvmBug++;
470 if (_jvmBug>=(__JVMBUG_THRESHHOLD2))
471 {
472 synchronized (this)
473 {
474
475 if (_jvmBug1)
476 Log.debug("seeing JVM BUG(s) - recreating selector");
477 else
478 {
479 _jvmBug1=true;
480 Log.info("seeing JVM BUG(s) - recreating selector");
481 }
482
483 final Selector new_selector = Selector.open();
484 Iterator iterator = _selector.keys().iterator();
485 while (iterator.hasNext())
486 {
487 SelectionKey k = (SelectionKey)iterator.next();
488 if (!k.isValid() || k.interestOps()==0)
489 continue;
490
491 final SelectableChannel channel = k.channel();
492 final Object attachment = k.attachment();
493
494 if (attachment==null)
495 addChange(channel);
496 else
497 addChange(channel,attachment);
498 }
499 _selector.close();
500 _selector=new_selector;
501 _jvmBug=0;
502 return;
503 }
504 }
505 else if (_jvmBug==__JVMBUG_THRESHHOLD || _jvmBug==__JVMBUG_THRESHHOLD1)
506 {
507
508 if (_jvmBug0)
509 Log.debug("seeing JVM BUG(s) - cancelling interestOps==0");
510 else
511 {
512 _jvmBug0=true;
513 Log.info("seeing JVM BUG(s) - cancelling interestOps==0");
514 }
515 Iterator iter = selector.keys().iterator();
516 while(iter.hasNext())
517 {
518 SelectionKey k = (SelectionKey) iter.next();
519 if (k.isValid()&&k.interestOps()==0)
520 {
521 k.cancel();
522 }
523 }
524 return;
525 }
526 }
527 else
528 _jvmBug=0;
529 }
530 else
531 {
532 selector.selectNow();
533 _jvmBug=0;
534 }
535
536
537 if (_selector==null || !selector.isOpen())
538 return;
539
540
541 Iterator iter = selector.selectedKeys().iterator();
542 while (iter.hasNext())
543 {
544 key = (SelectionKey) iter.next();
545
546 try
547 {
548 if (!key.isValid())
549 {
550 key.cancel();
551 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
552 if (endpoint != null)
553 endpoint.doUpdateKey();
554 continue;
555 }
556
557 Object att = key.attachment();
558 if (att instanceof SelectChannelEndPoint)
559 {
560 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
561 endpoint.dispatch();
562 }
563 else if (key.isAcceptable())
564 {
565 SocketChannel channel = acceptChannel(key);
566 if (channel==null)
567 continue;
568
569 channel.configureBlocking(false);
570
571
572 _nextSet=++_nextSet%_selectSet.length;
573
574
575 if (_nextSet==_setID)
576 {
577
578 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
579 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
580 cKey.attach(endpoint);
581 if (endpoint != null)
582 endpoint.dispatch();
583 }
584 else
585 {
586
587 _selectSet[_nextSet].addChange(channel);
588 _selectSet[_nextSet].wakeup();
589 }
590 }
591 else if (key.isConnectable())
592 {
593
594 SocketChannel channel = (SocketChannel)key.channel();
595 boolean connected=false;
596 try
597 {
598 connected=channel.finishConnect();
599 }
600 catch(Exception e)
601 {
602 connectionFailed(channel,e,att);
603 }
604 finally
605 {
606 if (connected)
607 {
608 key.interestOps(SelectionKey.OP_READ);
609 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
610 key.attach(endpoint);
611 endpoint.dispatch();
612 }
613 else
614 {
615 key.cancel();
616 }
617 }
618 }
619 else
620 {
621
622 SocketChannel channel = (SocketChannel)key.channel();
623 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
624 key.attach(endpoint);
625 if (key.isReadable())
626 endpoint.dispatch();
627 }
628 key = null;
629 }
630 catch (CancelledKeyException e)
631 {
632 Log.ignore(e);
633 }
634 catch (Exception e)
635 {
636 if (isRunning())
637 Log.warn(e);
638 else
639 Log.ignore(e);
640
641 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
642 {
643 key.interestOps(0);
644
645 key.cancel();
646 }
647 }
648 }
649
650
651 selector.selectedKeys().clear();
652
653
654 _idleTimeout.tick(now);
655 _retryTimeout.tick(now);
656
657 }
658 catch (CancelledKeyException e)
659 {
660 Log.ignore(e);
661 }
662 finally
663 {
664 _selecting=false;
665 }
666 }
667
668
669 public SelectorManager getManager()
670 {
671 return SelectorManager.this;
672 }
673
674
675 public long getNow()
676 {
677 return _idleTimeout.getNow();
678 }
679
680
681 public void scheduleIdle(Timeout.Task task)
682 {
683 synchronized (this)
684 {
685 if (_idleTimeout.getDuration() <= 0)
686 return;
687
688 task.schedule(_idleTimeout);
689 }
690 }
691
692
693 public void scheduleTimeout(Timeout.Task task, long timeout)
694 {
695 synchronized (this)
696 {
697 _retryTimeout.schedule(task, timeout);
698 }
699 }
700
701
702 public void wakeup()
703 {
704 Selector selector = _selector;
705 if (selector!=null)
706 selector.wakeup();
707 }
708
709
710 Selector getSelector()
711 {
712 return _selector;
713 }
714
715
716 void stop() throws Exception
717 {
718 boolean selecting=true;
719 while(selecting)
720 {
721 wakeup();
722 selecting=_selecting;
723 }
724
725 ArrayList keys=new ArrayList(_selector.keys());
726 Iterator iter =keys.iterator();
727
728 while (iter.hasNext())
729 {
730 SelectionKey key = (SelectionKey)iter.next();
731 if (key==null)
732 continue;
733 Object att=key.attachment();
734 if (att instanceof EndPoint)
735 {
736 EndPoint endpoint = (EndPoint)att;
737 try
738 {
739 endpoint.close();
740 }
741 catch(IOException e)
742 {
743 Log.ignore(e);
744 }
745 }
746 }
747
748 synchronized (this)
749 {
750 selecting=_selecting;
751 while(selecting)
752 {
753 wakeup();
754 selecting=_selecting;
755 }
756
757 _idleTimeout.cancelAll();
758 _retryTimeout.cancelAll();
759 try
760 {
761 if (_selector != null)
762 _selector.close();
763 }
764 catch (IOException e)
765 {
766 Log.ignore(e);
767 }
768 _selector=null;
769 }
770 }
771 }
772
773
774 private static class ChangeSelectableChannel
775 {
776 final SelectableChannel _channel;
777 final Object _attachment;
778
779 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
780 {
781 super();
782 _channel = channel;
783 _attachment = attachment;
784 }
785 }
786
787
788 private interface ChangeTask
789 {
790 public void run();
791 }
792 }