package com.alibaba.hbase.haclient;

import com.alibaba.hbase.client.AliHBaseConstants;
import com.alibaba.hbase.haclient.dualservice.DualExecutor;
import com.alibaba.hbase.haclient.dualservice.DualTableUtil;
import com.alibaba.lindorm.client.LindormClientConstants;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.AliHBaseMultiClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.CreateMode;

/* loaded from: input_file:com/alibaba/hbase/haclient/AliHBaseMultiClusterConnectionImpl.class */
public class AliHBaseMultiClusterConnectionImpl extends AliHBaseMultiClusterConnection implements Switchable {
    private static final Log LOG = LogFactory.getLog(AliHBaseMultiClusterConnectionImpl.class);
    private ClusterSwitchTracker masterClusterSwitchTracker;
    private List<ClusterSwitchTracker> slaveClusterSwitchTrackers;
    private volatile SwitchCommand currentSwitchCommand;
    private final Configuration originalConf;
    private final String masterClusterKey;
    private volatile String currentClusterKey;
    private volatile ConnectInfo connectInfo;
    private ZKWatcher linkZooKeeper;
    private Lock linkZookeeperLock;

    public AliHBaseMultiClusterConnectionImpl(Configuration configuration, ExecutorService executorService, User user) throws IOException {
        super(configuration, executorService, user);
        this.slaveClusterSwitchTrackers = new ArrayList();
        this.currentSwitchCommand = null;
        this.currentClusterKey = null;
        this.connectInfo = null;
        this.linkZooKeeper = null;
        this.linkZookeeperLock = new ReentrantLock();
        this.originalConf = new Configuration(configuration);
        String str = this.originalConf.get("hbase.client.endpoint");
        if (StringUtils.isBlank(str)) {
            throw new IOException("hbase endpoint can not be blank");
        }
        if (StringUtils.isBlank(this.originalConf.get(AliHBaseConstants.HACLIENT_CLUSTER_ID))) {
            throw new IOException("haclient cluster id can not be blank");
        }
        try {
            this.connectInfo = ConnectInfoUtil.getConnectInfoFromZK(str, configuration);
            ConnectInfoUtil.flushConnectInfo(this.connectInfo, configuration);
        } catch (Exception e) {
            LOG.warn("Get connect info from endpoint failed, " + e);
            this.connectInfo = ConnectInfoUtil.getConnectInfoFromXML(configuration);
        }
        if (null == this.connectInfo) {
            throw new IOException("Get connect info failed.");
        }
        try {
            startThreadForUpdateDualConfig(str, configuration);
        } catch (Exception e2) {
            LOG.warn("Start thread update dual table config failed " + e2);
        }
        this.masterClusterKey = this.connectInfo.getMasterWatchZK();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            this.masterClusterSwitchTracker = new ClusterSwitchTracker(ClusterSwitchUtil.createConfWithConnectKey(this.masterClusterKey, configuration), this, countDownLatch);
            List<String> slaveWatchZKList = this.connectInfo.getSlaveWatchZKList();
            if (!slaveWatchZKList.isEmpty()) {
                Thread.sleep(10L);
                Iterator<String> it = slaveWatchZKList.iterator();
                while (it.hasNext()) {
                    this.slaveClusterSwitchTrackers.add(new ClusterSwitchTracker(ClusterSwitchUtil.createConfWithConnectKey(it.next(), configuration), this, countDownLatch));
                }
            }
            countDownLatch.await();
        } catch (InterruptedException e3) {
            LOG.error("Interrupted during init", e3);
            Thread.currentThread().interrupt();
            throw new IOException(e3);
        }
    }

    private boolean isNeedToSwitch(SwitchCommand switchCommand) {
        return this.currentSwitchCommand == null || switchCommand.getTs() > this.currentSwitchCommand.getTs();
    }

    private void onChangeCluster(Configuration configuration, Configuration configuration2, SwitchCommand switchCommand, String str) throws IOException {
        onChangeCluster(configuration, configuration2);
        this.currentSwitchCommand = switchCommand;
        this.currentClusterKey = str;
        LOG.info("Switch successfully to " + switchCommand);
        String zkFromConnectInfo = ConnectInfoUtil.getZkFromConnectInfo(this.connectInfo, str);
        LOG.info("Create link node to " + zkFromConnectInfo);
        try {
            startThreadForCreateLinkNode(zkFromConnectInfo, this.conf.getInt(ConnectInfoUtil.LINK_RETRY_COUNT, 10));
        } catch (Exception e) {
            LOG.warn("Create link node failed : " + e);
        }
        LOG.info("Create link node to " + zkFromConnectInfo + " success");
    }

    private void switchToActiveWithNoCommand() throws IOException {
        String activeConnectKey = this.connectInfo.getActiveConnectKey();
        SwitchCommand switchCommand = SwitchCommand.NOCOMMAND;
        Configuration createConfWithConnectKey = ClusterSwitchUtil.createConfWithConnectKey(activeConnectKey, this.originalConf);
        Configuration configuration = null;
        if (this.dualServiceEnable) {
            configuration = ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getStandbyConnectKey(), this.originalConf);
        }
        onChangeCluster(createConfWithConnectKey, configuration, switchCommand, activeConnectKey);
    }

    @Override // com.alibaba.hbase.haclient.Switchable
    public synchronized void onNodeChange(byte[] bArr) {
        String clusterKey;
        try {
            SwitchCommand switchCommand = ClusterSwitchUtil.toSwitchCommand(bArr);
            LOG.info("Received " + switchCommand + ", current=" + this.currentSwitchCommand);
            if (!isNeedToSwitch(switchCommand)) {
                LOG.info("Ignore switch command since current command is newer");
                return;
            }
            if (switchCommand.isSwitchBackToMaster()) {
                String activeConnectKey = this.connectInfo.getActiveConnectKey();
                if (this.currentClusterKey != null && this.currentClusterKey.equals(activeConnectKey)) {
                    LOG.info("Current is connect to active, skip switch");
                    return;
                } else {
                    LOG.info("From current " + this.currentClusterKey + ", Switch back to master cluster: " + activeConnectKey);
                    switchToActiveWithNoCommand();
                    return;
                }
            }
            if (!ClusterSwitchUtil.isValidConnectKey(switchCommand.getClusterKey())) {
                if (this.currentSwitchCommand != null) {
                    LOG.info("Ignore switch command since invalid cluster key");
                    return;
                } else {
                    LOG.info("Invalid cluster key; From current " + this.currentClusterKey + ", switch back to master cluster: " + this.connectInfo.getActiveConnectKey());
                    switchToActiveWithNoCommand();
                    return;
                }
            }
            try {
                clusterKey = switchCommand.getClusterKey();
            } catch (IOException e) {
                if (this.currentSwitchCommand != null) {
                    throw e;
                }
                LOG.error("Failed to switch to " + switchCommand + ", switch back to master cluster " + this.connectInfo.getActiveConnectKey() + " since no cluster is connected ever.", e);
                switchToActiveWithNoCommand();
            }
            if (!ConnectInfoUtil.isVaildTargetConnectKey(this.connectInfo, clusterKey)) {
                LOG.warn("Invalid target cluster key " + clusterKey + ", is not active " + this.connectInfo.getConnectConf().getActive() + " or standby " + this.connectInfo.getConnectConf().getStandby());
                if (this.currentSwitchCommand == null) {
                    LOG.info("Current SwitchCommand is null, switch to active");
                    switchToActiveWithNoCommand();
                    return;
                }
                return;
            }
            if (clusterKey.equals(Boolean.valueOf(clusterKey.equals(this.currentClusterKey)))) {
                LOG.info("Target connect key " + clusterKey + " is same with current cluster key " + this.currentClusterKey + ", skip switch");
                return;
            }
            String str = this.currentClusterKey;
            Configuration createConfWithConnectKey = ClusterSwitchUtil.createConfWithConnectKey(clusterKey, this.originalConf);
            Configuration configuration = null;
            if (this.dualServiceEnable) {
                configuration = clusterKey.equals(this.connectInfo.getActiveConnectKey()) ? ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getStandbyConnectKey(), this.originalConf) : ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getActiveConnectKey(), this.originalConf);
            }
            onChangeCluster(createConfWithConnectKey, configuration, switchCommand, clusterKey);
            LOG.info("From current " + str + ",Switch to cluster: " + switchCommand.getClusterKey());
        } catch (Throwable th) {
            LOG.error("Error happened when switching cluster, switch not finished", th);
        }
    }

    private void startThreadForCreateLinkNode(final String str, final int i) {
        if (this.linkZooKeeper != null) {
            try {
                this.linkZooKeeper.close();
                this.linkZooKeeper = null;
            } catch (Exception e) {
                LOG.warn("old link zookeeper close failed : " + e);
            }
        }
        Thread thread = new Thread() { // from class: com.alibaba.hbase.haclient.AliHBaseMultiClusterConnectionImpl.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                if (AliHBaseMultiClusterConnectionImpl.this.linkZookeeperLock.tryLock()) {
                    int i2 = 0;
                    while (true) {
                        try {
                            if (i2 >= i) {
                                break;
                            }
                            i2++;
                            try {
                                if (AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper == null) {
                                    Configuration createConfWithConnectKey = ClusterSwitchUtil.createConfWithConnectKey(str, AliHBaseMultiClusterConnectionImpl.this.conf);
                                    AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper = new ZKWatcher(createConfWithConnectKey, "Link", (Abortable) null, false);
                                    String baseNode = ClusterSwitchUtil.getBaseNode(createConfWithConnectKey.get(AliHBaseConstants.HACLIENT_BASE_NODE, AliHBaseConstants.HACLIENT_BASE_NODE_DEFAULT), createConfWithConnectKey.get(AliHBaseConstants.HACLIENT_CLUSTER_ID));
                                    String joinZNode = ZNodePaths.joinZNode(baseNode, createConfWithConnectKey.get(ClusterSwitchUtil.ZOOKEEPER_LINK_NODE, ClusterSwitchUtil.ZOOKEEPER_LINK_NODE_DEFAULT));
                                    ZKUtil.createNodeIfNotExistsNoWatch(AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, baseNode, (byte[]) null, CreateMode.PERSISTENT);
                                    ZKUtil.createNodeIfNotExistsNoWatch(AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, joinZNode, (byte[]) null, CreateMode.PERSISTENT);
                                    if (!ZKUtil.createEphemeralNodeAndWatch(AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, ZNodePaths.joinZNode(joinZNode, StrUtil.generateRandomString(5)), (byte[]) null)) {
                                        String str2 = "create link node failed with " + i2;
                                        AliHBaseMultiClusterConnectionImpl.LOG.warn(str2);
                                        throw new Exception(str2);
                                    }
                                }
                                AliHBaseMultiClusterConnectionImpl.LOG.info("Successfully set link node on zk '" + AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper.getQuorum() + "'");
                            } catch (Throwable th) {
                                AliHBaseMultiClusterConnectionImpl.LOG.warn("Failed set link node on zk '" + AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper.getQuorum() + "', will retry again...", th);
                                Threads.sleep(LindormClientConstants.RPC_SKIP_ERROR_LOCATION_EXPIRE_TIME_DEFAULT);
                            }
                        } finally {
                            AliHBaseMultiClusterConnectionImpl.this.linkZookeeperLock.unlock();
                        }
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.setName("LinkNode-" + System.currentTimeMillis());
        thread.start();
    }

    private void startThreadForUpdateDualConfig(final String str, final Configuration configuration) {
        Thread thread = new Thread() { // from class: com.alibaba.hbase.haclient.AliHBaseMultiClusterConnectionImpl.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                List<String> dualTablesFromXML;
                try {
                    dualTablesFromXML = DualTableUtil.getDualTablesFromZK(str, configuration);
                    DualTableUtil.flushDualTables(dualTablesFromXML, configuration);
                } catch (Exception e) {
                    AliHBaseMultiClusterConnectionImpl.LOG.warn("Get dual tables from endpoint failed, " + e);
                    dualTablesFromXML = DualTableUtil.getDualTablesFromXML(AliHBaseMultiClusterConnectionImpl.this.conf);
                }
                if (dualTablesFromXML == null || dualTablesFromXML.isEmpty()) {
                    return;
                }
                for (String str2 : dualTablesFromXML) {
                    if (AliHBaseMultiClusterConnectionImpl.this.conf != null) {
                        AliHBaseMultiClusterConnectionImpl.this.conf.setBoolean(DualExecutor.createTableConfKey(str2, AliHBaseConstants.DUALSERVICE_ENABLE), true);
                    }
                    if (AliHBaseMultiClusterConnectionImpl.this.originalConf != null) {
                        AliHBaseMultiClusterConnectionImpl.this.originalConf.setBoolean(DualExecutor.createTableConfKey(str2, AliHBaseConstants.DUALSERVICE_ENABLE), true);
                    }
                }
                AliHBaseMultiClusterConnectionImpl.LOG.info("Successfully update dual table conf for " + StringUtils.join(dualTablesFromXML, ","));
            }
        };
        thread.setDaemon(true);
        thread.setName("UpdateDualTable-" + System.currentTimeMillis());
        thread.start();
    }

    @Override // org.apache.hadoop.hbase.client.AliHBaseMultiClusterConnection, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        super.close();
        if (this.linkZooKeeper != null) {
            try {
                this.linkZooKeeper.close();
            } catch (Exception e) {
                LOG.warn("link zookeeper close failed : " + e);
            }
        }
    }

    @Override // org.apache.hadoop.hbase.client.AliHBaseMultiClusterConnection
    public Configuration getOriginalConf() {
        return this.originalConf;
    }
}
