View Javadoc

1   // ========================================================================
2   // Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.jetty.client;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  
21  import org.mortbay.io.Buffer;
22  import org.mortbay.io.Buffers;
23  import org.mortbay.io.ByteArrayBuffer;
24  import org.mortbay.io.Connection;
25  import org.mortbay.io.EndPoint;
26  import org.mortbay.io.nio.SelectChannelEndPoint;
27  import org.mortbay.jetty.HttpGenerator;
28  import org.mortbay.jetty.HttpHeaderValues;
29  import org.mortbay.jetty.HttpHeaders;
30  import org.mortbay.jetty.HttpParser;
31  import org.mortbay.jetty.HttpSchemes;
32  import org.mortbay.jetty.HttpVersions;
33  import org.mortbay.jetty.client.security.Authorization;
34  import org.mortbay.jetty.security.SslHttpChannelEndPoint;
35  import org.mortbay.log.Log;
36  import org.mortbay.thread.Timeout;
37  
38  /**
39   *
40   * @author Greg Wilkins
41   * @author Guillaume Nodet
42   */
43  public class HttpConnection implements Connection
44  {
45      HttpDestination _destination;
46      EndPoint _endp;
47      HttpGenerator _generator;
48      HttpParser _parser;
49      boolean _http11 = true;
50      Buffer _connectionHeader;
51      Buffer _requestContentChunk;
52      long _last;
53      boolean _requestComplete;
54      public String _message;
55      public boolean _reserved;
56  
57      /* The current exchange waiting for a response */
58      volatile HttpExchange _exchange;
59      HttpExchange _pipeline;
60  
61      public void dump() throws IOException
62      {
63          System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
64          System.err.println("generator=" + _generator);
65          System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
66          System.err.println("exchange=" + _exchange);
67          if (_endp instanceof SslHttpChannelEndPoint)
68              ((SslHttpChannelEndPoint)_endp).dump();
69      }
70  
71      Timeout.Task _timeout = new Timeout.Task()
72      {
73          public void expired()
74          {
75              HttpExchange ex=null;
76              try
77              {
78                  synchronized (HttpConnection.this)
79                  {
80                      ex = _exchange;
81                      _exchange = null;
82                      if (ex != null)
83                          _destination.returnConnection(HttpConnection.this,true);
84                  }
85              }
86              catch (Exception e)
87              {
88                  Log.debug(e);
89              }
90              finally
91              {
92                  try
93                  {
94                      close();
95                  }
96                  catch (IOException e)
97                  {
98                      Log.ignore(e);
99                  }
100 
101                 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
102                 {
103                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
104                 }
105             }
106         }
107     };
108 
109     /* ------------------------------------------------------------ */
110     HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
111     {
112         _endp = endp;
113         _generator = new HttpGenerator(buffers,endp,hbs,cbs);
114         _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
115     }
116 
117     public void setReserved (boolean reserved)
118     {
119         _reserved = reserved;
120     }
121 
122     public boolean isReserved()
123     {
124         return _reserved;
125     }
126 
127     /* ------------------------------------------------------------ */
128     public HttpDestination getDestination()
129     {
130         return _destination;
131     }
132 
133     /* ------------------------------------------------------------ */
134     public void setDestination(HttpDestination destination)
135     {
136         _destination = destination;
137     }
138 
139     /* ------------------------------------------------------------ */
140     public boolean send(HttpExchange ex) throws IOException
141     {
142         // _message =
143         // Thread.currentThread().getName()+": Generator instance="+_generator
144         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
145         synchronized (this)
146         {
147             if (_exchange != null)
148             {
149                 if (_pipeline != null)
150                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
151                 _pipeline = ex;
152                 return true;
153             }
154 
155             if (!_endp.isOpen())
156                 return false;
157 
158             ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
159             _exchange = ex;
160 
161             if (_endp.isBlocking())
162                 this.notify();
163             else
164             {
165                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
166                 scep.scheduleWrite();
167             }
168 
169             if (!_endp.isBlocking())
170                 _destination.getHttpClient().schedule(_timeout);
171 
172             return true;
173         }
174     }
175 
176     /* ------------------------------------------------------------ */
177     public void handle() throws IOException
178     {
179         int no_progress = 0;
180         long flushed = 0;
181 
182         boolean failed = false;
183         while (_endp.isBufferingInput() || _endp.isOpen())
184         {
185             synchronized (this)
186             {
187                 while (_exchange == null)
188                 {
189                     if (_endp.isBlocking())
190                     {
191                         try
192                         {
193                             this.wait();
194                         }
195                         catch (InterruptedException e)
196                         {
197                             throw new InterruptedIOException();
198                         }
199                     }
200                     else
201                     {
202                         // Hopefully just space?
203                         _parser.fill();
204                         _parser.skipCRLF();
205                         if (_parser.isMoreInBuffer())
206                         {
207                             Log.warn("unexpected data");
208                             close();
209                         }
210 
211                         return;
212                     }
213                 }
214             }
215             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
216             {
217                 no_progress = 0;
218                 commitRequest();
219             }
220 
221             try
222             {
223                 long io = 0;
224                 _endp.flush();
225 
226                 if (_generator.isComplete())
227                 {
228                     if (!_requestComplete)
229                     {
230                         _requestComplete = true;
231                         _exchange.getEventListener().onRequestComplete();
232                     }
233                 }
234                 else
235                 {
236                     // Write as much of the request as possible
237                     synchronized (this)
238                     {
239                         if (_exchange == null)
240                             continue;
241                         flushed = _generator.flush();
242                         io += flushed;
243                     }
244 
245                     if (!_generator.isComplete())
246                     {
247                         InputStream in = _exchange.getRequestContentSource();
248                         if (in != null)
249                         {
250                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
251                             {
252                                 _requestContentChunk = _exchange.getRequestContentChunk();
253                                 if (_requestContentChunk != null)
254                                     _generator.addContent(_requestContentChunk,false);
255                                 else
256                                     _generator.complete();
257                                 io += _generator.flush();
258                             }
259                         }
260                         else
261                             _generator.complete();
262                     }
263                 }
264 
265 
266                 // If we are not ended then parse available
267                 if (!_parser.isComplete() && _generator.isCommitted())
268                 {
269                     long filled = _parser.parseAvailable();
270                     io += filled;
271                 }
272 
273                 if (io > 0)
274                     no_progress = 0;
275                 else if (no_progress++ >= 2 && !_endp.isBlocking())
276                 {
277                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
278                     if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
279                     {
280                         if (_generator.flush()>0)
281                             continue;
282                     }
283                     return;
284                 }
285             }
286             catch (Throwable e)
287             {
288                 if (e instanceof ThreadDeath)
289                     throw (ThreadDeath)e;
290                 
291                 synchronized (this)
292                 {
293                     if (_exchange != null)
294                     {
295                         _exchange.getEventListener().onException(e);
296                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
297                     }
298                 }
299                 failed = true;
300                 if (e instanceof IOException)
301                     throw (IOException)e;
302  
303                 if (e instanceof Error)
304                     throw (Error)e;
305                 
306                 if (e instanceof RuntimeException)
307                     throw (RuntimeException)e;
308                 
309                throw new RuntimeException(e);
310             }
311             finally
312             {
313                 boolean complete = false;
314                 boolean close = failed; // always close the connection on error
315                 if (!failed)
316                 {
317                     // are we complete?
318                     if (_generator.isComplete())
319                     {
320                         if (!_requestComplete)
321                         {
322                             _requestComplete = true;
323                             _exchange.getEventListener().onRequestComplete();
324                         }
325 
326                         // we need to return the HttpConnection to a state that
327                         // it can be reused or closed out
328                         if (_parser.isComplete())
329                         {
330                             _destination.getHttpClient().cancel(_timeout);
331                             complete = true;
332                         }
333                     }
334                 }
335 
336                 if (complete || failed)
337                 {
338                     synchronized (this)
339                     {
340                         if (!close)
341                             close = shouldClose();
342 
343                         reset(true);
344                         no_progress = 0;
345                         flushed = -1;
346                         if (_exchange != null)
347                         {
348                             _exchange = null;
349 
350                             if (_pipeline == null)
351                             {
352                                 if (!isReserved())
353                                     _destination.returnConnection(this,close);
354                                 if (close)
355                                     return;
356                             }
357                             else
358                             {
359                                 if (close)
360                                 {
361                                     if (!isReserved())
362                                         _destination.returnConnection(this,close);
363                                     _destination.send(_pipeline);
364                                     _pipeline = null;
365                                     return;
366                                 }
367 
368                                 HttpExchange ex = _pipeline;
369                                 _pipeline = null;
370 
371                                 send(ex);
372                             }
373                         }
374                     }
375                 }
376             }
377         }
378     }
379 
380     /* ------------------------------------------------------------ */
381     public boolean isIdle()
382     {
383         synchronized (this)
384         {
385             return _exchange == null;
386         }
387     }
388 
389     /* ------------------------------------------------------------ */
390     public EndPoint getEndPoint()
391     {
392         return _endp;
393     }
394 
395     /* ------------------------------------------------------------ */
396     private void commitRequest() throws IOException
397     {
398         synchronized (this)
399         {
400             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
401                 throw new IllegalStateException();
402 
403             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
404             _generator.setVersion(_exchange._version);
405 
406             String uri = _exchange._uri;
407             if (_destination.isProxied() && uri.startsWith("/"))
408             {
409                 // TODO suppress port 80 or 443
410                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
411                         + _destination.getAddress().getPort() + uri;
412                 Authorization auth = _destination.getProxyAuthentication();
413                 if (auth != null)
414                     auth.setCredentials(_exchange);
415             }
416 
417             _generator.setRequest(_exchange._method,uri);
418 
419             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
420             {
421                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
422                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
423             }
424 
425             if (_exchange._requestContent != null)
426             {
427                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
428                 _generator.completeHeader(_exchange._requestFields,false);
429                 _generator.addContent(_exchange._requestContent,true);
430             }
431             else if (_exchange._requestContentSource != null)
432             {
433                 _generator.completeHeader(_exchange._requestFields,false);
434                 int available = _exchange._requestContentSource.available();
435                 if (available > 0)
436                 {
437                     // TODO deal with any known content length
438 
439                     // TODO reuse this buffer!
440                     byte[] buf = new byte[available];
441                     int length = _exchange._requestContentSource.read(buf);
442                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
443                 }
444             }
445             else
446             {
447                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
448                 // :
449                 // should
450                 // not
451                 // be
452                 // needed
453                 _generator.completeHeader(_exchange._requestFields,true);
454             }
455 
456             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
457         }
458     }
459 
460     /* ------------------------------------------------------------ */
461     protected void reset(boolean returnBuffers) throws IOException
462     {
463         _requestComplete = false;
464         _connectionHeader = null;
465         _parser.reset(returnBuffers);
466         _generator.reset(returnBuffers);
467         _http11 = true;
468     }
469 
470     /* ------------------------------------------------------------ */
471     private boolean shouldClose()
472     {
473         if (_connectionHeader!=null)
474         {
475             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
476                 return true;
477             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
478                 return false;
479         }
480         return !_http11;
481     }
482 
483     /* ------------------------------------------------------------ */
484     private class Handler extends HttpParser.EventHandler
485     {
486         @Override
487         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
488         {
489             // System.out.println( method.toString() + "///" + url.toString() +
490             // "///" + version.toString() );
491             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
492             // out here
493             // throw new IllegalStateException();
494         }
495 
496         @Override
497         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
498         {
499             HttpExchange exchange = _exchange;
500             if (exchange!=null)
501             {
502                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
503                 exchange.getEventListener().onResponseStatus(version,status,reason);
504                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
505             }
506         }
507 
508         @Override
509         public void parsedHeader(Buffer name, Buffer value) throws IOException
510         {
511             HttpExchange exchange = _exchange;
512             if (exchange!=null)
513             {
514                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
515                 {
516                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
517                 }
518                 exchange.getEventListener().onResponseHeader(name,value);
519             }
520         }
521 
522         @Override
523         public void headerComplete() throws IOException
524         {
525             HttpExchange exchange = _exchange;
526             if (exchange!=null)
527                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
528         }
529 
530         @Override
531         public void content(Buffer ref) throws IOException
532         {
533             HttpExchange exchange = _exchange;
534             if (exchange!=null)
535                 exchange.getEventListener().onResponseContent(ref);
536         }
537 
538         @Override
539         public void messageComplete(long contextLength) throws IOException
540         {
541             HttpExchange exchange = _exchange;
542             if (exchange!=null)
543                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
544         }
545     }
546 
547     /* ------------------------------------------------------------ */
548     public String toString()
549     {
550         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
551     }
552 
553     /* ------------------------------------------------------------ */
554     public String toDetailString()
555     {
556         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
557     }
558 
559     /* ------------------------------------------------------------ */
560     /**
561      * @return the last
562      */
563     public long getLast()
564     {
565         return _last;
566     }
567 
568     /* ------------------------------------------------------------ */
569     /**
570      * @param last
571      *            the last to set
572      */
573     public void setLast(long last)
574     {
575         _last = last;
576     }
577 
578     /* ------------------------------------------------------------ */
579     public void close() throws IOException
580     {
581         try
582         {
583             _endp.close();
584         }
585         finally
586         {
587 
588             if (_exchange != null)
589             {
590                 _exchange.onException(new IOException("Connection closed"));
591             }
592         }
593     }
594 }