From e2598bee5c9a4295bc9c0245dcf455df09794981 Mon Sep 17 00:00:00 2001 From: scw00 <616955249@qq.com> Date: Fri, 2 Jun 2017 21:59:46 +0800 Subject: [PATCH] Fix post transform retry --- mgmt/RecordsConfig.cc | 2 + proxy/http/HttpConfig.cc | 3 + proxy/http/HttpConfig.h | 2 + proxy/http/HttpSM.cc | 365 ++++++++++++++++++++++++++++++++++++--- proxy/http/HttpSM.h | 9 + proxy/http/HttpTunnel.cc | 209 +++++++++++++++++++++- proxy/http/HttpTunnel.h | 196 ++++++++++++++++++++- 7 files changed, 755 insertions(+), 31 deletions(-) diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index 8a711f8c6a6..e8f02b81ed4 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -407,6 +407,8 @@ static const RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.http.strict_uri_parsing", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-1]", RECA_NULL} , + {RECT_CONFIG, "proxy.config.http.tunnel_faker_enabled", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL} + , // # Send http11 requests // # // # 0 - Never diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc index e6eb67a4308..583ff3dd34b 100644 --- a/proxy/http/HttpConfig.cc +++ b/proxy/http/HttpConfig.cc @@ -1045,6 +1045,7 @@ HttpConfig::startup() HttpEstablishStaticConfigLongLong(c.oride.flow_low_water_mark, "proxy.config.http.flow_control.low_water"); HttpEstablishStaticConfigByte(c.oride.post_check_content_length_enabled, "proxy.config.http.post.check.content_length.enabled"); HttpEstablishStaticConfigByte(c.strict_uri_parsing, "proxy.config.http.strict_uri_parsing"); + HttpEstablishStaticConfigByte(c.tunnel_faker_enabled, "proxy.config.http.tunnel_faker_enabled"); // [amc] This is a bit of a mess, need to figure out to make this cleaner. RecRegisterConfigUpdateCb("proxy.config.http.server_session_sharing.match", &http_server_session_sharing_cb, &c); @@ -1483,6 +1484,8 @@ HttpConfig::reconfigure() params->strict_uri_parsing = INT_TO_BOOL(m_master.strict_uri_parsing); + params->tunnel_faker_enabled = INT_TO_BOOL(m_master.tunnel_faker_enabled); + params->oride.down_server_timeout = m_master.oride.down_server_timeout; params->oride.client_abort_threshold = m_master.oride.client_abort_threshold; diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h index 80a67d3c80e..ed504fef4be 100644 --- a/proxy/http/HttpConfig.h +++ b/proxy/http/HttpConfig.h @@ -792,6 +792,8 @@ struct HttpConfigParams : public ConfigInfo { MgmtByte strict_uri_parsing = 0; + MgmtByte tunnel_faker_enabled = 0; + MgmtByte reverse_proxy_enabled = 0; MgmtByte url_remap_required = 1; diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index b321fb8b83d..18a41510bd5 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -1943,7 +1943,7 @@ HttpSM::state_send_server_request_header(int event, void *data) method = t_state.hdr_info.server_request.method_get_wksidx(); if (!t_state.api_server_request_body_set && method != HTTP_WKSIDX_TRACE && (t_state.hdr_info.request_content_length > 0 || t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING)) { - if (post_transform_info.vc) { + if (post_transform_info.vc && t_state.current.attempts == 0) { setup_transform_to_server_transfer(); } else { do_setup_post_tunnel(HTTP_SERVER_VC); @@ -2000,7 +2000,21 @@ HttpSM::state_send_server_request_header(int event, void *data) case VC_EVENT_ERROR: case VC_EVENT_ACTIVE_TIMEOUT: case VC_EVENT_INACTIVITY_TIMEOUT: - handle_server_setup_error(event, data); + // save the handler and wait for fake vc success + if (t_state.http_config_param->tunnel_faker_enabled && t_state.current.attempts == 0 && tunnel.get_fvc() == nullptr) { + // server tunnel is not running, setup server tunnel. + // we need to initialize fvc first to set the default handler. + // because tunnel may success directly and go to HttpSM::tunnel_handler_post. + if (post_transform_info.vc) { + setup_transform_to_fake_transfer(event, data); + } else { + do_setup_ua_to_fake_tunnel(event, data); + } + } else { + if (t_state.http_config_param->tunnel_faker_enabled && tunnel.get_fvc()) + tunnel.set_fvc_handler(&HttpSM::fake_server_setup_error, event, data); + handle_server_setup_error(event, data); + } break; case VC_EVENT_READ_COMPLETE: @@ -2561,7 +2575,7 @@ HttpSM::tunnel_handler_post_or_put(HttpTunnelProducer *p) // MUST NOT clear the vc pointer from post_transform_info // as this causes a double close of the transform vc in transform_cleanup // - if (post_transform_info.vc != nullptr) { + if (post_transform_info.vc != nullptr && t_state.current.attempts == 0) { ink_assert(post_transform_info.entry->in_tunnel == true); ink_assert(post_transform_info.vc == post_transform_info.entry->vc); vc_table.cleanup_entry(post_transform_info.entry); @@ -2583,7 +2597,7 @@ HttpSM::tunnel_handler_post_or_put(HttpTunnelProducer *p) ink_assert(p->read_success == true); ink_assert(p->consumer_list.head->write_success == true); tunnel.deallocate_buffers(); - tunnel.reset(); + tunnel.reset(false); // When the ua completed sending it's data we must have // removed it from the tunnel ink_release_assert(ua_entry->in_tunnel == false); @@ -2595,6 +2609,47 @@ HttpSM::tunnel_handler_post_or_put(HttpTunnelProducer *p) } } +int +HttpSM::fake_server_setup_error(int event, void *data) +{ + Debug("fvc_handler", "state: %d, done: %d", tunnel.get_fvc_state(), tunnel.is_fvc_done()); + handle_server_setup_error(event, data); + return 0; +} + +int +HttpSM::fake_handle_post_failure(int event, void *data) +{ + Debug("fvc_handler", "state: %d, done: %d", tunnel.get_fvc_state(), tunnel.is_fvc_done()); + handle_post_failure(); + return 0; +} + +int +HttpSM::tunnel_handler_fake(int event, void *data) +{ + STATE_ENTER(&HttpSM::tunnel_handler_fake, event); + + ink_assert(tunnel.is_fvc_closed() || tunnel.is_fvc_done()); + Debug("fvc_handler", "state: %d, done: %d", tunnel.get_fvc_state(), tunnel.is_fvc_done()); + switch (event) { + case VC_EVENT_EOS: + case VC_EVENT_ERROR: + case VC_EVENT_INACTIVITY_TIMEOUT: + case VC_EVENT_ACTIVE_TIMEOUT: + tunnel.set_fvc_event(event); + case HTTP_TUNNEL_EVENT_DONE: + case VC_EVENT_WRITE_COMPLETE: + // all data was collected by fvc, call default_handler to continue. + tunnel.fvc_callcont(); + break; + default: + ink_release_assert(!"unexpected event"); + } + + return 0; +} + // int HttpSM::tunnel_handler_post(int event, void* data) // // Handles completion of any http request body tunnel @@ -2649,12 +2704,21 @@ HttpSM::tunnel_handler_post(int event, void *data) ink_assert(data == &tunnel); // The tunnel calls this when it is done + if (t_state.http_config_param->tunnel_faker_enabled && tunnel.get_fvc()) { + tunnel.get_fvc_state(&p->handler_state); + } + int p_handler_state = p->handler_state; tunnel_handler_post_or_put(p); switch (p_handler_state) { case HTTP_SM_POST_SERVER_FAIL: - handle_post_failure(); + if (t_state.http_config_param->tunnel_faker_enabled && t_state.current.attempts == 0 && tunnel.get_fvc() && + !tunnel.is_fvc_done()) { + tunnel.set_fvc_handler(&HttpSM::fake_handle_post_failure, event, data); + } else { + handle_post_failure(); + } break; case HTTP_SM_POST_UA_FAIL: break; @@ -2830,6 +2894,39 @@ HttpSM::is_http_server_eos_truncation(HttpTunnelProducer *p) } } +int +HttpSM::tunnel_handler_fake_downstream(int event, HttpTunnelProducer *p) +{ + STATE_ENTER(&HttpSM::tunnel_handler_fake_downstream, event); + ink_assert(tunnel.is_fvc_done()); + p->read_success = true; + return 0; +} + +int +HttpSM::tunnel_handler_fake_upstream(int event, HttpTunnelConsumer *c) +{ + switch (event) { + case VC_EVENT_WRITE_READY: + // run self producer + if (!tunnel.is_fvc_producer_run()) { + STATE_ENTER(&HttpSM::tunnel_handler_fake_upstream, event); + HttpTunnelProducer *p = c->self_producer; + ink_release_assert(p); + tunnel.tunnel_run(p); + } + return 0; + case VC_EVENT_ERROR: + case VC_EVENT_EOS: + default: + ink_release_assert("unknown event"); + } + + STATE_ENTER(&HttpSM::tunnel_handler_fake_upstream, event); + c->write_success = true; + return 0; +} + int HttpSM::tunnel_handler_server(int event, HttpTunnelProducer *p) { @@ -3513,6 +3610,8 @@ HttpSM::tunnel_handler_post_server(int event, HttpTunnelConsumer *c) server_entry->eos = true; c->vc->do_io_shutdown(IO_SHUTDOWN_WRITE); + Debug("fvc_handler", "read %ld", tunnel.get_fvc_reader()->read_avail()); + // We may be reading from a transform. In that case, we // want to close the transform HttpTunnelProducer *ua_producer; @@ -3524,6 +3623,19 @@ HttpSM::tunnel_handler_post_server(int event, HttpTunnelConsumer *c) c->producer->self_consumer->alive = false; } ua_producer = c->producer->self_consumer->producer; + } else if (c->producer->vc_type == HT_STATIC_WRITE) { + ua_producer = c->producer->self_consumer->producer; + if (ua_producer->vc_type == HT_TRANSFORM) { + ua_producer = ua_producer->self_consumer->producer; + } + + HttpTunnelConsumer *nc = c->producer->self_consumer; + c->producer->alive = false; + nc->producer->self_consumer = nullptr; + nc->vc->do_io_read(nullptr, 0, nullptr); + + tunnel.set_fvc_state(HTTP_SM_POST_SERVER_FAIL); + break; } else { ua_producer = c->producer; } @@ -3561,7 +3673,7 @@ HttpSM::tunnel_handler_post_server(int event, HttpTunnelConsumer *c) // agent as down so that tunnel concludes. ua_producer->alive = false; ua_producer->handler_state = HTTP_SM_POST_SERVER_FAIL; - ink_assert(tunnel.is_tunnel_alive() == false); + // ink_assert(tunnel.is_tunnel_alive() == false); break; case VC_EVENT_WRITE_COMPLETE: @@ -5055,6 +5167,11 @@ HttpSM::do_api_callout_internal() VConnection * HttpSM::do_post_transform_open() { + // we can not go again since post_transform has already been destroyed; + if (t_state.current.attempts > 0) { + return nullptr; + } + ink_assert(post_transform_info.vc == nullptr); if (is_action_tag_set("http_post_nullt")) { @@ -5272,7 +5389,7 @@ HttpSM::handle_post_failure() setup_server_read_response_header(); } else { tunnel.deallocate_buffers(); - tunnel.reset(); + tunnel.reset(false); // Server died t_state.current.state = HttpTransact::CONNECTION_CLOSED; call_transact_and_set_next_state(HttpTransact::HandleResponse); @@ -5386,7 +5503,7 @@ HttpSM::handle_server_setup_error(int event, void *data) } } else { // c could be null here as well - if (c != nullptr) { + if (c != nullptr && c->alive) { tunnel.handleEvent(event, c->write_vio); } } @@ -5398,7 +5515,8 @@ HttpSM::handle_server_setup_error(int event, void *data) vc_table.cleanup_entry(post_transform_info.entry); post_transform_info.entry = nullptr; tunnel.deallocate_buffers(); - tunnel.reset(); + tunnel.reset(false); + server_entry->in_tunnel = false; } } } @@ -5465,30 +5583,142 @@ HttpSM::handle_server_setup_error(int event, void *data) call_transact_and_set_next_state(HttpTransact::HandleResponse); } +/////////////////////////////////////////////////////////// +// +// setup ua to fake tunnel +// and wait for tunnel success. It is server failed case only. +// +/////////////////////////////////////////////////////////// void -HttpSM::setup_transform_to_server_transfer() +HttpSM::do_setup_ua_to_fake_tunnel(int event, void *data) +{ + ink_assert(post_transform_info.vc == nullptr); + ink_assert(!tunnel.get_fvc()); + + STATE_ENTER(&HttpSM::do_setup_ua_to_fake_tunnel, event); + + HttpTunnelProducer *p = nullptr; + bool chunked = (t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING); + + int64_t alloc_index; + // content length is undefined, use default buffer size + if (t_state.hdr_info.request_content_length == HTTP_UNDEFINED_CL) { + alloc_index = (int)t_state.txn_conf->default_buffer_size_index; + if (alloc_index < MIN_CONFIG_BUFFER_SIZE_INDEX || alloc_index > MAX_BUFFER_SIZE_INDEX) { + alloc_index = DEFAULT_REQUEST_BUFFER_SIZE_INDEX; + } + } else { + alloc_index = buffer_size_to_index(t_state.hdr_info.request_content_length); + } + + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_fake); + + MIOBuffer *post_buffer = new_MIOBuffer(alloc_index); + IOBufferReader *buf_start = post_buffer->alloc_reader(); + int64_t post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length; + + client_request_body_bytes = post_buffer->write(ua_buffer_reader, chunked ? ua_buffer_reader->read_avail() : post_bytes); + ua_buffer_reader->consume(client_request_body_bytes); + + tunnel.fake_vc_init(post_bytes); + + tunnel.set_fvc_handler(&HttpSM::fake_server_setup_error, event, data); + + p = tunnel.add_producer(ua_entry->vc, post_bytes - transfered_bytes, buf_start, &HttpSM::tunnel_handler_post_ua, HT_HTTP_CLIENT, + "user agent post"); + + tunnel.add_consumer(tunnel.get_fvc(), ua_entry->vc, &HttpSM::tunnel_handler_fake_upstream, HT_STATIC_READ, "fake read"); + + if (chunked) { + tunnel.set_producer_chunking_action(p, 0, TCA_PASSTHRU_CHUNKED_CONTENT); + } + + tunnel.tunnel_run(p); + + ua_entry->in_tunnel = true; +} + +/////////////////////////////////// +// +// setup transform to fake tunnel +// and wait for tunnel success. +// +////////////////////////////////// +void +HttpSM::setup_transform_to_fake_transfer(int event, void *data) { ink_assert(post_transform_info.vc != nullptr); ink_assert(post_transform_info.entry->vc == post_transform_info.vc); + STATE_ENTER(&HttpSM::setup_transform_to_fake_transfer, event); + + HttpTunnelProducer *p = nullptr; + + // fvc is running retrun + if (tunnel.get_fvc()) { + return; + } + int64_t nbytes = t_state.hdr_info.transform_request_cl; int64_t alloc_index = buffer_size_to_index(nbytes); MIOBuffer *post_buffer = new_MIOBuffer(alloc_index); IOBufferReader *buf_start = post_buffer->alloc_reader(); - HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_post); + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_fake); + + HttpTunnelConsumer *c = tunnel.get_consumer(post_transform_info.vc); + + tunnel.fake_vc_init(nbytes); + tunnel.set_fvc_handler(&HttpSM::fake_server_setup_error, event, data); + + p = tunnel.add_producer(post_transform_info.vc, nbytes, buf_start, &HttpSM::tunnel_handler_transform_read, HT_TRANSFORM, + "post transform"); + tunnel.chain(c, p); + + post_transform_info.entry->in_tunnel = true; + + tunnel.add_consumer(tunnel.get_fvc(), post_transform_info.vc, &HttpSM::tunnel_handler_fake_upstream, HT_STATIC_READ, "fake read"); + + tunnel.tunnel_run(p); +} + +void +HttpSM::setup_transform_to_server_transfer() +{ + ink_assert(post_transform_info.vc != nullptr); + ink_assert(post_transform_info.entry->vc == post_transform_info.vc); + + int64_t nbytes = t_state.hdr_info.transform_request_cl; + int64_t alloc_index = buffer_size_to_index(nbytes); + MIOBuffer *post_buffer = new_MIOBuffer(alloc_index); + IOBufferReader *buf_start = post_buffer->alloc_reader(); HttpTunnelConsumer *c = tunnel.get_consumer(post_transform_info.vc); HttpTunnelProducer *p = tunnel.add_producer(post_transform_info.vc, nbytes, buf_start, &HttpSM::tunnel_handler_transform_read, HT_TRANSFORM, "post transform"); tunnel.chain(c, p); - post_transform_info.entry->in_tunnel = true; - tunnel.add_consumer(server_entry->vc, post_transform_info.vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, - "http server post"); - server_entry->in_tunnel = true; + if (!t_state.http_config_param->tunnel_faker_enabled) { + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_post); + + tunnel.add_consumer(server_entry->vc, post_transform_info.vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, + "http server post"); + } else { + // setup fake vc + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_fake); + tunnel.fake_vc_init(nbytes); + + HttpTunnelProducer *fp = tunnel.add_tunnel_fake(post_transform_info.vc, &HttpSM::tunnel_handler_fake_upstream, + &HttpSM::tunnel_handler_fake_downstream); + tunnel.add_consumer(server_entry->vc, fp->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "http server post"); + // save the default handler because we need to wait for fvc succeed. + tunnel.set_fvc_handler(&HttpSM::tunnel_handler_post, HTTP_TUNNEL_EVENT_DONE, &tunnel); + } + + post_transform_info.entry->in_tunnel = true; + server_entry->in_tunnel = true; tunnel.tunnel_run(p); } @@ -5516,11 +5746,76 @@ HttpSM::do_drain_request_body() } #endif /* PROXY_DRAIN */ +void +HttpSM::do_setup_fake_tunnel() +{ + bool chunked = (t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING); + int64_t alloc_index; + + HttpTunnelProducer *p = nullptr; + + ink_release_assert(tunnel.get_fvc()); + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_post); + + // content length is undefined, use default buffer size + if (t_state.hdr_info.request_content_length == HTTP_UNDEFINED_CL) { + alloc_index = (int)t_state.txn_conf->default_buffer_size_index; + if (alloc_index < MIN_CONFIG_BUFFER_SIZE_INDEX || alloc_index > MAX_BUFFER_SIZE_INDEX) { + alloc_index = DEFAULT_REQUEST_BUFFER_SIZE_INDEX; + } + } else { + alloc_index = buffer_size_to_index(t_state.hdr_info.request_content_length); + } + MIOBuffer *post_buffer = new_MIOBuffer(alloc_index); + IOBufferReader *buf_start = post_buffer->alloc_reader(); + int64_t post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length; + int64_t left_bytes = post_bytes - tunnel.get_fvc_transfer_bytes(); + + // Note: Many browsers, Netscape and IE included send two extra + // bytes (CRLF) at the end of the post. We just ignore those + // bytes since the sending them is not spec + + // Next order of business if copy the remaining data from the + // header buffer into new buffer + client_request_body_bytes = post_buffer->write(ua_buffer_reader, chunked ? ua_buffer_reader->read_avail() : post_bytes); + + // ua_buffer_reader->consume(client_request_body_bytes); + p = tunnel.add_producer(ua_entry->vc, left_bytes, buf_start, &HttpSM::tunnel_handler_post_ua, HT_HTTP_CLIENT, "user agent post"); + + // Do we have data in ua_buffer_reader + if (left_bytes > 0) { + HttpTunnelProducer *fp = + tunnel.setup_fake_tunnel(ua_entry->vc, &HttpSM::tunnel_handler_fake_upstream, &HttpSM::tunnel_handler_fake_downstream, false); + tunnel.add_consumer(server_entry->vc, fp->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "http server post"); + // run ua producer + tunnel.tunnel_run(p); + ua_entry->in_tunnel = true; + } else { + // the client producer has already finished. + p->alive = false; + p->read_success = true; + // client producer always succeed. We will reset it in HttpSM::handle_server_setup_error if not. + p->handler_state = HTTP_SM_POST_SUCCESS; + + HttpTunnelProducer *fp = + tunnel.setup_fake_tunnel(ua_entry->vc, &HttpSM::tunnel_handler_fake_upstream, &HttpSM::tunnel_handler_fake_downstream); + tunnel.add_consumer(server_entry->vc, fp->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "http server post"); + + fp->read_success = true; + fp->self_consumer->write_success = true; + ua_entry->in_tunnel = false; + // run fake producer + tunnel.tunnel_run(fp); + } + server_entry->in_tunnel = true; +} + void HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type) { bool chunked = (t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING); bool post_redirect = false; + int64_t post_bytes = 0; HttpTunnelProducer *p = nullptr; // YTS Team, yamsat Plugin @@ -5532,14 +5827,21 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type) post_redirect = true; // copy the post data into a new producer buffer for static producer tunnel.postbuf->postdata_producer_buffer->write(tunnel.postbuf->postdata_copy_buffer_start); - int64_t post_bytes = tunnel.postbuf->postdata_producer_reader->read_avail(); - transfered_bytes = post_bytes; - p = tunnel.add_producer(HTTP_TUNNEL_STATIC_PRODUCER, post_bytes, tunnel.postbuf->postdata_producer_reader, + post_bytes = tunnel.postbuf->postdata_producer_reader->read_avail(); + transfered_bytes = post_bytes; + p = tunnel.add_producer(HTTP_TUNNEL_STATIC_PRODUCER, post_bytes, tunnel.postbuf->postdata_producer_reader, (HttpProducerHandler) nullptr, HT_STATIC, "redirect static agent post"); // the tunnel has taken over the buffer and will free it tunnel.postbuf->postdata_producer_buffer = nullptr; tunnel.postbuf->postdata_producer_reader = nullptr; } else { + // This is retry case. All data was held in fvc. We just need to setup + // ua -> fvc -> server tunnel and run fvc producer. + if (t_state.current.attempts > 0 && t_state.http_config_param->tunnel_faker_enabled) { + do_setup_fake_tunnel(); + return; + } + int64_t alloc_index; // content length is undefined, use default buffer size if (t_state.hdr_info.request_content_length == HTTP_UNDEFINED_CL) { @@ -5552,7 +5854,7 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type) } MIOBuffer *post_buffer = new_MIOBuffer(alloc_index); IOBufferReader *buf_start = post_buffer->alloc_reader(); - int64_t post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length; + post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length; // Note: Many browsers, Netscape and IE included send two extra // bytes (CRLF) at the end of the post. We just ignore those @@ -5586,8 +5888,23 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type) tunnel.add_consumer(server_entry->vc, HTTP_TUNNEL_STATIC_PRODUCER, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "redirect http server post"); } else { - HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_post); - tunnel.add_consumer(server_entry->vc, ua_entry->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "http server post"); + if (t_state.http_config_param->tunnel_faker_enabled) { + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_fake); + // setup fake vc tunnel + ink_assert(tunnel.get_fvc() == nullptr); + + tunnel.fake_vc_init(post_bytes); + tunnel.set_fvc_handler(&HttpSM::tunnel_handler_post, HTTP_TUNNEL_EVENT_DONE, &tunnel); + + HttpTunnelProducer *fp = + tunnel.add_tunnel_fake(ua_entry->vc, &HttpSM::tunnel_handler_fake_upstream, &HttpSM::tunnel_handler_fake_downstream); + tunnel.add_consumer(server_entry->vc, fp->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, "http server post"); + + } else { + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::tunnel_handler_post); + tunnel.add_consumer(server_entry->vc, ua_entry->vc, &HttpSM::tunnel_handler_post_server, HT_HTTP_SERVER, + "http server post"); + } } server_entry->in_tunnel = true; break; @@ -5596,13 +5913,13 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type) break; } + ua_session->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in)); + server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_out)); + if (chunked) { tunnel.set_producer_chunking_action(p, 0, TCA_PASSTHRU_CHUNKED_CONTENT); } - ua_session->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in)); - server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_out)); - tunnel.tunnel_run(p); // If we're half closed, we got a FIN from the client. Forward it on to the origin server diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h index 3226403bec9..df64aa42d79 100644 --- a/proxy/http/HttpSM.h +++ b/proxy/http/HttpSM.h @@ -360,6 +360,7 @@ class HttpSM : public Continuation int tunnel_handler(int event, void *data); int tunnel_handler_push(int event, void *data); int tunnel_handler_post(int event, void *data); + int tunnel_handler_fake(int event, void *data); // YTS Team, yamsat Plugin int tunnel_handler_for_partial_post(int event, void *data); @@ -405,6 +406,8 @@ class HttpSM : public Continuation int state_common_wait_for_transform_read(HttpTransformInfo *t_info, HttpSMHandler tunnel_handler, int event, void *data); // Tunnel event handlers + int tunnel_handler_fake_downstream(int event, HttpTunnelProducer *c); + int tunnel_handler_fake_upstream(int event, HttpTunnelConsumer *p); int tunnel_handler_server(int event, HttpTunnelProducer *p); int tunnel_handler_ua(int event, HttpTunnelConsumer *c); int tunnel_handler_ua_push(int event, HttpTunnelProducer *p); @@ -419,12 +422,17 @@ class HttpSM : public Continuation int tunnel_handler_transform_read(int event, HttpTunnelProducer *p); int tunnel_handler_plugin_agent(int event, HttpTunnelConsumer *c); + int fake_server_setup_error(int event, void *data); + int fake_handle_post_failure(int event, void *data); + void do_hostdb_lookup(); void do_hostdb_reverse_lookup(); void do_cache_lookup_and_read(); void do_http_server_open(bool raw = false); void send_origin_throttled_response(); void do_setup_post_tunnel(HttpVC_t to_vc_type); + void do_setup_ua_to_fake_tunnel(int event, void *data); + void do_setup_fake_tunnel(); void do_cache_prepare_write(); void do_cache_prepare_write_transform(); void do_cache_prepare_update(); @@ -467,6 +475,7 @@ class HttpSM : public Continuation void setup_100_continue_transfer(); HttpTunnelProducer *setup_push_transfer_to_cache(); void setup_transform_to_server_transfer(); + void setup_transform_to_fake_transfer(int event, void *data); void setup_cache_write_transfer(HttpCacheSM *c_sm, VConnection *source_vc, HTTPInfo *store_info, int64_t skip_bytes, const char *name); void issue_cache_update(); diff --git a/proxy/http/HttpTunnel.cc b/proxy/http/HttpTunnel.cc index fa66b2b6bef..56c6b191da5 100644 --- a/proxy/http/HttpTunnel.cc +++ b/proxy/http/HttpTunnel.cc @@ -556,23 +556,30 @@ HttpTunnel::init(HttpSM *sm_arg, Ptr &amutex) if (params->oride.flow_high_water_mark > 0) { flow_state.high_water = params->oride.flow_high_water_mark; } + // This should always be true, we handled default cases back in HttpConfig::reconfigure() ink_assert(flow_state.low_water <= flow_state.high_water); } void -HttpTunnel::reset() +HttpTunnel::reset(bool clear_fvc) { ink_assert(active == false); #ifdef DEBUG for (int i = 0; i < MAX_PRODUCERS; ++i) { - ink_assert(producers[i].alive == false); + ink_assert(producers[i].vc_type == HT_STATIC_WRITE || producers[i].alive == false); } for (int j = 0; j < MAX_CONSUMERS; ++j) { - ink_assert(consumers[j].alive == false); + ink_assert(consumers[j].vc_type == HT_STATIC_READ || consumers[j].alive == false); } #endif + if (clear_fvc && fvc) { + fvc->do_io_close(); + delete fvc; + fvc = nullptr; + } + num_producers = 0; num_consumers = 0; memset(consumers, 0, sizeof(consumers)); @@ -589,6 +596,12 @@ HttpTunnel::kill_tunnel() ink_assert(producer.alive == false); } active = false; + if (fvc) { + fvc->do_io_close(); + delete fvc; + fvc = nullptr; + } + this->deallocate_buffers(); this->deallocate_redirect_postdata_buffers(); this->reset(); @@ -707,6 +720,7 @@ HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *re p->do_chunked_passthru = false; p->init_bytes_done = reader_start->read_avail(); + if (p->nbytes < 0) { p->ntodo = p->nbytes; } else { // The byte count given us includes bytes @@ -727,6 +741,10 @@ HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *re } else { p->alive = true; } + + if (vc_type == HT_STATIC_WRITE) { + p->read_vio = p->vc->do_io_read(nullptr, 0, nullptr); + } } return p; } @@ -770,6 +788,10 @@ HttpTunnel::add_consumer(VConnection *vc, VConnection *producer, HttpConsumerHan p->consumer_list.push(c); p->num_consumers++; + if (vc_type == HT_STATIC_READ) { + c->write_vio = c->vc->do_io_write(nullptr, 0, nullptr); + } + return c; } @@ -793,6 +815,7 @@ HttpTunnel::tunnel_run(HttpTunnelProducer *p_arg) { Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL"); + ++reentrancy_count; if (p_arg) { producer_run(p_arg); } else { @@ -816,6 +839,7 @@ HttpTunnel::tunnel_run(HttpTunnelProducer *p_arg) active = false; sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this); } + --reentrancy_count; } void @@ -978,6 +1002,11 @@ HttpTunnel::producer_run(HttpTunnelProducer *p) } } c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader); + if (c->vc_type == HT_STATIC_READ && c->self_producer) { + HttpConsumerHandler jump_point = c->vc_handler; + (sm->*jump_point)(VC_EVENT_WRITE_READY, c); + } + ink_assert(c_write > 0); } @@ -1377,7 +1406,6 @@ HttpTunnel::consumer_handler(int event, HttpTunnelConsumer *c) case VC_EVENT_WRITE_READY: this->consumer_reenable(c); break; - case VC_EVENT_WRITE_COMPLETE: case VC_EVENT_EOS: case VC_EVENT_ERROR: @@ -1765,3 +1793,176 @@ HttpTunnel::deallocate_redirect_postdata_buffers() postbuf = nullptr; } } + +///////////////////////////////////// +// +// setup the fake tunnel +// all data have been saved in this case +// +///////////////////////////////////// +HttpTunnelProducer * +HttpTunnel::setup_fake_tunnel(VConnection *producer, HttpConsumerHandler c_handler, HttpProducerHandler p_handler, bool fvc_success) +{ + int64_t alloc_index = buffer_size_to_index(fvc->get_reader()->read_avail()); + MIOBuffer *buf = new_MIOBuffer(alloc_index); + IOBufferReader *buf_start = buf->alloc_reader(); + + fvc->clean_vio(); + + buf->write(fvc->get_reader(), fvc->get_reader()->read_avail()); + + HttpTunnelConsumer *c = add_consumer(get_fvc(), producer, c_handler, HT_STATIC_READ, "fake read"); + + Debug("fvc_setup_tunnel", "%ld bytes need", fvc->get_bytes()); + HttpTunnelProducer *p = add_producer(get_fvc(), fvc->get_bytes(), buf_start, p_handler, HT_STATIC_WRITE, "fake write"); + + if (fvc_success) { + c->alive = false; + c->write_success = true; + } + chain(c, p); + return p; +} + +///////////////////////////////////// +// +// setup the fake tunnel to collect +// data +// +///////////////////////////////////// +HttpTunnelProducer * +HttpTunnel::add_tunnel_fake(VConnection *producer, HttpConsumerHandler c_handler, HttpProducerHandler p_handler) +{ + MIOBuffer *post_buffer = nullptr; + // FIXME: do chunck bug + if (fvc->get_bytes() < INT64_MAX) { + int64_t alloc_index = buffer_size_to_index(fvc->get_bytes()); + post_buffer = new_MIOBuffer(alloc_index); + } else { + post_buffer = new_MIOBuffer(3); + } + IOBufferReader *buf_start = post_buffer->alloc_reader(); + + fvc->clean_vio(); + + HttpTunnelConsumer *c = add_consumer(get_fvc(), producer, c_handler, HT_STATIC_READ, "fake read"); + HttpTunnelProducer *p = add_producer(get_fvc(), fvc->get_bytes(), buf_start, p_handler, HT_STATIC_WRITE, "fake write"); + + chain(c, p); + + return p; +} + +VIO * +FakeVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) +{ + m_read_vio.op = VIO::READ; + m_read_vio.set_continuation(c); + m_read_vio.nbytes = nbytes; + m_read_vio.ndone = 0; + m_read_vio.vc_server = this; + + if (buf) { + m_read_vio.buffer.writer_for(buf); + reenable(&m_read_vio); + } + + return &m_read_vio; +} + +VIO * +FakeVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner) +{ + m_write_vio.op = VIO::WRITE; + m_write_vio.set_continuation(c); + m_write_vio.nbytes = nbytes; + m_write_vio.ndone = 0; + m_write_vio.vc_server = this; + + if (buf) { + m_write_vio.buffer.reader_for(buf); + reenable(&m_write_vio); + } + + return &m_write_vio; +} + +void +FakeVConnection::do_io_close(int lerrno) +{ + if (m_closed) { + return; + } + m_closed = 1; + + if (mbuf) { + m_read_vio.buffer.clear(); + m_write_vio.buffer.clear(); + clean_vio(); + // we only free the mbuf in kill_tunnel now + // if we want to setup the "server ->fake ->client" tunnel + // we need to free buf there + // free_MIOBuffer(mbuf); + // mbuf = nullptr; + // buf_start = nullptr; + } +} + +void +FakeVConnection::reenable(VIO *vio) +{ + if (vio == &m_write_vio || vio == &m_read_vio) { + int64_t towrite = m_write_vio.ntodo(); + + if (towrite > 0) { + if (towrite > m_write_vio.get_reader()->read_avail()) { + towrite = m_write_vio.get_reader()->read_avail(); + } + + if (towrite > m_read_vio.ntodo() && m_read_vio._cont) { + towrite = m_read_vio.ntodo(); + } + + if (towrite > 0) { + if (m_read_vio._cont) { + m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite); + m_read_vio.ndone += towrite; + } + + if (!buf_start) { + buf_start = mbuf->alloc_reader(); + } + Debug("fvc_reenable", "mbuf write %ld, now : %ld", towrite, get_reader()->read_avail()); + mbuf->write(m_write_vio.get_reader(), towrite); + + m_write_vio.get_reader()->consume(towrite); + m_write_vio.ndone += towrite; + } + } + + // call consumer if we have data + if (m_write_vio.ntodo()) { + if (towrite > 0) { + m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); + } + } else { + m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio); + } + + // call producer if we have data + if (m_read_vio._cont) { + if (m_read_vio.ntodo() > 0) { + if (m_write_vio.ntodo() <= 0) { + m_read_vio._cont->handleEvent(VC_EVENT_EOS, &m_read_vio); + } else if (towrite > 0) { + m_read_vio._cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio); + } + } else { + m_read_vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio); + } + } + + } else { + ink_release_assert(!"unknown vio!"); + } +} diff --git a/proxy/http/HttpTunnel.h b/proxy/http/HttpTunnel.h index 2e9d32fb8b6..641dfff1fd9 100644 --- a/proxy/http/HttpTunnel.h +++ b/proxy/http/HttpTunnel.h @@ -43,8 +43,8 @@ #ifdef MAX_CONSUMERS #undef MAX_CONSUMERS #endif -#define MAX_PRODUCERS 2 -#define MAX_CONSUMERS 4 +#define MAX_PRODUCERS 3 +#define MAX_CONSUMERS 5 #define HTTP_TUNNEL_EVENT_DONE (HTTP_TUNNEL_EVENTS_START + 1) #define HTTP_TUNNEL_EVENT_PRECOMPLETE (HTTP_TUNNEL_EVENTS_START + 2) @@ -59,6 +59,7 @@ struct HttpTunnelProducer; class HttpSM; class HttpPagesHandler; +class FakeVConnection; typedef int (HttpSM::*HttpSMHandler)(int event, void *data); struct HttpTunnelConsumer; @@ -73,6 +74,8 @@ enum HttpTunnelType_t { HT_CACHE_WRITE, HT_TRANSFORM, HT_STATIC, + HT_STATIC_READ, + HT_STATIC_WRITE }; enum TunnelChunkingAction_t { @@ -82,6 +85,85 @@ enum TunnelChunkingAction_t { TCA_PASSTHRU_DECHUNKED_CONTENT }; +class FakeVConnection : public VConnection +{ +public: + virtual VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0); + virtual VIO *do_io_write(Continuation *c = NULL, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false); + virtual void do_io_close(int lerrno = -1); + virtual void + do_io_shutdown(ShutdownHowTo_t howto) + { + ink_assert(0); + }; + virtual void reenable(VIO *vio); + + void alloc_buffer(); + + IOBufferReader * + get_reader() + { + if (!buf_start) { + buf_start = mbuf->alloc_reader(); + ink_assert(buf_start); + } + return buf_start; + } + + int64_t + get_bytes() + { + return bytes; + } + + bool + is_fvc_producer_run() + { + return m_read_vio.nbytes > 0; + } + + void + clean_vio() + { + m_read_vio.buffer.clear(); + m_write_vio.buffer.clear(); + ink_zero(m_read_vio); + ink_zero(m_write_vio); + handler_state = 0; + } + + FakeVConnection(Ptr &amutex, int64_t bytes_arg) : VConnection(amutex), bytes(bytes_arg) { alloc_buffer(); } + + ~FakeVConnection() + { + if (mbuf) { + free_MIOBuffer(mbuf); + mbuf = nullptr; + buf_start = nullptr; + } + } + + HttpSMHandler vc_handler = nullptr; + int event = 0; + void *data = nullptr; + int64_t bytes = 0; + int handler_state = 0; + volatile int m_closed = 0; + +private: + MIOBuffer *mbuf = nullptr; + IOBufferReader *buf_start = nullptr; + + VIO m_read_vio; + VIO m_write_vio; +}; + +inline void +FakeVConnection::alloc_buffer() +{ + mbuf = new_empty_MIOBuffer(); +} + struct ChunkedHandler { enum ChunkedState { CHUNK_READ_CHUNK = 0, @@ -301,7 +383,7 @@ class HttpTunnel : public Continuation HttpTunnel(); void init(HttpSM *sm_arg, Ptr &amutex); - void reset(); + void reset(bool clear_fvc = true); void kill_tunnel(); bool is_tunnel_active() const @@ -317,6 +399,11 @@ class HttpTunnel : public Continuation void allocate_redirect_postdata_buffers(IOBufferReader *ua_reader); void deallocate_redirect_postdata_buffers(); + void fake_vc_init(int64_t size); + HttpTunnelProducer *add_tunnel_fake(VConnection *producer, HttpConsumerHandler c_handler, HttpProducerHandler p_handler); + HttpTunnelProducer *setup_fake_tunnel(VConnection *producer, HttpConsumerHandler c_handler, HttpProducerHandler p_handler, + bool fvc_success = true); + HttpTunnelProducer *add_producer(VConnection *vc, int64_t nbytes, IOBufferReader *reader_start, HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name); @@ -361,6 +448,93 @@ class HttpTunnel : public Continuation void close_vc(HttpTunnelProducer *p); void close_vc(HttpTunnelConsumer *c); + VConnection * + get_fvc() + { + return static_cast(fvc); + } + + bool + is_fvc_producer_run() + { + return fvc->is_fvc_producer_run(); + } + + int64_t + get_fvc_transfer_bytes() + { + return fvc->get_reader()->read_avail(); + } + + void + set_fvc_handler(HttpSMHandler handler, int event, void *data) + { + fvc->vc_handler = handler; + fvc->event = event; + fvc->data = data; + } + + void + fvc_callcont() + { + HttpSMHandler sm_handler = fvc->vc_handler; + (sm->*sm_handler)(fvc->event, fvc->data); + } + + void + set_fvc_event(int event) + { + fvc->event = event; + } + + void * + get_fvc_data() + { + return fvc->data; + } + + bool + is_fvc_done() + { + HttpTunnelConsumer *c = get_consumer(fvc); + return c->write_success; + } + + void + set_fvc_state(int state) + { + fvc->handler_state = state; + } + + int + get_fvc_state(int *state = nullptr) + { + if (fvc->handler_state != 0 && state != nullptr) { + *state = fvc->handler_state; + } + + return fvc->handler_state; + } + + // test only + IOBufferReader * + get_fvc_reader() + { + return fvc->get_reader(); + } + + bool + is_fvc_closed() + { + return fvc->m_closed; + } + + void + set_fvc_bytes(int64_t b) + { + fvc->bytes = b; + } + private: void internal_error(); void finish_all_internal(HttpTunnelProducer *p, bool chain); @@ -388,10 +562,26 @@ class HttpTunnel : public Continuation PostDataBuffers *postbuf; private: + FakeVConnection *fvc = nullptr; int reentrancy_count; bool call_sm; }; +// void HttpTunnel::fake_vc_init +// +// Initialize FakeVConnection +// +inline void +HttpTunnel::fake_vc_init(int64_t size) +{ + if (fvc) { + delete fvc; + fvc = nullptr; + } + + fvc = new FakeVConnection(mutex, size); +} + // void HttpTunnel::abort_cache_write_finish_others // // Abort all downstream cache writes and finsish