/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.sdk.dataproxy.network.http;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.dataproxy.shaded.io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.inlong.dataproxy.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.http.HttpAsyncObj;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.HttpUtils;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpClientMgr
implements ClientMgr {
    private static final Logger logger = LoggerFactory.getLogger(HttpClientMgr.class);
    // Log throttles: each is consulted through shouldPrint() before a repetitive
    // exception is written to the log.
    private static final LogCounter updConExptCnt = new LogCounter(10L, 100000L, 60000L);
    private static final LogCounter sendMsgExptCnt = new LogCounter(10L, 100000L, 60000L);
    private static final LogCounter asyncSendExptCnt = new LogCounter(10L, 100000L, 60000L);
    private final BaseSender sender;
    private final HttpMsgSenderConfig httpConfig;
    // Created in start() and closed in stop().
    private CloseableHttpClient httpClient;
    // Pending asynchronous requests; capacity is fixed in the constructor.
    private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
    private final ExecutorService workerServices = Executors.newCachedThreadPool();
    // Raised by stop() right before it drains messageCache itself, so that the
    // async workers stop polling the queue.
    private volatile boolean existSend = false;
    private final AtomicBoolean shutDown = new AtomicBoolean(false);
    // Node name -> host info for the currently selected nodes. The reference is
    // swapped by updateProxyInfoList() and read by sending threads, hence volatile.
    private volatile ConcurrentHashMap<String, HostInfo> usingNodeMaps = new ConcurrentHashMap<>();
    // Node name -> timestamp (ms) of the last recorded failure of that node.
    private final ConcurrentHashMap<String, Long> connFailNodeMap = new ConcurrentHashMap<>();
    // Names of the currently selected nodes. Swapped together with usingNodeMaps,
    // so it needs the same cross-thread visibility guarantee.
    private volatile List<String> activeNodes = new ArrayList<>();
    private volatile long lastUpdateTime = -1L;
    // Monotonic counter used to rotate the start position of node selection.
    private final AtomicInteger reqSendIndex = new AtomicInteger(0);

    /**
     * Creates a manager bound to the given sender and HTTP configuration.
     *
     * @param sender     the owning sender; used for its id, metrics and cache permits
     * @param httpConfig the HTTP reporting configuration; its max in-flight request
     *                   count becomes the capacity of the async message queue
     */
    public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) {
        this.sender = sender;
        this.httpConfig = httpConfig;
        // Diamond instead of the raw type, so the assignment is type-checked.
        this.messageCache = new LinkedBlockingQueue<>(httpConfig.getMaxInFlightReqCnt());
    }

    /**
     * Builds the HTTP client from the configuration and submits the configured
     * number of async report workers to the worker pool.
     *
     * @param procResult carries the constructed client on success, or the failure
     *                   detail written by the client construction on error
     * @return {@code false} when the HTTP client could not be constructed,
     *         otherwise the value of {@code procResult.setSuccess()}
     */
    @Override
    public boolean start(ProcessResult procResult) {
        boolean clientBuilt = HttpUtils.constructHttpClient(
                httpConfig.isRptDataByHttps(),
                httpConfig.getHttpSocketTimeoutMs(),
                httpConfig.getHttpConTimeoutMs(),
                httpConfig.getTlsVersion(),
                procResult);
        if (!clientBuilt) {
            return false;
        }
        httpClient = (CloseableHttpClient) procResult.getRetData();
        // One worker per configured slot; the index becomes part of the worker id.
        int workerIdx = 0;
        while (workerIdx < httpConfig.getHttpAsyncRptWorkerNum()) {
            workerServices.execute(new HttpAsyncReportWorker(workerIdx));
            workerIdx++;
        }
        logger.info("ClientMgr({}) started!", sender.getSenderId());
        return procResult.setSuccess();
    }

    /**
     * Shuts the manager down; only the first call has any effect.
     *
     * <p>If requests are still queued and the configuration does not ask to discard
     * them, this waits up to the configured close-wait period for the workers to
     * drain the queue. Whatever is left afterwards is acknowledged to its callback
     * with {@link ErrorCode#SDK_CLOSED}. Finally the worker pool is shut down and
     * the HTTP client is closed.
     */
    @Override
    public void stop() {
        if (!shutDown.compareAndSet(false, true)) {
            return;
        }
        int remainCnt = 0;
        long stopTime = System.currentTimeMillis();
        logger.info("ClientMgr({}) is closing...", sender.getSenderId());
        if (!messageCache.isEmpty()) {
            if (!httpConfig.isDiscardHttpCacheWhenClosing()) {
                long startTime = System.currentTimeMillis();
                while (!messageCache.isEmpty()
                        && System.currentTimeMillis() - startTime < httpConfig.getHttpCloseWaitPeriodMs()) {
                    ProxyUtils.sleepSomeTime(100L);
                }
            }
            // Tell the workers to stop polling; the remainder is handled here.
            existSend = true;
            HttpAsyncObj asyncObj;
            while ((asyncObj = messageCache.poll()) != null) {
                // Count every discarded request so the final log line is accurate.
                ++remainCnt;
                ackDiscardedRequest(asyncObj);
            }
        }
        workerServices.shutdown();
        if (httpClient != null) {
            try {
                httpClient.close();
            } catch (Throwable ex) {
                // Closing is best-effort, but a failure should still be visible.
                logger.warn("ClientMgr({}) close http client exception", sender.getSenderId(), ex);
            }
        }
        logger.info("ClientMgr({}) stopped, remain ({}) messages discarded, cost {} ms!",
                sender.getSenderId(), remainCnt, System.currentTimeMillis() - stopTime);
    }

    /**
     * Acknowledges one request that was still queued at shutdown with SDK_CLOSED,
     * releases its cache permits and records the callback metrics.
     */
    private void ackDiscardedRequest(HttpAsyncObj asyncObj) {
        HttpEventInfo httpEvent = asyncObj.getHttpEvent();
        boolean isSucc = true;
        long currentTime = System.currentTimeMillis();
        sender.getMetricHolder().addAsyncHttpSucGetMetric(
                httpEvent.getGroupId(), httpEvent.getStreamId(), httpEvent.getMsgCnt());
        try {
            asyncObj.getCallback().onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED));
        } catch (Throwable ex) {
            isSucc = false;
            if (asyncSendExptCnt.shouldPrint()) {
                logger.error("HttpAsync({}) callback event exception", sender.getSenderId(), ex);
            }
        } finally {
            // Permits are released whether or not the callback succeeded.
            sender.releaseCachePermits(httpEvent.getBodySize());
            if (isSucc) {
                sender.getMetricHolder().addCallbackSucMetric(
                        httpEvent.getGroupId(), httpEvent.getStreamId(), httpEvent.getMsgCnt(),
                        currentTime - asyncObj.getRptMs(), System.currentTimeMillis() - currentTime);
            } else {
                sender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
                        httpEvent.getGroupId(), httpEvent.getStreamId(), httpEvent.getMsgCnt(),
                        System.currentTimeMillis() - currentTime);
            }
        }
    }

    /**
     * @return the number of async requests currently held in the message queue
     */
    @Override
    public int getInflightMsgCnt() {
        final int queuedCnt = messageCache.size();
        return queuedCnt;
    }

    /**
     * @return the size of the currently selected node list
     */
    @Override
    public int getActiveNodeCnt() {
        final List<String> nodes = activeNodes;
        return nodes.size();
    }

    /**
     * Re-selects the set of nodes used for reporting from the supplied host map.
     *
     * <p>The candidates are shuffled and up to {@code min(aliveConnections, total)}
     * nodes are picked, skipping nodes whose last recorded failure is still inside
     * the configured reuse-wait window. If nothing can be picked the selection is
     * retried (three attempts in total, one second apart). On success the new node
     * map and node list replace the current ones; on failure the current selection
     * is left untouched and an error is logged.
     *
     * @param nodeChanged whether the call was triggered by a node change; only
     *                    affects the wording of the log output
     * @param hostInfoMap the candidate nodes; a {@code null} or empty map is ignored
     */
    @Override
    public void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap<String, HostInfo> hostInfoMap) {
        if (hostInfoMap == null || hostInfoMap.isEmpty() || shutDown.get()) {
            return;
        }
        long curTime = System.currentTimeMillis();
        try {
            List<HostInfo> candidateNodes = new ArrayList<>(hostInfoMap.values());
            Collections.sort(candidateNodes);
            Collections.shuffle(candidateNodes);
            int needActiveCnt = Math.min(httpConfig.getAliveConnections(), candidateNodes.size());
            connFailNodeMap.clear();
            List<String> realHosts = new ArrayList<>();
            ConcurrentHashMap<String, HostInfo> tmpNodeMaps = new ConcurrentHashMap<>();
            for (int remainCycles = 3; remainCycles > 0; --remainCycles) {
                int selectCnt = 0;
                long selectTime = System.currentTimeMillis();
                for (HostInfo hostInfo : candidateNodes) {
                    String nodeName = hostInfo.getReferenceName();
                    // Map lookup instead of scanning the list of picked names.
                    if (tmpNodeMaps.containsKey(nodeName)) {
                        continue;
                    }
                    Long lstFailTime = connFailNodeMap.get(nodeName);
                    if (lstFailTime != null) {
                        if (selectTime - lstFailTime <= httpConfig.getHttpNodeReuseWaitIfFailMs()) {
                            continue;
                        }
                        // Conditional remove: keep a newer failure record written concurrently.
                        connFailNodeMap.remove(nodeName, lstFailTime);
                    }
                    tmpNodeMaps.put(nodeName, hostInfo);
                    realHosts.add(nodeName);
                    if (++selectCnt >= needActiveCnt) {
                        break;
                    }
                }
                if (!realHosts.isEmpty()) {
                    break;
                }
                // Only wait when another attempt follows.
                if (remainCycles > 1) {
                    ProxyUtils.sleepSomeTime(1000L);
                }
            }
            if (realHosts.isEmpty()) {
                if (nodeChanged) {
                    logger.error("ClientMgr({}) changed nodes, but all nodes failure, nodes={}, failNodes={}!",
                            sender.getSenderId(), candidateNodes, connFailNodeMap);
                } else {
                    logger.error("ClientMgr({}) re-choose nodes, but all nodes failure, nodes={}, failNodes={}!",
                            sender.getSenderId(), candidateNodes, connFailNodeMap);
                }
                return;
            }
            lastUpdateTime = System.currentTimeMillis();
            usingNodeMaps = tmpNodeMaps;
            activeNodes = realHosts;
            if (nodeChanged) {
                logger.info("ClientMgr({}) changed nodes, wast {}ms, nodeCnt=(r:{}-a:{}), actives={}, fail={}",
                        sender.getSenderId(), System.currentTimeMillis() - curTime,
                        needActiveCnt, realHosts.size(), realHosts, connFailNodeMap.keySet());
            } else {
                logger.info("ClientMgr({}) re-choose nodes, wast {}ms, nodeCnt=(r:{}-a:{}), actives={}, fail={}",
                        sender.getSenderId(), System.currentTimeMillis() - curTime,
                        needActiveCnt, realHosts.size(), realHosts, connFailNodeMap.keySet());
            }
        } catch (Throwable ex) {
            if (updConExptCnt.shouldPrint()) {
                logger.warn("ClientMgr({}) update nodes throw exception", sender.getSenderId(), ex);
            }
        }
    }

    /**
     * Queues a request for asynchronous reporting by the worker threads.
     *
     * @param asyncObj   the request (event plus callback) to enqueue
     * @param procResult receives the outcome: success, or one of SDK_CLOSED,
     *                   EMPTY_ACTIVE_NODE_SET, HTTP_ASYNC_OFFER_FAIL (queue full)
     *                   or HTTP_ASYNC_OFFER_EXCEPTION
     * @return the boolean produced by the corresponding {@code procResult} setter
     */
    public boolean asyncSendMessage(HttpAsyncObj asyncObj, ProcessResult procResult) {
        if (shutDown.get()) {
            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
        }
        final List<String> nodesSnapshot = activeNodes;
        if (nodesSnapshot.isEmpty()) {
            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
        }
        try {
            // offer() does not block; a full queue is reported as a failure.
            return messageCache.offer(asyncObj)
                    ? procResult.setSuccess()
                    : procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL);
        } catch (Throwable ex) {
            if (asyncSendExptCnt.shouldPrint()) {
                logger.warn("ClientMgr({}) async offer event exception", sender.getSenderId(), ex);
            }
            return procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_EXCEPTION, ex.getMessage());
        }
    }

    /**
     * Synchronously reports one event to a selected node.
     *
     * <p>Nodes are tried in rotation starting from a per-call offset. A node whose
     * last recorded failure is still inside the configured reuse-wait window is
     * skipped; if every known node is inside that window, the last such node
     * examined is used anyway as a fallback.
     *
     * @param httpEvent  the event to report
     * @param procResult receives the outcome of the report
     * @return the boolean produced by the corresponding {@code procResult} setter
     */
    public boolean sendMessage(HttpEventInfo httpEvent, ProcessResult procResult) {
        if (shutDown.get()) {
            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
        }
        List<String> curNodes = activeNodes;
        ConcurrentHashMap<String, HostInfo> curNodeMap = usingNodeMaps;
        int curNodeSize = curNodes.size();
        if (curNodeSize == 0) {
            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
        }
        int nullNodeCnt = 0;
        HostInfo back1thNode = null;
        long nodeSelectTime = System.currentTimeMillis();
        int startPos = reqSendIndex.getAndIncrement();
        for (int index = 0; index < curNodeSize; ++index) {
            // floorMod keeps the index in [0, size) even after the counter wraps to
            // negative values; Math.abs(Integer.MIN_VALUE) is itself negative and
            // would yield a negative list index.
            String curNode = curNodes.get(Math.floorMod(startPos + index, curNodeSize));
            HostInfo hostInfo = curNodeMap.get(curNode);
            if (hostInfo == null) {
                ++nullNodeCnt;
                continue;
            }
            Long lstFailTime = connFailNodeMap.get(hostInfo.getReferenceName());
            if (lstFailTime != null) {
                if (nodeSelectTime - lstFailTime <= httpConfig.getHttpNodeReuseWaitIfFailMs()) {
                    // Still cooling down: remember it as a fallback and keep looking.
                    back1thNode = hostInfo;
                    continue;
                }
                // Conditional remove so a newer failure record is not erased.
                connFailNodeMap.remove(hostInfo.getReferenceName(), lstFailTime);
            }
            return innSendMsgByHttp(httpEvent, hostInfo, procResult);
        }
        if (nullNodeCnt == curNodeSize) {
            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
        }
        if (back1thNode != null) {
            return innSendMsgByHttp(httpEvent, back1thNode, procResult);
        }
        return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE);
    }

    /**
     * Posts one event to the given node and translates the response into
     * {@code procResult}.
     *
     * <p>Outcomes: a non-200 status yields RMT_RETURN_FAILURE (and a status of 500
     * or above records the node as failed); a blank body yields
     * RMT_RETURN_BLANK_CONTENT; a JSON body whose {@code code} equals the DataProxy
     * success code yields success, any other code yields DP_RETURN_FAILURE, and a
     * missing or JSON-null {@code code} yields DP_RETURN_UNKNOWN_ERROR. Any
     * exception while posting or parsing yields HTTP_VISIT_DP_EXCEPTION. The
     * request connection and the response are always released.
     *
     * @param httpEvent  the event to report
     * @param hostInfo   the target node
     * @param procResult receives the outcome
     * @return the boolean produced by the corresponding {@code procResult} setter,
     *         or {@code false} when the request could not be built
     */
    private boolean innSendMsgByHttp(HttpEventInfo httpEvent, HostInfo hostInfo, ProcessResult procResult) {
        String rmtRptUrl = (httpConfig.isRptDataByHttps() ? SdkConsts.PREFIX_HTTPS : SdkConsts.PREFIX_HTTP)
                + hostInfo.getReferenceName() + "/dataproxy/message";
        if (!buildFormUrlPost(rmtRptUrl, httpEvent, procResult)) {
            return false;
        }
        HttpPost httpPost = (HttpPost) procResult.getRetData();
        CloseableHttpResponse response = null;
        try {
            response = httpClient.execute(httpPost);
            String returnStr = EntityUtils.toString(response.getEntity());
            int returnCode = response.getStatusLine().getStatusCode();
            if (returnCode != 200) {
                if (sendMsgExptCnt.shouldPrint()) {
                    logger.warn("ClientMgr({}) report event failure, errCode={}, returnStr={}",
                            sender.getSenderId(), returnCode, returnStr);
                }
                if (returnCode >= 500) {
                    // Server-side failure: record it so node selection can skip this node for a while.
                    connFailNodeMap.put(hostInfo.getReferenceName(), System.currentTimeMillis());
                }
                return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE, returnCode + ":" + returnStr);
            }
            if (StringUtils.isBlank(returnStr)) {
                return procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("success to report event, url={}, result={}", rmtRptUrl, returnStr);
            }
            JsonObject jsonResponse = JsonParser.parseString(returnStr).getAsJsonObject();
            JsonElement codeElement = jsonResponse.get("code");
            JsonElement msgElement = jsonResponse.get("msg");
            // A JSON null cannot be read as an int; treat it like a missing code.
            if (codeElement == null || codeElement.isJsonNull()) {
                return procResult.setFailResult(ErrorCode.DP_RETURN_UNKNOWN_ERROR, returnStr);
            }
            int errCode = codeElement.getAsInt();
            if (errCode == DataProxyErrCode.SUCCESS.getErrCode()) {
                return procResult.setSuccess();
            }
            String errMsg = (msgElement != null && !msgElement.isJsonNull()) ? msgElement.getAsString() : "";
            return procResult.setFailResult(ErrorCode.DP_RETURN_FAILURE, errCode + ":" + errMsg);
        } catch (Throwable ex) {
            if (sendMsgExptCnt.shouldPrint()) {
                logger.warn("ClientMgr({}) report event exception, url={}", sender.getSenderId(), rmtRptUrl, ex);
            }
            return procResult.setFailResult(ErrorCode.HTTP_VISIT_DP_EXCEPTION, ex.getMessage());
        } finally {
            releaseHttpResources(httpPost, response, rmtRptUrl);
        }
    }

    /**
     * Releases the request connection and closes the response, logging (rate
     * limited) instead of propagating a failure to close.
     */
    private void releaseHttpResources(HttpPost httpPost, CloseableHttpResponse response, String rmtRptUrl) {
        if (httpPost != null) {
            httpPost.releaseConnection();
        }
        if (response == null) {
            return;
        }
        try {
            response.close();
        } catch (Throwable ex) {
            if (sendMsgExptCnt.shouldPrint()) {
                logger.warn("ClientMgr({}) close response exception, url={}", sender.getSenderId(), rmtRptUrl, ex);
            }
        }
    }

    /**
     * Builds the form-url-encoded POST request for one event.
     *
     * <p>The form carries {@code groupId}, {@code streamId}, {@code dt},
     * {@code body} (the event bodies joined with the configured separator) and
     * {@code cnt}; {@code rcdDlmtr} is added when the configuration reports that
     * events are not separated by LF.
     *
     * @param rmtRptUrl  the target URL
     * @param httpEvent  the event to encode
     * @param procResult receives the built {@link HttpPost} on success, or
     *                   BUILD_FORM_CONTENT_EXCEPTION with the exception message
     * @return the boolean produced by the corresponding {@code procResult} setter
     */
    private boolean buildFormUrlPost(String rmtRptUrl, HttpEventInfo httpEvent, ProcessResult procResult) {
        // Declared outside the try so the failure log can show what was collected so far.
        List<BasicNameValuePair> formFields = new ArrayList<>();
        try {
            HttpPost request = new HttpPost(rmtRptUrl);
            request.setHeader("Connection", HttpHeaderValues.CLOSE.toString());
            request.setHeader("Content-Type", HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());

            formFields.add(new BasicNameValuePair("groupId", httpEvent.getGroupId()));
            formFields.add(new BasicNameValuePair("streamId", httpEvent.getStreamId()));
            formFields.add(new BasicNameValuePair("dt", String.valueOf(httpEvent.getDtMs())));
            String joinedBody = StringUtils.join(httpEvent.getBodyList(), httpConfig.getHttpEventsSeparator());
            formFields.add(new BasicNameValuePair("body", joinedBody));
            formFields.add(new BasicNameValuePair("cnt", String.valueOf(httpEvent.getMsgCnt())));
            boolean separatedByLf = httpConfig.isSepEventByLF();
            if (!separatedByLf) {
                formFields.add(new BasicNameValuePair("rcdDlmtr", httpConfig.getHttpEventsSeparator()));
            }

            String encodedContents = URLEncodedUtils.format(formFields, StandardCharsets.UTF_8);
            request.setEntity(new StringEntity(encodedContents));
            if (logger.isDebugEnabled()) {
                logger.debug("begin to post request to {}, encoded content is: {}", rmtRptUrl, encodedContents);
            }
            return procResult.setSuccess(request);
        } catch (Throwable ex) {
            if (sendMsgExptCnt.shouldPrint()) {
                logger.warn("ClientMgr({}) build form-url content failure, content={}",
                        sender.getSenderId(), formFields, ex);
            }
            return procResult.setFailResult(ErrorCode.BUILD_FORM_CONTENT_EXCEPTION, ex.getMessage());
        }
    }

    private class HttpAsyncReportWorker
    implements Runnable {
        private final String workerId;

        public HttpAsyncReportWorker(int workerId) {
            this.workerId = HttpClientMgr.this.sender.getSenderId() + "-" + workerId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long curTime = 0L;
            ProcessResult procResult = new ProcessResult();
            logger.info("HttpAsyncReportWorker({}) started", (Object)this.workerId);
            while (!HttpClientMgr.this.shutDown.get() || !HttpClientMgr.this.messageCache.isEmpty()) {
                while (!HttpClientMgr.this.messageCache.isEmpty() && !HttpClientMgr.this.existSend) {
                    HttpAsyncObj asyncObj = (HttpAsyncObj)HttpClientMgr.this.messageCache.poll();
                    if (asyncObj == null) continue;
                    HttpClientMgr.this.sender.getMetricHolder().addAsyncHttpSucGetMetric(asyncObj.getHttpEvent().getGroupId(), asyncObj.getHttpEvent().getStreamId(), asyncObj.getHttpEvent().getMsgCnt());
                    try {
                        HttpClientMgr.this.sendMessage(asyncObj.getHttpEvent(), procResult);
                        curTime = System.currentTimeMillis();
                        asyncObj.getCallback().onMessageAck(procResult);
                    }
                    catch (Throwable ex) {
                        if (!asyncSendExptCnt.shouldPrint()) continue;
                        logger.error("HttpAsync({}) report event exception", (Object)this.workerId, (Object)ex);
                    }
                    finally {
                        HttpClientMgr.this.sender.releaseCachePermits(asyncObj.getHttpEvent().getBodySize());
                        if (procResult.isSuccess()) {
                            HttpClientMgr.this.sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(), asyncObj.getHttpEvent().getStreamId(), asyncObj.getHttpEvent().getMsgCnt(), curTime - asyncObj.getRptMs(), System.currentTimeMillis() - curTime);
                            continue;
                        }
                        HttpClientMgr.this.sender.getMetricHolder().addCallbackFailMetric(procResult.getErrCode(), asyncObj.getHttpEvent().getGroupId(), asyncObj.getHttpEvent().getStreamId(), asyncObj.getHttpEvent().getMsgCnt(), System.currentTimeMillis() - curTime);
                    }
                }
                if (HttpClientMgr.this.existSend) break;
                ProxyUtils.sleepSomeTime(HttpClientMgr.this.httpConfig.getHttpAsyncWorkerIdleWaitMs());
            }
            logger.info("HttpAsyncReportWorker({}) stopped", (Object)this.workerId);
        }
    }
}

