Bug 4447: FwdState.cc:447 "serverConnection() == conn" assertion

After certain failures, FwdState::retryOrBail() may be called twice: once
from FwdState::unregisterdServerEnd() [called from HttpStateData::swanSong()]
and once from the FwdState's own connection close handler. This may result
in two concurrent connections to the remote server, followed by an assertion
upon a connection closure.

This patch:

- After HttpStateData failures, instead of closing the squid-to-peer
  connection directly (and, hence, triggering closure handlers), calls
  HttpStateData::closeServer() and mustStop() for a cleaner exit with fewer
  wasteful side effects and better debugging.

- Creates and remembers a FwdState close handler AsyncCall so that
  comm_remove_close_handler() can cancel an already scheduled callback. The
  conversion to AsyncCall was necessary because legacy close handler
  callbacks cannot be canceled once scheduled; the sketch below illustrates
  the difference.

This is a Measurement Factory project.
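For reviewers who want the close-handler idea in isolation, here is a minimal
standalone sketch (illustration only, not Squid code; the Handler and Registry
types are invented stand-ins for AsyncCall and comm.cc's per-FD close-handler
list). It shows why registration should return a rememberable handle: cancelling
through that handle still works after the callback has been scheduled, which is
exactly what raw function-pointer (CLCB) handlers cannot offer.

    #include <functional>
    #include <iostream>
    #include <list>
    #include <memory>

    // Toy stand-in for an AsyncCall-based close handler: it can be
    // cancelled even after delivery has been scheduled.
    struct Handler {
        std::function<void()> fun;
        bool cancelled = false;
    };
    using HandlerPtr = std::shared_ptr<Handler>;

    // Toy stand-in for the per-FD close-handler list kept by comm.cc.
    struct Registry {
        std::list<HandlerPtr> handlers;

        // Like the patched comm_add_close_handler(): returns the handle so
        // the caller can remember, and later cancel, this exact callback.
        HandlerPtr add(std::function<void()> f) {
            auto h = std::make_shared<Handler>();
            h->fun = std::move(f);
            handlers.push_back(h);
            return h;
        }

        // Like comm_remove_close_handler(fd, closeHandler): cancellation
        // works even for an already scheduled callback.
        void remove(const HandlerPtr &h) {
            if (h)
                h->cancelled = true;
        }

        // Like a connection closing: run every non-cancelled handler once.
        void close() {
            for (const auto &h : handlers)
                if (!h->cancelled)
                    h->fun();
            handlers.clear();
        }
    };

    int main()
    {
        Registry conn;
        // FwdState remembers its close handler, as the patch now does.
        HandlerPtr closeHandler = conn.add([] { std::cout << "retryOrBail()\n"; });

        // The HttpStateData::closeServer() path: unregister, then close.
        conn.remove(closeHandler);
        closeHandler = nullptr;
        conn.close(); // prints nothing: no second retryOrBail()
        return 0;
    }

With the legacy CLCB interface the caller had no handle naming its scheduled
callback, so the duplicate retryOrBail() could not be suppressed.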
=== modified file 'src/FwdState.cc'
--- a/src/FwdState.cc	2016-02-13 06:24:27 +0000
+++ b/src/FwdState.cc	2016-02-27 10:20:16 +0000
@@ -102,41 +102,42 @@
 void
 FwdState::abort(void* d)
 {
     FwdState* fwd = (FwdState*)d;
     Pointer tmp = fwd; // Grab a temporary pointer to keep the object alive during our scope.

     if (Comm::IsConnOpen(fwd->serverConnection())) {
         fwd->closeServerConnection("store entry aborted");
     } else {
         debugs(17, 7, HERE << "store entry aborted; no connection to close");
     }
     fwd->serverDestinations.clear();
     fwd->self = NULL;
 }

 void
 FwdState::closeServerConnection(const char *reason)
 {
     debugs(17, 3, "because " << reason << "; " << serverConn);
-    comm_remove_close_handler(serverConn->fd, fwdServerClosedWrapper, this);
+    comm_remove_close_handler(serverConn->fd, closeHandler);
+    closeHandler = NULL;
     fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
     serverConn->close();
 }

 /**** PUBLIC INTERFACE ********************************************************/

 FwdState::FwdState(const Comm::ConnectionPointer &client, StoreEntry * e, HttpRequest * r, const AccessLogEntryPointer &alp):
     al(alp)
 {
     debugs(17, 2, HERE << "Forwarding client request " << client << ", url=" << e->url() );
     entry = e;
     clientConn = client;
     request = r;
     HTTPMSGLOCK(request);
     pconnRace = raceImpossible;
     start_t = squid_curtime;
     serverDestinations.reserve(Config.forward_max_tries);
     e->lock("FwdState");
     EBIT_SET(e->flags, ENTRY_FWD_HDR_WAIT);
 }
@@ -429,41 +430,42 @@
         debugs(17, 5, HERE << "pconn race happened");
         pconnRace = raceHappened;
     }

     if (ConnStateData *pinned_connection = request->pinnedConnection()) {
         pinned_connection->pinning.zeroReply = true;
         flags.dont_retry = true; // we want to propagate failure to the client
         debugs(17, 4, "zero reply on pinned connection");
     }
 }

 /**
  * Frees fwdState without closing FD or generating an abort
  */
 void
 FwdState::unregister(Comm::ConnectionPointer &conn)
 {
     debugs(17, 3, HERE << entry->url() );
     assert(serverConnection() == conn);
     assert(Comm::IsConnOpen(conn));
-    comm_remove_close_handler(conn->fd, fwdServerClosedWrapper, this);
+    comm_remove_close_handler(conn->fd, closeHandler);
+    closeHandler = NULL;
     serverConn = NULL;
 }

 // \deprecated use unregister(Comm::ConnectionPointer &conn) instead
 void
 FwdState::unregister(int fd)
 {
     debugs(17, 3, HERE << entry->url() );
     assert(fd == serverConnection()->fd);
     unregister(serverConn);
 }

 /**
  * FooClient modules call fwdComplete() when they are done
  * downloading an object. Then, we either 1) re-forward the
  * request somewhere else if needed, or 2) call storeComplete()
  * to finish it off
  */
 void
 FwdState::complete()
@@ -660,41 +662,41 @@
 {
     if (status != Comm::OK) {
         ErrorState *const anErr = makeConnectingError(ERR_CONNECT_FAIL);
         anErr->xerrno = xerrno;
         fail(anErr);

         /* it might have been a timeout with a partially open link */
         if (conn != NULL) {
             if (conn->getPeer())
                 peerConnectFailed(conn->getPeer());

             conn->close();
         }
         retryOrBail();
         return;
     }

     serverConn = conn;
     debugs(17, 3, HERE << serverConnection() << ": '" << entry->url() << "'" );

-    comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);
+    closeHandler = comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);

 #if USE_OPENSSL
     if (!request->flags.pinned) {
         const CachePeer *p = serverConnection()->getPeer();
         const bool peerWantsTls = p && p->use_ssl;
         // userWillTlsToPeerForUs assumes CONNECT == HTTPS
         const bool userWillTlsToPeerForUs = p && p->options.originserver && request->method == Http::METHOD_CONNECT;
         const bool needTlsToPeer = peerWantsTls && !userWillTlsToPeerForUs;
         const bool needTlsToOrigin = !p && request->url.getScheme() == AnyP::PROTO_HTTPS;
         if (needTlsToPeer || needTlsToOrigin || request->flags.sslPeek) {
             HttpRequest::Pointer requestPointer = request;
             AsyncCall::Pointer callback = asyncCall(17,4, "FwdState::ConnectedToPeer", FwdStatePeerAnswerDialer(&FwdState::connectedToPeer, this));
             // Use positive timeout when less than one second is left.
             const time_t sslNegotiationTimeout = max(static_cast<time_t>(1), timeLeft());
             Ssl::PeerConnector *connector = new Ssl::PeerConnector(requestPointer, serverConnection(), clientConn, callback, sslNegotiationTimeout);
             AsyncJob::Start(connector); // will call our callback
@@ -816,80 +818,81 @@
     }

     request->flags.pinned = false; // XXX: what if the ConnStateData set this to flag existing credentials?
     // XXX: answer: the peer selection *should* catch it and give us only the pinned peer. so we reverse the =0 step below.
     // XXX: also, logs will now lie if pinning is broken and leads to an error message.
     if (serverDestinations[0]->peerType == PINNED) {
         ConnStateData *pinned_connection = request->pinnedConnection();
         debugs(17,7, "pinned peer connection: " << pinned_connection);
         // pinned_connection may become nil after a pconn race
         if (pinned_connection)
             serverConn = pinned_connection->borrowPinnedConnection(request, serverDestinations[0]->getPeer());
         else
             serverConn = NULL;
         if (Comm::IsConnOpen(serverConn)) {
             pinned_connection->stopPinnedConnectionMonitoring();
             flags.connected_okay = true;
             ++n_tries;
             request->flags.pinned = true;

             if (pinned_connection->pinnedAuth())
                 request->flags.auth = true;

-            comm_add_close_handler(serverConn->fd, fwdServerClosedWrapper, this);
+
+            closeHandler = comm_add_close_handler(serverConn->fd, fwdServerClosedWrapper, this);

             syncWithServerConn(pinned_connection->pinning.host);

             // the server may close the pinned connection before this request
             pconnRace = racePossible;
             dispatch();
             return;
         }

         // Pinned connection failure.
         debugs(17,2,HERE << "Pinned connection failed: " << pinned_connection);
         ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, Http::scServiceUnavailable, request);
         fail(anErr);
         self = NULL; // refcounted
         return;
     }

     // Use pconn to avoid opening a new connection.
     const char *host = NULL;
     if (!serverDestinations[0]->getPeer())
         host = request->GetHost();

     Comm::ConnectionPointer temp;
     // Avoid pconns after races so that the same client does not suffer twice.
     // This does not increase the total number of connections because we just
     // closed the connection that failed the race. And re-pinning assumes this.
     if (pconnRace != raceHappened)
         temp = pconnPop(serverDestinations[0], host);

     const bool openedPconn = Comm::IsConnOpen(temp);
     pconnRace = openedPconn ? racePossible : raceImpossible;

     // if we found an open persistent connection to use. use it.
     if (openedPconn) {
         serverConn = temp;
         flags.connected_okay = true;
         debugs(17, 3, HERE << "reusing pconn " << serverConnection());
         ++n_tries;

-        comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);
+        closeHandler = comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);

         syncWithServerConn(request->GetHost());

         dispatch();
         return;
     }

     // We will try to open a new connection, possibly to the same destination.
     // We reset serverDestinations[0] in case we are using it again because
     // ConnOpener modifies its destination argument.
     serverDestinations[0]->local.port(0);
     serverConn = NULL;

 #if URL_CHECKSUM_DEBUG
     entry->mem_obj->checkUrlChecksum();
 #endif

     GetMarkingsToServer(request, *serverDestinations[0]);

     calls.connector = commCbCall(17,3, "fwdConnectDoneWrapper", CommConnectCbPtrFun(fwdConnectDoneWrapper, this));

=== modified file 'src/FwdState.h'
--- a/src/FwdState.h	2016-01-01 00:14:27 +0000
+++ b/src/FwdState.h	2016-02-26 20:06:09 +0000
@@ -135,32 +135,34 @@
     Comm::ConnectionPointer clientConn; ///< a possibly open connection to the client.
     time_t start_t;
     int n_tries;

     // AsyncCalls which we set and may need cancelling.
     struct {
         AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
     } calls;

     struct {
         bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
         bool dont_retry;
         bool forward_completed;
     } flags;

     /** connections to open, in order, until successful */
     Comm::ConnectionList serverDestinations;

     Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.

+    AsyncCall::Pointer closeHandler; ///< The serverConn close handler
+
     /// possible pconn race states
     typedef enum { raceImpossible, racePossible, raceHappened } PconnRace;
     PconnRace pconnRace; ///< current pconn race state

     // NP: keep this last. It plays with private/public
     CBDATA_CLASS2(FwdState);
 };

 void getOutgoingAddress(HttpRequest * request, Comm::ConnectionPointer conn);

 #endif /* SQUID_FORWARD_H */

=== modified file 'src/clients/Client.h'
--- a/src/clients/Client.h	2016-02-19 23:15:41 +0000
+++ b/src/clients/Client.h	2016-02-27 10:09:11 +0000
@@ -87,41 +87,43 @@
     void serverComplete2(); /**< Continuation of serverComplete */
     bool completed; /**< serverComplete() has been called */

 protected:
     // kids customize these
     virtual void haveParsedReplyHeaders(); /**< called when got final headers */
     virtual void completeForwarding(); /**< default calls fwd->complete() */

     // BodyConsumer for HTTP: consume request body.
     bool startRequestBodyFlow();
     void handleMoreRequestBodyAvailable();
     void handleRequestBodyProductionEnded();
     virtual void handleRequestBodyProducerAborted() = 0;

     // sending of the request body to the server
     void sendMoreRequestBody();
     // has body; kids overwrite to increment I/O stats counters
     virtual void sentRequestBody(const CommIoCbParams &io) = 0;
     virtual void doneSendingRequestBody() = 0;

-    virtual void closeServer() = 0; /**< end communication with the server */
+    /// Use this to end communication with the server.
+    /// The call cancels our closure handler and tells FwdState to forget about the connection.
+    virtual void closeServer() = 0;
     virtual bool doneWithServer() const = 0; /**< did we end communication? */

     /// whether we may receive more virgin response body bytes
     virtual bool mayReadVirginReplyBody() const = 0;

     /// Entry-dependent callbacks use this check to quit if the entry went bad
     bool abortOnBadEntry(const char *abortReason);

     bool blockCaching();

 #if USE_ADAPTATION
     void startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause);
     void adaptVirginReplyBody(const char *buf, ssize_t len);
     void cleanAdaptation();
     virtual bool doneWithAdaptation() const; /**< did we end ICAP communication? */

     // BodyConsumer for ICAP: consume adapted response body.
     void handleMoreAdaptedBodyAvailable();
     void handleAdaptedBodyProductionEnded();
     void handleAdaptedBodyProducerAborted();

=== modified file 'src/comm.cc'
--- a/src/comm.cc	2016-01-01 00:14:27 +0000
+++ b/src/comm.cc	2016-02-27 10:14:53 +0000
@@ -959,49 +959,50 @@
     struct addrinfo *AI = NULL;
     to_addr.getAddrInfo(AI, fd_table[fd].sock_family);
     int x = sendto(fd, buf, len, 0, AI->ai_addr, AI->ai_addrlen);
     Ip::Address::FreeAddr(AI);

     PROF_stop(comm_udp_sendto);

     if (x >= 0)
         return x;

 #if _SQUID_LINUX_
     if (ECONNREFUSED != errno)
 #endif
         debugs(50, DBG_IMPORTANT, "comm_udp_sendto: FD " << fd << ", (family=" << fd_table[fd].sock_family << ") " << to_addr << ": " << xstrerror());

     return Comm::COMM_ERROR;
 }

-void
+AsyncCall::Pointer
 comm_add_close_handler(int fd, CLCB * handler, void *data)
 {
     debugs(5, 5, "comm_add_close_handler: FD " << fd << ", handler=" << handler << ", data=" << data);

     AsyncCall::Pointer call=commCbCall(5,4, "SomeCloseHandler", CommCloseCbPtrFun(handler, data));
     comm_add_close_handler(fd, call);
+    return call;
 }

 void
 comm_add_close_handler(int fd, AsyncCall::Pointer &call)
 {
     debugs(5, 5, "comm_add_close_handler: FD " << fd << ", AsyncCall=" << call);

     /*TODO:Check for a similar scheduled AsyncCall*/
     // for (c = fd_table[fd].closeHandler; c; c = c->next)
     //     assert(c->handler != handler || c->data != data);

     call->setNext(fd_table[fd].closeHandler);

     fd_table[fd].closeHandler = call;
 }

 // remove function-based close handler
 void
 comm_remove_close_handler(int fd, CLCB * handler, void *data)
 {

=== modified file 'src/comm.h'
--- a/src/comm.h	2016-01-01 00:14:27 +0000
+++ b/src/comm.h	2016-02-27 10:12:14 +0000
@@ -62,41 +62,41 @@
 int comm_openex(int, int, Ip::Address &, int, const char *);
 unsigned short comm_local_port(int fd);

 int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen);
 void commCallCloseHandlers(int fd);

 /// clear a timeout handler by FD number
 void commUnsetFdTimeout(int fd);

 /**
  * Set or clear the timeout for some action on an active connection.
  * API to replace commSetTimeout() when a Comm::ConnectionPointer is available.
  */
 int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback);
 int commUnsetConnTimeout(const Comm::ConnectionPointer &conn);

 int ignoreErrno(int);
 void commCloseAllSockets(void);
 void checkTimeouts(void);

-void comm_add_close_handler(int fd, CLCB *, void *);
+AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *, void *);
 void comm_add_close_handler(int fd, AsyncCall::Pointer &);
 void comm_remove_close_handler(int fd, CLCB *, void *);
 void comm_remove_close_handler(int fd, AsyncCall::Pointer &);

 int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from);
 int comm_udp_recv(int fd, void *buf, size_t len, int flags);
 ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags);
 bool comm_has_incomplete_write(int);

 /** The read channel has closed and the caller does not expect more data
  * but needs to detect connection aborts. The current detection method uses
  * 0-length reads: We read until the error occurs or the writer closes
  * the connection. If there is a read error, we close the connection.
  */
 void commStartHalfClosedMonitor(int fd);
 bool commHasHalfClosedMonitor(int fd);

 // XXX: remove these wrappers which minimize client_side.cc changes in a commit
 inline void commMarkHalfClosed(int fd) { commStartHalfClosedMonitor(fd); }
 inline bool commIsHalfClosed(int fd) { return commHasHalfClosedMonitor(fd); }

=== modified file 'src/http.cc'
--- a/src/http.cc	2016-02-19 23:15:41 +0000
+++ b/src/http.cc	2016-02-27 10:08:16 +0000
@@ -148,41 +148,42 @@
     return serverConnection;
 }

 void
 HttpStateData::httpStateConnClosed(const CommCloseCbParams &params)
 {
     debugs(11, 5, "httpStateFree: FD " << params.fd << ", httpState=" << params.data);
     doneWithFwd = "httpStateConnClosed()"; // assume FwdState is monitoring too
     mustStop("HttpStateData::httpStateConnClosed");
 }

 void
 HttpStateData::httpTimeout(const CommTimeoutCbParams &params)
 {
     debugs(11, 4, HERE << serverConnection << ": '" << entry->url() << "'" );

     if (entry->store_status == STORE_PENDING) {
         fwd->fail(new ErrorState(ERR_READ_TIMEOUT, Http::scGatewayTimeout, fwd->request));
     }

-    serverConnection->close();
+    closeServer();
+    mustStop("HttpStateData::httpTimeout");
 }

 /// Remove an existing public store entry if the incoming response (to be
 /// stored in a currently private entry) is going to invalidate it.
 static void
 httpMaybeRemovePublic(StoreEntry * e, Http::StatusCode status)
 {
     int remove = 0;
     int forbidden = 0;
     StoreEntry *pe;

     // If the incoming response already goes into a public entry, then there is
     // nothing to remove. This protects ready-for-collapsing entries as well.
     if (!EBIT_TEST(e->flags, KEY_PRIVATE))
         return;

     switch (status) {

     case Http::scOkay:
@@ -1133,41 +1134,42 @@
         debugs(11, 3, "http socket closing");
         return;
     }

     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         abortTransaction("store entry aborted while reading reply");
         return;
     }

     // handle I/O errors
     if (io.flag != Comm::OK || len < 0) {
         debugs(11, 2, HERE << io.conn << ": read failure: " << xstrerror() << ".");

         if (ignoreErrno(io.xerrno)) {
             flags.do_next_read = true;
         } else {
             ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request);
             err->xerrno = io.xerrno;
             fwd->fail(err);
             flags.do_next_read = false;
-            serverConnection->close();
+            closeServer();
+            mustStop("HttpStateData::readReply");
         }

         return;
     }

     // update I/O stats
     if (len > 0) {
         readBuf->appended(len);
         reply_bytes_read += len;
 #if USE_DELAY_POOLS
         DelayId delayId = entry->mem_obj->mostBytesAllowed();
         delayId.bytesIn(len);
 #endif

         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.http.kbytes_in), len);
         ++ IOStats.Http.reads;

         for (clen = len - 1, bin = 0; clen; ++bin)
             clen >>= 1;
@@ -1289,41 +1291,42 @@
             // parsed headers but got no reply
             debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: No reply at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() );
             error = ERR_INVALID_RESP;
         }
     } else {
         assert(eof);
         if (readBuf->hasContent()) {
             error = ERR_INVALID_RESP;
             debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: Headers did not parse at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() );
         } else {
             error = ERR_ZERO_SIZE_OBJECT;
             debugs(11, (request->flags.accelerated?DBG_IMPORTANT:2), "WARNING: HTTP: Invalid Response: No object data received for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() );
         }
     }

     assert(error != ERR_NONE);
     entry->reset();
     fwd->fail(new ErrorState(error, Http::scBadGateway, fwd->request));
     flags.do_next_read = false;
-    serverConnection->close();
+    closeServer();
+    mustStop("HttpStateData::continueAfterParsingHeader");
     return false; // quit on error
 }

 /** truncate what we read if we read too much
  * so that writeReplyBody() writes no more than what we should have read */
 void
 HttpStateData::truncateVirginBody()
 {
     assert(flags.headers_parsed);

     HttpReply *vrep = virginReply();
     int64_t clen = -1;
     if (!vrep->expectingBody(request->method, clen) || clen < 0)
         return; // no body or a body of unknown size, including chunked

     const int64_t body_bytes_read = reply_bytes_read - header_bytes_read;
     if (body_bytes_read - body_bytes_truncated <= clen)
         return; // we did not read too much or already took care of the extras

     if (const int64_t extras = body_bytes_read - body_bytes_truncated - clen) {
@@ -1523,69 +1526,69 @@
 {
     debugs(11, 5, HERE << serverConnection << ": size " << io.size << ": errflag " << io.flag << ".");
 #if URL_CHECKSUM_DEBUG
     entry->mem_obj->checkUrlChecksum();
 #endif

     if (io.size > 0) {
         fd_bytes(io.fd, io.size, FD_WRITE);
         kb_incr(&(statCounter.server.all.kbytes_out), io.size);
         kb_incr(&(statCounter.server.http.kbytes_out), io.size);
     }

     if (io.flag == Comm::ERR_CLOSING)
         return;

     if (io.flag) {
         ErrorState *err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request);
         err->xerrno = io.xerrno;
         fwd->fail(err);
-        serverConnection->close();
+        closeServer();
+        mustStop("HttpStateData::wroteLast");
         return;
     }

     sendComplete();
 }

 /// successfully wrote the entire request (including body, last-chunk, etc.)
 void
 HttpStateData::sendComplete()
 {
     /*
      * Set the read timeout here because it hasn't been set yet.
      * We only set the read timeout after the request has been
      * fully written to the peer. If we start the timeout
      * after connection establishment, then we are likely to hit
      * the timeout for POST/PUT requests that have very large
      * request bodies.
      */
     typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall = JobCallback(11, 5, TimeoutDialer, this, HttpStateData::httpTimeout);

     commSetConnTimeout(serverConnection, Config.Timeout.read, timeoutCall);

     flags.request_sent = true;
     request->hier.peer_http_request_sent = current_time;
 }

-// Close the HTTP server connection. Used by serverComplete().
 void
 HttpStateData::closeServer()
 {
     debugs(11,5, HERE << "closing HTTP server " << serverConnection << " this " << this);

     if (Comm::IsConnOpen(serverConnection)) {
         fwd->unregister(serverConnection);
         comm_remove_close_handler(serverConnection->fd, closeHandler);
         closeHandler = NULL;
         serverConnection->close();
     }
 }

 bool
 HttpStateData::doneWithServer() const
 {
     return !Comm::IsConnOpen(serverConnection);
 }

 /*
@@ -2354,41 +2357,42 @@
 HttpStateData::handleMoreRequestBodyAvailable()
 {
     if (eof || !Comm::IsConnOpen(serverConnection)) {
         // XXX: we should check this condition in other callbacks then!
         // TODO: Check whether this can actually happen: We should unsubscribe
         // as a body consumer when the above condition(s) are detected.
         debugs(11, DBG_IMPORTANT, HERE << "Transaction aborted while reading HTTP body");
         return;
     }

     assert(requestBodySource != NULL);

     if (requestBodySource->buf().hasContent()) {
         // XXX: why does not this trigger a debug message on every request?

         if (flags.headers_parsed && !flags.abuse_detected) {
             flags.abuse_detected = true;
             debugs(11, DBG_IMPORTANT, "http handleMoreRequestBodyAvailable: Likely proxy abuse detected '" << request->client_addr << "' -> '" << entry->url() << "'" );

             if (virginReply()->sline.status() == Http::scInvalidHeader) {
-                serverConnection->close();
+                closeServer();
+                mustStop("HttpStateData::handleMoreRequestBodyAvailable");
                 return;
             }
         }
     }

     HttpStateData::handleMoreRequestBodyAvailable();
 }

 // premature end of the request body
 void
 HttpStateData::handleRequestBodyProducerAborted()
 {
     Client::handleRequestBodyProducerAborted();
     if (entry->isEmpty()) {
         debugs(11, 3, "request body aborted: " << serverConnection);
         // We usually get here when ICAP REQMOD aborts during body processing.
         // We might also get here if client-side aborts, but then our response
         // should not matter because either client-side will provide its own or
         // there will be no response at all (e.g., if the the client has left).
         ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, fwd->request);