package org.apache.doris.common.publish;

import com.google.common.collect.ImmutableCollection;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentPublishRequest;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

/* loaded from: input_file:org/apache/doris/common/publish/ClusterStatePublisher.class */
public class ClusterStatePublisher {
    private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class);
    private static volatile ClusterStatePublisher INSTANCE;
    private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true);
    private SystemInfoService clusterInfoService;

    /* loaded from: input_file:org/apache/doris/common/publish/ClusterStatePublisher$PublishWorker.class */
    public class PublishWorker implements Runnable {
        private ClusterStateUpdate stateUpdate;
        private Backend node;
        private ResponseHandler handler;

        public PublishWorker(ClusterStateUpdate clusterStateUpdate, Backend backend, ResponseHandler responseHandler) {
            this.stateUpdate = clusterStateUpdate;
            this.node = backend;
            this.handler = responseHandler;
        }

        @Override // java.lang.Runnable
        public void run() {
            TAgentResult publishClusterState;
            TNetworkAddress tNetworkAddress = new TNetworkAddress(this.node.getHost(), this.node.getBePort());
            BackendService.Client client = null;
            try {
                try {
                    client = ClientPool.backendPool.borrowObject(tNetworkAddress);
                    try {
                        TAgentPublishRequest thrift = this.stateUpdate.toThrift();
                        try {
                            publishClusterState = client.publishClusterState(thrift);
                        } catch (TException e) {
                            if (!ClientPool.backendPool.reopen(client)) {
                                throw e;
                            }
                            publishClusterState = client.publishClusterState(thrift);
                        }
                        if (publishClusterState.getStatus().getStatusCode() != TStatusCode.OK) {
                            ClusterStatePublisher.LOG.warn("Backend execute publish failed. backend=[{}], message=[{}]", tNetworkAddress, publishClusterState.getStatus().getErrorMsgs());
                        }
                        ClusterStatePublisher.LOG.debug("Success publish to backend([{}])", tNetworkAddress);
                        this.handler.onResponse(this.node);
                        ClientPool.backendPool.returnObject(tNetworkAddress, client);
                    } catch (TException e2) {
                        ClusterStatePublisher.LOG.warn("A thrift exception happened when publish to a backend. backend=[{}], reason=[{}]", tNetworkAddress, e2);
                        this.handler.onFailure(this.node, e2);
                        ClientPool.backendPool.invalidateObject(tNetworkAddress, client);
                        ClientPool.backendPool.returnObject(tNetworkAddress, null);
                    }
                } catch (Exception e3) {
                    ClusterStatePublisher.LOG.warn("Fetch a agent client failed. backend=[{}] reason=[{}]", tNetworkAddress, e3);
                    this.handler.onFailure(this.node, e3);
                }
            } catch (Throwable th) {
                ClientPool.backendPool.returnObject(tNetworkAddress, client);
                throw th;
            }
        }
    }

    public ClusterStatePublisher(SystemInfoService systemInfoService) {
        this.clusterInfoService = systemInfoService;
    }

    public static ClusterStatePublisher getInstance() {
        if (INSTANCE == null) {
            synchronized (ClusterStatePublisher.class) {
                if (INSTANCE == null) {
                    INSTANCE = new ClusterStatePublisher(Env.getCurrentSystemInfo());
                }
            }
        }
        return INSTANCE;
    }

    public void publish(ClusterStateUpdate clusterStateUpdate, Listener listener, int i) {
        ImmutableCollection values = this.clusterInfoService.getIdToBackend().values();
        AckResponseHandler ackResponseHandler = new AckResponseHandler(values, listener);
        Iterator it = values.iterator();
        while (it.hasNext()) {
            this.executor.submit(new PublishWorker(clusterStateUpdate, (Backend) it.next(), ackResponseHandler));
        }
        try {
            if (!ackResponseHandler.awaitAllInMs(i)) {
                Backend[] pendingNodes = ackResponseHandler.pendingNodes();
                if (pendingNodes.length > 0) {
                    LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})", Arrays.toString(pendingNodes));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
