1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.client;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20
21 import org.mortbay.io.Buffer;
22 import org.mortbay.io.Buffers;
23 import org.mortbay.io.ByteArrayBuffer;
24 import org.mortbay.io.Connection;
25 import org.mortbay.io.EndPoint;
26 import org.mortbay.io.nio.SelectChannelEndPoint;
27 import org.mortbay.jetty.HttpGenerator;
28 import org.mortbay.jetty.HttpHeaderValues;
29 import org.mortbay.jetty.HttpHeaders;
30 import org.mortbay.jetty.HttpParser;
31 import org.mortbay.jetty.HttpSchemes;
32 import org.mortbay.jetty.HttpVersions;
33 import org.mortbay.jetty.client.security.Authorization;
34 import org.mortbay.jetty.security.SslHttpChannelEndPoint;
35 import org.mortbay.log.Log;
36 import org.mortbay.thread.Timeout;
37
38
39
40
41
42
43 public class HttpConnection implements Connection
44 {
45 HttpDestination _destination;
46 EndPoint _endp;
47 HttpGenerator _generator;
48 HttpParser _parser;
49 boolean _http11 = true;
50 Buffer _connectionHeader;
51 Buffer _requestContentChunk;
52 long _last;
53 boolean _requestComplete;
54 public String _message;
55 public boolean _reserved;
56
57
58 volatile HttpExchange _exchange;
59 HttpExchange _pipeline;
60
61 public void dump() throws IOException
62 {
63 System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
64 System.err.println("generator=" + _generator);
65 System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
66 System.err.println("exchange=" + _exchange);
67 if (_endp instanceof SslHttpChannelEndPoint)
68 ((SslHttpChannelEndPoint)_endp).dump();
69 }
70
71 Timeout.Task _timeout = new Timeout.Task()
72 {
73 public void expired()
74 {
75 HttpExchange ex=null;
76 try
77 {
78 synchronized (HttpConnection.this)
79 {
80 ex = _exchange;
81 _exchange = null;
82 if (ex != null)
83 _destination.returnConnection(HttpConnection.this,true);
84 }
85 }
86 catch (Exception e)
87 {
88 Log.debug(e);
89 }
90 finally
91 {
92 try
93 {
94 close();
95 }
96 catch (IOException e)
97 {
98 Log.ignore(e);
99 }
100
101 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
102 {
103 ex.setStatus(HttpExchange.STATUS_EXPIRED);
104 }
105 }
106 }
107 };
108
109
110 HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
111 {
112 _endp = endp;
113 _generator = new HttpGenerator(buffers,endp,hbs,cbs);
114 _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
115 }
116
117 public void setReserved (boolean reserved)
118 {
119 _reserved = reserved;
120 }
121
122 public boolean isReserved()
123 {
124 return _reserved;
125 }
126
127
128 public HttpDestination getDestination()
129 {
130 return _destination;
131 }
132
133
134 public void setDestination(HttpDestination destination)
135 {
136 _destination = destination;
137 }
138
139
140 public boolean send(HttpExchange ex) throws IOException
141 {
142
143
144
145 synchronized (this)
146 {
147 if (_exchange != null)
148 {
149 if (_pipeline != null)
150 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
151 _pipeline = ex;
152 return true;
153 }
154
155 if (!_endp.isOpen())
156 return false;
157
158 ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
159 _exchange = ex;
160
161 if (_endp.isBlocking())
162 this.notify();
163 else
164 {
165 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
166 scep.scheduleWrite();
167 }
168
169 if (!_endp.isBlocking())
170 _destination.getHttpClient().schedule(_timeout);
171
172 return true;
173 }
174 }
175
176
177 public void handle() throws IOException
178 {
179 int no_progress = 0;
180 long flushed = 0;
181
182 boolean failed = false;
183 while (_endp.isBufferingInput() || _endp.isOpen())
184 {
185 synchronized (this)
186 {
187 while (_exchange == null)
188 {
189 if (_endp.isBlocking())
190 {
191 try
192 {
193 this.wait();
194 }
195 catch (InterruptedException e)
196 {
197 throw new InterruptedIOException();
198 }
199 }
200 else
201 {
202
203 _parser.fill();
204 _parser.skipCRLF();
205 if (_parser.isMoreInBuffer())
206 {
207 Log.warn("unexpected data");
208 close();
209 }
210
211 return;
212 }
213 }
214 }
215 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
216 {
217 no_progress = 0;
218 commitRequest();
219 }
220
221 try
222 {
223 long io = 0;
224 _endp.flush();
225
226 if (_generator.isComplete())
227 {
228 if (!_requestComplete)
229 {
230 _requestComplete = true;
231 _exchange.getEventListener().onRequestComplete();
232 }
233 }
234 else
235 {
236
237 synchronized (this)
238 {
239 if (_exchange == null)
240 continue;
241 flushed = _generator.flush();
242 io += flushed;
243 }
244
245 if (!_generator.isComplete())
246 {
247 InputStream in = _exchange.getRequestContentSource();
248 if (in != null)
249 {
250 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
251 {
252 _requestContentChunk = _exchange.getRequestContentChunk();
253 if (_requestContentChunk != null)
254 _generator.addContent(_requestContentChunk,false);
255 else
256 _generator.complete();
257 io += _generator.flush();
258 }
259 }
260 else
261 _generator.complete();
262 }
263 }
264
265
266
267 if (!_parser.isComplete() && _generator.isCommitted())
268 {
269 long filled = _parser.parseAvailable();
270 io += filled;
271 }
272
273 if (io > 0)
274 no_progress = 0;
275 else if (no_progress++ >= 2 && !_endp.isBlocking())
276 {
277
278 if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
279 {
280 if (_generator.flush()>0)
281 continue;
282 }
283 return;
284 }
285 }
286 catch (Throwable e)
287 {
288 if (e instanceof ThreadDeath)
289 throw (ThreadDeath)e;
290
291 synchronized (this)
292 {
293 if (_exchange != null)
294 {
295 _exchange.getEventListener().onException(e);
296 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
297 }
298 }
299 failed = true;
300 if (e instanceof IOException)
301 throw (IOException)e;
302
303 if (e instanceof Error)
304 throw (Error)e;
305
306 if (e instanceof RuntimeException)
307 throw (RuntimeException)e;
308
309 throw new RuntimeException(e);
310 }
311 finally
312 {
313 boolean complete = false;
314 boolean close = failed;
315 if (!failed)
316 {
317
318 if (_generator.isComplete())
319 {
320 if (!_requestComplete)
321 {
322 _requestComplete = true;
323 _exchange.getEventListener().onRequestComplete();
324 }
325
326
327
328 if (_parser.isComplete())
329 {
330 _destination.getHttpClient().cancel(_timeout);
331 complete = true;
332 }
333 }
334 }
335
336 if (complete || failed)
337 {
338 synchronized (this)
339 {
340 if (!close)
341 close = shouldClose();
342
343 reset(true);
344 no_progress = 0;
345 flushed = -1;
346 if (_exchange != null)
347 {
348 _exchange = null;
349
350 if (_pipeline == null)
351 {
352 if (!isReserved())
353 _destination.returnConnection(this,close);
354 if (close)
355 return;
356 }
357 else
358 {
359 if (close)
360 {
361 if (!isReserved())
362 _destination.returnConnection(this,close);
363 _destination.send(_pipeline);
364 _pipeline = null;
365 return;
366 }
367
368 HttpExchange ex = _pipeline;
369 _pipeline = null;
370
371 send(ex);
372 }
373 }
374 }
375 }
376 }
377 }
378 }
379
380
381 public boolean isIdle()
382 {
383 synchronized (this)
384 {
385 return _exchange == null;
386 }
387 }
388
389
390 public EndPoint getEndPoint()
391 {
392 return _endp;
393 }
394
395
396 private void commitRequest() throws IOException
397 {
398 synchronized (this)
399 {
400 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
401 throw new IllegalStateException();
402
403 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
404 _generator.setVersion(_exchange._version);
405
406 String uri = _exchange._uri;
407 if (_destination.isProxied() && uri.startsWith("/"))
408 {
409
410 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
411 + _destination.getAddress().getPort() + uri;
412 Authorization auth = _destination.getProxyAuthentication();
413 if (auth != null)
414 auth.setCredentials(_exchange);
415 }
416
417 _generator.setRequest(_exchange._method,uri);
418
419 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
420 {
421 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
422 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
423 }
424
425 if (_exchange._requestContent != null)
426 {
427 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
428 _generator.completeHeader(_exchange._requestFields,false);
429 _generator.addContent(_exchange._requestContent,true);
430 }
431 else if (_exchange._requestContentSource != null)
432 {
433 _generator.completeHeader(_exchange._requestFields,false);
434 int available = _exchange._requestContentSource.available();
435 if (available > 0)
436 {
437
438
439
440 byte[] buf = new byte[available];
441 int length = _exchange._requestContentSource.read(buf);
442 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
443 }
444 }
445 else
446 {
447 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
448
449
450
451
452
453 _generator.completeHeader(_exchange._requestFields,true);
454 }
455
456 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
457 }
458 }
459
460
461 protected void reset(boolean returnBuffers) throws IOException
462 {
463 _requestComplete = false;
464 _connectionHeader = null;
465 _parser.reset(returnBuffers);
466 _generator.reset(returnBuffers);
467 _http11 = true;
468 }
469
470
471 private boolean shouldClose()
472 {
473 if (_connectionHeader!=null)
474 {
475 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
476 return true;
477 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
478 return false;
479 }
480 return !_http11;
481 }
482
483
484 private class Handler extends HttpParser.EventHandler
485 {
486 @Override
487 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
488 {
489
490
491
492
493
494 }
495
496 @Override
497 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
498 {
499 HttpExchange exchange = _exchange;
500 if (exchange!=null)
501 {
502 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
503 exchange.getEventListener().onResponseStatus(version,status,reason);
504 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
505 }
506 }
507
508 @Override
509 public void parsedHeader(Buffer name, Buffer value) throws IOException
510 {
511 HttpExchange exchange = _exchange;
512 if (exchange!=null)
513 {
514 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
515 {
516 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
517 }
518 exchange.getEventListener().onResponseHeader(name,value);
519 }
520 }
521
522 @Override
523 public void headerComplete() throws IOException
524 {
525 HttpExchange exchange = _exchange;
526 if (exchange!=null)
527 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
528 }
529
530 @Override
531 public void content(Buffer ref) throws IOException
532 {
533 HttpExchange exchange = _exchange;
534 if (exchange!=null)
535 exchange.getEventListener().onResponseContent(ref);
536 }
537
538 @Override
539 public void messageComplete(long contextLength) throws IOException
540 {
541 HttpExchange exchange = _exchange;
542 if (exchange!=null)
543 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
544 }
545 }
546
547
548 public String toString()
549 {
550 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
551 }
552
553
554 public String toDetailString()
555 {
556 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
557 }
558
559
560
561
562
563 public long getLast()
564 {
565 return _last;
566 }
567
568
569
570
571
572
573 public void setLast(long last)
574 {
575 _last = last;
576 }
577
578
579 public void close() throws IOException
580 {
581 try
582 {
583 _endp.close();
584 }
585 finally
586 {
587
588 if (_exchange != null)
589 {
590 _exchange.onException(new IOException("Connection closed"));
591 }
592 }
593 }
594 }