View Javadoc

1   //========================================================================
2   //Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  package org.mortbay.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import javax.servlet.http.Cookie;
23  
24  import org.mortbay.io.Buffer;
25  import org.mortbay.io.ByteArrayBuffer;
26  import org.mortbay.jetty.HttpHeaders;
27  import org.mortbay.jetty.client.security.Authorization;
28  import org.mortbay.jetty.client.security.SecurityListener;
29  import org.mortbay.jetty.servlet.PathMap;
30  import org.mortbay.log.Log;
31  
32  /**
33  * @author Greg Wilkins
34  * @author Guillaume Nodet
35  */
36  public class HttpDestination
37  {
    // Host header value: the host, followed by ":port" only when the port is
    // not the scheme default (see the constructor).
    private ByteArrayBuffer _hostHeader;
    // Address (host/port) of this destination.
    private final Address _address;
    // Established connections; added in onNewConnection(), removed in
    // returnConnection() and getIdleConnection(). Accessed under synchronized(this).
    private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
    // Connections currently available for reuse; the most recently added is taken first.
    private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
    // The client that owns this destination.
    private final HttpClient _client;
    // True when this destination was created as an SSL destination.
    private final boolean _ssl;
    // Upper bound on established + pending connections before exchanges are queued.
    private int _maxConnections;
    // Connection attempts started via startNewConnection() and not yet resolved.
    private int _pendingConnections=0;
    // Hand-off queue (capacity 10, fair) through which onNewConnection() and
    // onConnectionFailed() deliver a result to a thread blocked in getConnection().
    private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
    // Number of getConnection() callers waiting for a result on _newQueue.
    private int _newConnection=0;
    // Proxy address, or null when no proxy is configured.
    private Address _proxy;
    // Authorization held for the proxy; may be null.
    private Authorization _proxyAuthentication;
    // Path spec -> Authorization map, created lazily by addAuthorization().
    private PathMap _authorizations;
    // Cookies added to every exchange sent to this destination; created lazily by addCookie().
    private List<Cookie> _cookies;
52  
53      public void dump() throws IOException
54      {
55          synchronized (this)
56          {
57              System.err.println(this);
58              System.err.println("connections="+_connections.size());
59              System.err.println("idle="+_idle.size());
60              System.err.println("pending="+_pendingConnections);
61              for (HttpConnection c : _connections)
62              {
63                  if (!c.isIdle())
64                      c.dump();
65              }
66          }
67      }
68  
    /* The queue of exchanges waiting for a connection to this destination;
     * doSend() adds an exchange here when no idle connection could send it. */
    private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
71  
72      /* ------------------------------------------------------------ */
73      HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
74      {
75          _client=pool;
76          _address=address;
77          _ssl=ssl;
78          _maxConnections=maxConnections;
79          String addressString = address.getHost();
80          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81          _hostHeader = new ByteArrayBuffer(addressString);
82      }
83  
84      /* ------------------------------------------------------------ */
    /**
     * @return the address this destination was created with
     */
    public Address getAddress()
    {
        return _address;
    }
89  
90      /* ------------------------------------------------------------ */
    /**
     * @return the Host header value built in the constructor (host, plus
     *         ":port" when the port is not the scheme default)
     */
    public Buffer getHostHeader()
    {
        return _hostHeader;
    }
95  
96      /* ------------------------------------------------------------ */
    /**
     * @return the client this destination belongs to
     */
    public HttpClient getHttpClient()
    {
        return _client;
    }
101 
102     /* ------------------------------------------------------------ */
    /**
     * @return the ssl flag this destination was constructed with
     */
    public boolean isSecure()
    {
        return _ssl;
    }
107 
108     /* ------------------------------------------------------------ */
109     public void addAuthorization(String pathSpec,Authorization authorization)
110     {
111         synchronized (this)
112         {
113             if (_authorizations==null)
114                 _authorizations=new PathMap();
115             _authorizations.put(pathSpec,authorization);
116         }
117 
118         // TODO query and remove methods
119     }
120 
121     /* ------------------------------------------------------------------------------- */
122     public void addCookie(Cookie cookie)
123     {
124         synchronized (this)
125         {
126             if (_cookies==null)
127                 _cookies=new ArrayList<Cookie>();
128             _cookies.add(cookie);
129         }
130 
131         // TODO query, remove and age methods
132     }
133 
134     /* ------------------------------------------------------------------------------- */
135     /**
136      * Get a connection. We either get an idle connection if one is available, or
137      * we make a new connection, if we have not yet reached maxConnections. If we
138      * have reached maxConnections, we wait until the number reduces.
139      * @param timeout max time prepared to block waiting to be able to get a connection
140      * @return
141      * @throws IOException
142      */
143     private HttpConnection getConnection(long timeout) throws IOException
144     {
145         HttpConnection connection = null;
146 
147         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
148         {
149             int totalConnections = 0;
150             boolean starting = false;
151             synchronized (this)
152             {
153                 totalConnections = _connections.size() + _pendingConnections;
154                 if (totalConnections < _maxConnections)
155                 {
156                     _newConnection++;
157                     startNewConnection();
158                     starting = true;
159                 }
160             }
161 
162             if (!starting)
163             {
164                 try
165                 {
166                     Thread.currentThread().sleep(200);
167                     timeout-=200;
168                 }
169                 catch (InterruptedException e)
170                 {
171                     Log.ignore(e);
172                 }
173             }
174             else
175             {
176                try
177                {
178                    Object o = _newQueue.take();
179                    if (o instanceof HttpConnection)
180                    {
181                        connection = (HttpConnection)o;
182                    }
183                    else
184                        throw (IOException)o;
185                }
186                catch (InterruptedException e)
187                {
188                    Log.ignore(e);
189                }
190            }
191         }
192         return connection;
193     }
194 
195     /* ------------------------------------------------------------------------------- */
196     public HttpConnection reserveConnection(long timeout) throws IOException
197     {
198         HttpConnection connection = getConnection(timeout);
199         if (connection != null)
200             connection.setReserved(true);
201         return connection;
202     }
203 
204     /* ------------------------------------------------------------------------------- */
205     public HttpConnection getIdleConnection() throws IOException
206     {
207         long now = System.currentTimeMillis();
208         long idleTimeout=_client.getIdleTimeout();
209         HttpConnection connection = null;
210         while (true)
211         {
212             synchronized (this)
213             {
214                 if (connection!=null)
215                 {
216                     _connections.remove(connection);
217                     connection.close();
218                     connection=null;
219                 }
220                 if (_idle.size() > 0)
221                     connection = _idle.remove(_idle.size()-1);
222             }
223 
224             if (connection==null)
225                 return null;
226 
227             long last = connection.getLast();
228             if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
229                 return connection;
230 
231         }
232     }
233 
234     /* ------------------------------------------------------------------------------- */
235     protected void startNewConnection()
236     {
237         try
238         {
239             synchronized (this)
240             {
241                 _pendingConnections++;
242             }
243             _client._connector.startConnection(this);
244         }
245         catch(Exception e)
246         {
247             onConnectionFailed(e);
248         }
249     }
250 
251     /* ------------------------------------------------------------------------------- */
252     public void onConnectionFailed(Throwable throwable)
253     {
254         Throwable connect_failure=null;
255 
256         synchronized (this)
257         {
258             _pendingConnections--;
259             if (_newConnection>0)
260             {
261                 connect_failure=throwable;
262                 _newConnection--;
263             }
264             else if (_queue.size()>0)
265             {
266                 HttpExchange ex=_queue.removeFirst();
267                 ex.getEventListener().onConnectionFailed(throwable);
268             }
269         }
270 
271         if(connect_failure!=null)
272         {
273             try
274             {
275                 _newQueue.put(connect_failure);
276             }
277             catch (InterruptedException e)
278             {
279                 Log.ignore(e);
280             }
281         }
282     }
283 
284     /* ------------------------------------------------------------------------------- */
285     public void onException(Throwable throwable)
286     {
287         synchronized (this)
288         {
289             _pendingConnections--;
290             if (_queue.size()>0)
291             {
292                 HttpExchange ex=_queue.removeFirst();
293                 ex.getEventListener().onException(throwable);
294                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
295             }
296         }
297     }
298 
299     /* ------------------------------------------------------------------------------- */
300     public void onNewConnection(HttpConnection connection) throws IOException
301     {
302         HttpConnection q_connection=null;
303 
304         synchronized (this)
305         {
306             _pendingConnections--;
307             _connections.add(connection);
308 
309             if (_newConnection>0)
310             {
311                 q_connection=connection;
312                 _newConnection--;
313             }
314             else if (_queue.size()==0)
315             {
316                 _idle.add(connection);
317             }
318             else
319             {
320                 HttpExchange ex=_queue.removeFirst();
321                 connection.send(ex);
322             }
323         }
324 
325         if (q_connection!=null)
326         {
327             try
328             {
329                 _newQueue.put(q_connection);
330             }
331             catch (InterruptedException e)
332             {
333                 Log.ignore(e);
334             }
335         }
336     }
337 
338     /* ------------------------------------------------------------------------------- */
339     public void returnConnection(HttpConnection connection, boolean close) throws IOException
340     {
341         if (connection.isReserved())
342             connection.setReserved(false);
343 
344         if (close)
345         {
346             try
347             {
348                 connection.close();
349             }
350             catch(IOException e)
351             {
352                 Log.ignore(e);
353             }
354         }
355 
356         if (!_client.isStarted())
357             return;
358 
359         if (!close && connection.getEndPoint().isOpen())
360         {
361             synchronized (this)
362             {
363                 if (_queue.size()==0)
364                 {
365                     connection.setLast(System.currentTimeMillis());
366                     _idle.add(connection);
367                 }
368                 else
369                 {
370                     HttpExchange ex = _queue.removeFirst();
371                     connection.send(ex);
372                 }
373                 this.notifyAll();
374             }
375         }
376         else
377         {
378             synchronized (this)
379             {
380                 _connections.remove(connection);
381                 if (!_queue.isEmpty())
382                     startNewConnection();
383             }
384         }
385     }
386 
387     /* ------------------------------------------------------------ */
388     public void send(HttpExchange ex) throws IOException
389     {
390         LinkedList<String> listeners = _client.getRegisteredListeners();
391 
392         if (listeners != null)
393         {
394             // Add registered listeners, fail if we can't load them
395             for (int i = listeners.size(); i > 0; --i)
396             {
397                 String listenerClass = listeners.get(i - 1);
398 
399                 try
400                 {
401                     Class listener = Class.forName(listenerClass);
402                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
403                     HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
404                     ex.setEventListener(elistener);
405                 }
406                 catch (Exception e)
407                 {
408                     Log.debug(e);
409                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
410                 }
411             }
412         }
413 
414         // Security is supported by default and should be the first consulted
415         if ( _client.hasRealms() )
416         {
417             ex.setEventListener( new SecurityListener( this, ex ) );
418         }
419 
420         doSend(ex);
421     }
422 
423     /* ------------------------------------------------------------ */
424     public void resend(HttpExchange ex) throws IOException
425     {
426         ex.getEventListener().onRetry();
427         ex.reset();
428         doSend(ex);
429     }
430 
431     /* ------------------------------------------------------------ */
432     protected void doSend(HttpExchange ex) throws IOException
433     {
434         // add cookies
435         // TODO handle max-age etc.
436         if (_cookies!=null)
437         {
438             StringBuilder buf=null;
439             for (Cookie cookie : _cookies)
440             {
441                 if (buf==null)
442                     buf=new StringBuilder();
443                 else
444                     buf.append("; ");
445                 buf.append(cookie.getName()); // TODO quotes
446                 buf.append("=");
447                 buf.append(cookie.getValue()); // TODO quotes
448             }
449             if (buf!=null)
450                 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
451         }
452 
453         // Add any known authorizations
454         if (_authorizations!=null)
455         {
456             Authorization auth= (Authorization)_authorizations.match(ex.getURI());
457             if (auth !=null)
458                 ((Authorization)auth).setCredentials(ex);
459         }
460 
461         HttpConnection connection = getIdleConnection();
462         if (connection != null)
463         {
464             boolean sent = connection.send(ex);
465             if (!sent) connection = null;
466         }
467 
468         if (connection == null)
469         {
470             synchronized (this)
471             {
472                 _queue.add(ex);
473                 if (_connections.size() + _pendingConnections < _maxConnections)
474                 {
475                      startNewConnection();
476                 }
477             }
478         }
479     }
480 
481     /* ------------------------------------------------------------ */
482     public synchronized String toString()
483     {
484         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
485     }
486 
487     /* ------------------------------------------------------------ */
488     public synchronized String toDetailString()
489     {
490         StringBuilder b = new StringBuilder();
491         b.append(toString());
492         b.append('\n');
493         synchronized(this)
494         {
495             for (HttpConnection connection : _connections)
496             {
497                 if (connection._exchange!=null)
498                 {
499                     b.append(connection.toDetailString());
500                     if (_idle.contains(connection))
501                         b.append(" IDLE");
502                     b.append('\n');
503                 }
504             }
505         }
506         b.append("--");
507         b.append('\n');
508 
509         return b.toString();
510     }
511 
512     /* ------------------------------------------------------------ */
    /**
     * @param proxy the proxy address to record for this destination; null
     *        makes isProxied() return false
     */
    public void setProxy(Address proxy)
    {
        _proxy=proxy;
    }
517 
518     /* ------------------------------------------------------------ */
    /**
     * @return the proxy address set via setProxy(Address), or null if none was set
     */
    public Address getProxy()
    {
        return _proxy;
    }
523 
524     /* ------------------------------------------------------------ */
    /**
     * @return the authorization set via setProxyAuthentication(Authorization),
     *         or null if none was set
     */
    public Authorization getProxyAuthentication()
    {
        return _proxyAuthentication;
    }
529 
530     /* ------------------------------------------------------------ */
    /**
     * @param authentication the authorization to record for the proxy
     */
    public void setProxyAuthentication(Authorization authentication)
    {
        _proxyAuthentication = authentication;
    }
535 
536     /* ------------------------------------------------------------ */
    /**
     * @return true if a proxy address has been set for this destination
     */
    public boolean isProxied()
    {
        return _proxy!=null;
    }
541 
542     /* ------------------------------------------------------------ */
543     public void close() throws IOException
544     {
545         synchronized (this)
546         {
547             for (HttpConnection connection : _connections)
548             {
549                 connection.close();
550             }
551         }
552     }
553 
554 }