View Javadoc

1   //========================================================================
2   //Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  package org.mortbay.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import javax.servlet.http.Cookie;
23  
24  import org.mortbay.io.Buffer;
25  import org.mortbay.io.ByteArrayBuffer;
26  import org.mortbay.jetty.HttpHeaders;
27  import org.mortbay.jetty.client.security.Authorization;
28  import org.mortbay.jetty.client.security.SecurityListener;
29  import org.mortbay.jetty.servlet.PathMap;
30  import org.mortbay.log.Log;
31  
32  /**
33  * @author Greg Wilkins
34  * @author Guillaume Nodet
35  */
36  public class HttpDestination
37  {
38      private ByteArrayBuffer _hostHeader;
39      private final Address _address;
40      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42      private final HttpClient _client;
43      private final boolean _ssl;
44      private int _maxConnections;
45      private int _pendingConnections=0;
46      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
47      private int _newConnection=0;
48      private Address _proxy;
49      private Authorization _proxyAuthentication;
50      private PathMap _authorizations;
51      private List<Cookie> _cookies;
52  
53      public void dump() throws IOException
54      {
55          synchronized (this)
56          {
57              System.err.println(this);
58              System.err.println("connections="+_connections.size());
59              System.err.println("idle="+_idle.size());
60              System.err.println("pending="+_pendingConnections);
61              for (HttpConnection c : _connections)
62              {
63                  if (!c.isIdle())
64                      c.dump();
65              }
66          }
67      }
68  
69      /* The queue of exchanged for this destination if connections are limited */
70      private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
71  
72      /* ------------------------------------------------------------ */
73      HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
74      {
75          _client=pool;
76          _address=address;
77          _ssl=ssl;
78          _maxConnections=maxConnections;
79          String addressString = address.getHost();
80          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81          _hostHeader = new ByteArrayBuffer(addressString);
82      }
83  
84      /* ------------------------------------------------------------ */
85      public Address getAddress()
86      {
87          return _address;
88      }
89  
90      /* ------------------------------------------------------------ */
91      public Buffer getHostHeader()
92      {
93          return _hostHeader;
94      }
95  
96      /* ------------------------------------------------------------ */
97      public HttpClient getHttpClient()
98      {
99          return _client;
100     }
101 
102     /* ------------------------------------------------------------ */
103     public boolean isSecure()
104     {
105         return _ssl;
106     }
107 
108     /* ------------------------------------------------------------ */
109     public void addAuthorization(String pathSpec,Authorization authorization)
110     {
111         synchronized (this)
112         {
113             if (_authorizations==null)
114                 _authorizations=new PathMap();
115             _authorizations.put(pathSpec,authorization);
116         }
117 
118         // TODO query and remove methods
119     }
120 
121     /* ------------------------------------------------------------------------------- */
122     public void addCookie(Cookie cookie)
123     {
124         synchronized (this)
125         {
126             if (_cookies==null)
127                 _cookies=new ArrayList<Cookie>();
128             _cookies.add(cookie);
129         }
130 
131         // TODO query, remove and age methods
132     }
133 
134     /* ------------------------------------------------------------------------------- */
135     /**
136      * Get a connection. We either get an idle connection if one is available, or
137      * we make a new connection, if we have not yet reached maxConnections. If we
138      * have reached maxConnections, we wait until the number reduces.
139      * @param timeout max time prepared to block waiting to be able to get a connection
140      * @return
141      * @throws IOException
142      */
143     private HttpConnection getConnection(long timeout) throws IOException
144     {
145         HttpConnection connection = null;
146 
147         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
148         {
149             int totalConnections = 0;
150             boolean starting = false;
151             synchronized (this)
152             {
153                 totalConnections = _connections.size() + _pendingConnections;
154                 if (totalConnections < _maxConnections)
155                 {
156                     _newConnection++;
157                     startNewConnection();
158                     starting = true;
159                 }
160             }
161 
162             if (!starting)
163             {
164                 try
165                 {
166                     Thread.currentThread().sleep(200);
167                     timeout-=200;
168                 }
169                 catch (InterruptedException e)
170                 {
171                     Log.ignore(e);
172                 }
173             }
174             else
175             {
176                try
177                {
178                    Object o = _newQueue.take();
179                    if (o instanceof HttpConnection)
180                    {
181                        connection = (HttpConnection)o;
182                    }
183                    else
184                        throw (IOException)o;
185                }
186                catch (InterruptedException e)
187                {
188                    Log.ignore(e);
189                }
190            }
191         }
192         return connection;
193     }
194 
195     /* ------------------------------------------------------------------------------- */
196     public HttpConnection reserveConnection(long timeout) throws IOException
197     {
198         HttpConnection connection = getConnection(timeout);
199         if (connection != null)
200             connection.setReserved(true);
201         return connection;
202     }
203 
204     /* ------------------------------------------------------------------------------- */
205     public HttpConnection getIdleConnection() throws IOException
206     {
207         long now = System.currentTimeMillis();
208         long idleTimeout=_client.getIdleTimeout();
209         HttpConnection connection = null;
210         while (true)
211         {
212             synchronized (this)
213             {
214                 if (connection!=null)
215                 {
216                     _connections.remove(connection);
217                     connection.close();
218                     connection=null;
219                 }
220                 if (_idle.size() > 0)
221                     connection = _idle.remove(_idle.size()-1);
222             }
223 
224             if (connection==null)
225                 return null;
226 
227             long last = connection.getLast();
228             if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
229                 return connection;
230 
231         }
232     }
233 
234     /* ------------------------------------------------------------------------------- */
235     protected void startNewConnection()
236     {
237         try
238         {
239             synchronized (this)
240             {
241                 _pendingConnections++;
242             }
243             _client._connector.startConnection(this);
244         }
245         catch(Exception e)
246         {
247             onConnectionFailed(e);
248         }
249     }
250 
251     /* ------------------------------------------------------------------------------- */
252     public void onConnectionFailed(Throwable throwable)
253     {
254         Throwable connect_failure=null;
255 
256         synchronized (this)
257         {
258             _pendingConnections--;
259             if (_newConnection>0)
260             {
261                 connect_failure=throwable;
262                 _newConnection--;
263             }
264             else if (_queue.size()>0)
265             {
266                 HttpExchange ex=_queue.removeFirst();
267                 ex.getEventListener().onConnectionFailed(throwable);
268             }
269         }
270 
271         if(connect_failure!=null)
272         {
273             try
274             {
275                 _newQueue.put(connect_failure);
276             }
277             catch (InterruptedException e)
278             {
279                 Log.ignore(e);
280             }
281         }
282     }
283 
284     /* ------------------------------------------------------------------------------- */
285     public void onException(Throwable throwable)
286     {
287         synchronized (this)
288         {
289             _pendingConnections--;
290             if (_queue.size()>0)
291             {
292                 HttpExchange ex=_queue.removeFirst();
293                 ex.getEventListener().onException(throwable);
294                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
295             }
296         }
297     }
298 
299     /* ------------------------------------------------------------------------------- */
300     public void onNewConnection(HttpConnection connection) throws IOException
301     {
302         HttpConnection q_connection=null;
303 
304         synchronized (this)
305         {
306             _pendingConnections--;
307             _connections.add(connection);
308 
309             if (_newConnection>0)
310             {
311                 q_connection=connection;
312                 _newConnection--;
313             }
314             else if (_queue.size()==0)
315             {
316                 _idle.add(connection);
317             }
318             else
319             {
320                 HttpExchange ex=_queue.removeFirst();
321                 connection.send(ex);
322             }
323         }
324 
325         if (q_connection!=null)
326         {
327             try
328             {
329                 _newQueue.put(q_connection);
330             }
331             catch (InterruptedException e)
332             {
333                 Log.ignore(e);
334             }
335         }
336     }
337 
338     /* ------------------------------------------------------------------------------- */
339     public void returnConnection(HttpConnection connection, boolean close) throws IOException
340     {
341         if (connection.isReserved())
342             connection.setReserved(false);
343 
344         if (close)
345         {
346             try
347             {
348                 connection.close();
349             }
350             catch(IOException e)
351             {
352                 Log.ignore(e);
353             }
354         }
355 
356         if (!_client.isStarted())
357             return;
358 
359         if (!close && connection.getEndPoint().isOpen())
360         {
361             synchronized (this)
362             {
363                 if (_queue.size()==0)
364                 {
365                     connection.setLast(System.currentTimeMillis());
366                     _idle.add(connection);
367                 }
368                 else
369                 {
370                     HttpExchange ex = _queue.removeFirst();
371                     connection.send(ex);
372                 }
373                 this.notifyAll();
374             }
375         }
376         else
377         {
378             synchronized (this)
379             {
380                 _connections.remove(connection);
381                 if (!_queue.isEmpty())
382                     startNewConnection();
383             }
384         }
385     }
386 
387     /* ------------------------------------------------------------ */
388     public void send(HttpExchange ex) throws IOException
389     {
390         LinkedList<String> listeners = _client.getRegisteredListeners();
391 
392         if (listeners != null)
393         {
394             // Add registered listeners, fail if we can't load them
395             for (int i = listeners.size(); i > 0; --i)
396             {
397                 String listenerClass = listeners.get(i - 1);
398 
399                 try
400                 {
401                     Class listener = Class.forName(listenerClass);
402                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
403                     HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
404                     ex.setEventListener(elistener);
405                 }
406                 catch (Exception e)
407                 {
408                     Log.debug(e);
409                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
410                 }
411             }
412         }
413 
414         // Security is supported by default and should be the first consulted
415         if ( _client.hasRealms() )
416         {
417             ex.setEventListener( new SecurityListener( this, ex ) );
418         }
419 
420         doSend(ex);
421     }
422 
423     /* ------------------------------------------------------------ */
424     public void resend(HttpExchange ex) throws IOException
425     {
426         ex.getEventListener().onRetry();
427         doSend(ex);
428     }
429 
430     /* ------------------------------------------------------------ */
431     protected void doSend(HttpExchange ex) throws IOException
432     {
433         // add cookies
434         // TODO handle max-age etc.
435         if (_cookies!=null)
436         {
437             StringBuilder buf=null;
438             for (Cookie cookie : _cookies)
439             {
440                 if (buf==null)
441                     buf=new StringBuilder();
442                 else
443                     buf.append("; ");
444                 buf.append(cookie.getName()); // TODO quotes
445                 buf.append("=");
446                 buf.append(cookie.getValue()); // TODO quotes
447             }
448             if (buf!=null)
449                 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
450         }
451 
452         // Add any known authorizations
453         if (_authorizations!=null)
454         {
455             Authorization auth= (Authorization)_authorizations.match(ex.getURI());
456             if (auth !=null)
457                 ((Authorization)auth).setCredentials(ex);
458         }
459 
460         HttpConnection connection = getIdleConnection();
461         if (connection != null)
462         {
463             boolean sent = connection.send(ex);
464             if (!sent) connection = null;
465         }
466 
467         if (connection == null)
468         {
469             synchronized (this)
470             {
471                 _queue.add(ex);
472                 if (_connections.size() + _pendingConnections < _maxConnections)
473                 {
474                      startNewConnection();
475                 }
476             }
477         }
478     }
479 
480     /* ------------------------------------------------------------ */
481     public synchronized String toString()
482     {
483         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
484     }
485 
486     /* ------------------------------------------------------------ */
487     public synchronized String toDetailString()
488     {
489         StringBuilder b = new StringBuilder();
490         b.append(toString());
491         b.append('\n');
492         synchronized(this)
493         {
494             for (HttpConnection connection : _connections)
495             {
496                 if (connection._exchange!=null)
497                 {
498                     b.append(connection.toDetailString());
499                     if (_idle.contains(connection))
500                         b.append(" IDLE");
501                     b.append('\n');
502                 }
503             }
504         }
505         b.append("--");
506         b.append('\n');
507 
508         return b.toString();
509     }
510 
511     /* ------------------------------------------------------------ */
512     public void setProxy(Address proxy)
513     {
514         _proxy=proxy;
515     }
516 
517     /* ------------------------------------------------------------ */
518     public Address getProxy()
519     {
520         return _proxy;
521     }
522 
523     /* ------------------------------------------------------------ */
524     public Authorization getProxyAuthentication()
525     {
526         return _proxyAuthentication;
527     }
528 
529     /* ------------------------------------------------------------ */
530     public void setProxyAuthentication(Authorization authentication)
531     {
532         _proxyAuthentication = authentication;
533     }
534 
535     /* ------------------------------------------------------------ */
536     public boolean isProxied()
537     {
538         return _proxy!=null;
539     }
540 
541     /* ------------------------------------------------------------ */
542     public void close() throws IOException
543     {
544         synchronized (this)
545         {
546             for (HttpConnection connection : _connections)
547             {
548                 connection.close();
549             }
550         }
551     }
552 
553 }