/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.service.server;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hive.service.server.KillQueryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KillQueryZookeeperManager
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(KillQueryZookeeperManager.class);
    // Login context name passed to ZookeeperUtils.setupZookeeperAuth() in start().
    private static final String SASL_LOGIN_CONTEXT_NAME = "KillQueryZooKeeperClient";
    // Matches the confirmation timeout (30 seconds) that killQuery() hands to waitOnBarrier().
    public static final int MAX_WAIT_ON_CONFIRMATION_SECONDS = 30;
    // Matches the kill timeout (180 seconds) that killQuery() hands to waitOnBarrier().
    public static final int MAX_WAIT_ON_KILL_SECONDS = 180;
    // Curator client: created in init(), started in start(), closed in stop().
    private CuratorFramework zooKeeperClient;
    // Read in init() from HIVE_SERVER2_KERBEROS_PRINCIPAL.
    private String zkPrincipal;
    // Read in init() from HIVE_SERVER2_KERBEROS_KEYTAB.
    private String zkKeytab;
    // Kill query namespace; "/" + zkNameSpace is the root znode for all barriers.
    private String zkNameSpace;
    // Used by the listener to check for and kill queries running on this server.
    private final KillQueryImpl localKillQueryImpl;
    // May be null; getServerHost() then reports an empty host.
    private final HiveServer2 hiveServer2;
    // Configuration handed to init().
    private HiveConf conf;
    // Children cache on the root znode; created in start(), closed in stop().
    private PathChildrenCache killQueryListener = null;

    /**
     * Creates the service, named after this class's simple name.
     *
     * @param operationManager passed on to the {@link KillQueryImpl} used for local kills
     * @param hiveServer2 the owning server instance; a {@code null} value is tolerated by
     *        {@code getServerHost()}, which then reports an empty host
     */
    public KillQueryZookeeperManager(OperationManager operationManager, HiveServer2 hiveServer2) {
        super(KillQueryZookeeperManager.class.getSimpleName());
        this.hiveServer2 = hiveServer2;
        // The local kill implementation receives a back-reference to this manager.
        localKillQueryImpl = new KillQueryImpl(operationManager, this);
    }

    /**
     * Reads the kill query namespace and the Kerberos principal/keytab from the
     * configuration, builds the ZooKeeper client (it is started later, in
     * {@code start()}) and registers a connection state listener on it.
     *
     * @param conf the Hive configuration to read from
     * @throws IllegalArgumentException if the kill query namespace is blank
     */
    @Override
    public synchronized void init(HiveConf conf) {
        // Must be assigned first: getACLProviderForZKPath() below reads this.conf.
        this.conf = conf;

        zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE);
        boolean namespaceConfigured = !StringUtils.isBlank(zkNameSpace);
        Preconditions.checkArgument(namespaceConfigured,
            (Object)(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE.varname + " cannot be null or empty"));

        zkPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
        zkKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);

        ACLProvider aclProvider = getACLProviderForZKPath("/" + zkNameSpace);
        zooKeeperClient = conf.getZKConfig().getNewZookeeperClient(aclProvider);
        zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());

        super.init(conf);
    }

    /**
     * Starts the ZooKeeper client, makes sure the root namespace znode exists
     * (applying the secure ACLs when Kerberos is enabled and the node was just
     * created) and begins listening for kill query requests under it.
     *
     * @throws ServiceException if the ZooKeeper client was never created
     * @throws RuntimeException wrapping any failure during start-up
     */
    @Override
    public synchronized void start() {
        super.start();
        if (zooKeeperClient == null) {
            throw new ServiceException("Failed start zookeeperClient in KillQueryZookeeperManager");
        }
        final String rootPath = "/" + zkNameSpace;
        try {
            ZookeeperUtils.setupZookeeperAuth(getHiveConf(), SASL_LOGIN_CONTEXT_NAME, zkPrincipal, zkKeytab);
            zooKeeperClient.start();
            try {
                zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(rootPath);
                if (ZookeeperUtils.isKerberosEnabled(conf)) {
                    zooKeeperClient.setACL().withACL(createSecureAcls()).forPath(rootPath);
                }
                LOG.info("Created the root namespace: " + zkNameSpace + " on ZooKeeper");
            }
            catch (KeeperException e) {
                // An already existing namespace is fine; anything else is fatal.
                if (e.code() != KeeperException.Code.NODEEXISTS) {
                    LOG.error("Unable to create namespace: " + zkNameSpace + " on ZooKeeper", e);
                    throw e;
                }
            }
            killQueryListener = new PathChildrenCache(zooKeeperClient, rootPath, false);
            killQueryListener.start(PathChildrenCache.StartMode.NORMAL);
            startListeningForQueries();
            // Evaluated only for its side effect of referencing the CloseableUtils class
            // (presumably so it is loaded before stop() needs it -- TODO confirm).
            CloseableUtils.class.getName();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed start zookeeperClient in KillQueryZookeeperManager", e);
        }
        LOG.info("KillQueryZookeeperManager service started.");
    }

    /**
     * Builds the ACL provider for the ZooKeeper client. Whether Kerberos is
     * enabled is evaluated once, when the provider is created.
     *
     * @param zkPath paths containing this string get the secure ACLs when Kerberos is enabled
     * @return a provider that hands out open ACLs in every other case
     */
    private ACLProvider getACLProviderForZKPath(final String zkPath) {
        final boolean kerberosEnabled = ZookeeperUtils.isKerberosEnabled(this.conf);
        return new ACLProvider(){

            @Override
            public List<ACL> getDefaultAcl() {
                LOG.warn("getDefaultAcl was called");
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }

            @Override
            public List<ACL> getAclForPath(String path) {
                // Secure ACLs only apply to paths that contain zkPath in a Kerberized setup.
                if (kerberosEnabled && path != null && path.contains(zkPath)) {
                    return KillQueryZookeeperManager.createSecureAcls();
                }
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }
        };
    }

    /**
     * @return a new mutable list holding the entries of {@code READ_ACL_UNSAFE}
     *         followed by the entries of {@code CREATOR_ALL_ACL}
     */
    private static List<ACL> createSecureAcls() {
        List<ACL> secureAcls = new ArrayList<>();
        secureAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
        secureAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
        return secureAcls;
    }

    /**
     * Registers a listener on the children cache that reacts to newly added
     * barrier nodes. Requests issued by this server are ignored. For all other
     * requests the server confirms PROGRESS and then DONE or FAILED when the
     * query is local, or NO when it does not know the query.
     */
    private void startListeningForQueries() {
        PathChildrenCacheListener requestListener = (client, event) -> {
            if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
                return;
            }
            String barrierNode = ZKPaths.getNodeFromPath(event.getData().getPath());
            KillQueryZookeeperBarrier barrier =
                new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace, barrierNode);

            Optional<KillQueryZookeeperData> request = barrier.getKillQueryData();
            if (!request.isPresent()) {
                return;
            }
            KillQueryZookeeperData killQuery = request.get();
            LOG.debug("Kill query request with id {}", killQuery.getQueryId());

            // The requesting server does not answer its own request.
            if (getServerHost().equals(killQuery.getRequestingServer())) {
                return;
            }

            if (!localKillQueryImpl.isLocalQuery(killQuery.getQueryId())) {
                LOG.debug("Confirm unknown kill query request with id {}", killQuery.getQueryId());
                barrier.confirmNo(getServerHost());
                return;
            }

            LOG.info("Killing query with id {}", killQuery.getQueryId());
            barrier.confirmProgress(getServerHost());
            try {
                localKillQueryImpl.killLocalQuery(killQuery.getQueryId(), conf, killQuery.getDoAs(), killQuery.isDoAsAdmin());
                barrier.confirmDone(getServerHost());
            }
            catch (Exception e) {
                LOG.error("Unable to kill local query", e);
                barrier.confirmFailed(getServerHost());
            }
        };
        LOG.info("Start to listen for kill query requests.");
        killQueryListener.getListenable().addListener(requestListener);
    }

    /**
     * Stops the service and quietly closes the children cache and then the
     * ZooKeeper client.
     */
    @Override
    public synchronized void stop() {
        super.stop();
        LOG.info("Stopping KillQueryZookeeperManager service.");
        // The cache is closed before the client it was created with.
        CloseableUtils.closeQuietly(killQueryListener);
        CloseableUtils.closeQuietly(zooKeeperClient);
    }

    /**
     * Lists the children of the HiveServer2 ZooKeeper namespace.
     *
     * @return the child node names, or an empty list when dynamic service
     *         discovery is off, active/passive HA is on, or the lookup fails
     *         (the failure is logged)
     */
    private List<String> getAllServerUrls() {
        List<String> hosts = new ArrayList<>();
        boolean dynamicDiscovery = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
        if (!dynamicDiscovery || conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) {
            return hosts;
        }
        String serverNamespace = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
        try {
            hosts.addAll(zooKeeperClient.getChildren().forPath("/" + serverNamespace));
        }
        catch (Exception e) {
            LOG.error("Unable the get available server hosts", e);
        }
        return hosts;
    }

    /**
     * @return this server's instance URI with every ':' removed, or an empty
     *         string when no server is attached or the URI cannot be obtained
     */
    private String getServerHost() {
        if (hiveServer2 != null) {
            try {
                return removeDelimiter(hiveServer2.getServerInstanceURI());
            }
            catch (Exception e) {
                LOG.error("Unable to determine the server host", e);
            }
        }
        return "";
    }

    /**
     * Asks the other HiveServer2 instances to kill a query by publishing a
     * barrier node in ZooKeeper and waiting for their answers. Returns
     * immediately when fewer than two servers are registered.
     *
     * @param queryIdOrTag id or tag of the query to kill
     * @param doAs user on whose behalf the kill is requested
     * @param doAsAdmin whether the requesting user is an admin
     * @throws IOException if the barrier cannot be created or awaited, or if no
     *         remote server reported a successful kill
     */
    public void killQuery(String queryIdOrTag, String doAs, boolean doAsAdmin) throws IOException {
        List<String> serverHosts = this.getAllServerUrls();
        if (serverHosts.size() < 2) {
            return;
        }
        KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(this.zooKeeperClient, "/" + this.zkNameSpace);
        boolean result;
        try {
            barrier.setBarrier(queryIdOrTag, this.hiveServer2.getServerInstanceURI(), doAs, doAsAdmin);
            LOG.info("Created kill query barrier in path: {} for queryId: {}", (Object)barrier.getBarrierPath(), (Object)queryIdOrTag);
            // Every server except this one is expected to answer.
            result = barrier.waitOnBarrier(serverHosts.size() - 1, MAX_WAIT_ON_CONFIRMATION_SECONDS, MAX_WAIT_ON_KILL_SECONDS, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The interrupt is about to be hidden inside an IOException; keep the flag set.
                Thread.currentThread().interrupt();
            }
            LOG.error("Unable to create Barrier on Zookeeper for KillQuery", (Throwable)e);
            throw new IOException(e);
        }
        if (!result) {
            throw new IOException("Unable to kill query on remote servers");
        }
    }

    /**
     * Strips every ':' from the given string.
     *
     * @param in the text to clean, may be {@code null}
     * @return the text without ':' characters, or {@code null} for {@code null} input
     */
    private static String removeDelimiter(String in) {
        return in == null ? null : in.replace(":", "");
    }

    public static class KillQueryZookeeperBarrier {
        private final CuratorFramework client;
        private final String barrierPath;
        /**
         * Watcher registered by {@code waitOnBarrier} on the barrier's children.
         * It notifies the monitor of the enclosing barrier, which is the object
         * {@code waitOnBarrier} waits on. Passing plain {@code this} here would
         * notify the anonymous watcher's own monitor, so the waiter would only
         * ever wake up on timeout.
         */
        private final Watcher watcher = new Watcher(){

            @Override
            public void process(WatchedEvent event) {
                client.postSafeNotify(KillQueryZookeeperBarrier.this);
            }
        };

        /**
         * Creates a barrier handle whose node name is a freshly generated random UUID.
         *
         * @param client the Curator client used for all ZooKeeper operations
         * @param barrierRootPath parent path under which the barrier node lives
         */
        public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath) {
            this(client, barrierRootPath, UUID.randomUUID().toString());
        }

        public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath, String barrierPath) {
            this.client = client;
            this.barrierPath = PathUtils.validatePath((String)(barrierRootPath + "/" + barrierPath));
        }

        /**
         * @return the full ZooKeeper path of this barrier's node
         */
        public String getBarrierPath() {
            return barrierPath;
        }

        /**
         * Creates the barrier node, storing the kill request as its UTF-8 payload.
         *
         * @param queryId id of the query to kill; must not contain ':'
         * @param requestingServer identifier of the server that issues the request
         * @param doAs user on whose behalf the kill is requested; must not contain ':'
         * @param doAsAdmin whether the requesting user is an admin
         * @throws IllegalStateException if a node already exists at the barrier path
         * @throws Exception on any other ZooKeeper failure
         */
        public synchronized void setBarrier(String queryId, String requestingServer, String doAs, boolean doAsAdmin) throws Exception {
            KillQueryZookeeperData data = new KillQueryZookeeperData(queryId, requestingServer, doAs, doAsAdmin);
            byte[] payload = data.toString().getBytes(StandardCharsets.UTF_8);
            try {
                this.client.create().creatingParentContainersIfNeeded().forPath(this.barrierPath, payload);
            }
            catch (KeeperException.NodeExistsException e) {
                // Keep the ZooKeeper exception as the cause so the failing path stays visible in logs.
                throw new IllegalStateException("Barrier with this path already exists", e);
            }
        }

        /**
         * Reads and parses the kill request stored in the barrier node.
         *
         * @return the parsed request, or an empty {@code Optional} when the
         *         barrier node does not exist (including when it is removed
         *         while being read) or carries no payload
         * @throws Exception on any other ZooKeeper failure or on unparsable data
         */
        public synchronized Optional<KillQueryZookeeperData> getKillQueryData() throws Exception {
            byte[] data;
            try {
                // Read directly instead of checkExists-then-getData: the node can be
                // deleted between the two calls, which would surface as an exception.
                data = this.client.getData().forPath(this.barrierPath);
            }
            catch (KeeperException.NoNodeException ignored) {
                // A missing node is the documented "no request" case.
                return Optional.empty();
            }
            if (data == null) {
                return Optional.empty();
            }
            return Optional.of(new KillQueryZookeeperData(new String(data, StandardCharsets.UTF_8)));
        }

        /**
         * Records that the given server does not know the query by creating a
         * {@code NO:<serverId>} child under the barrier node.
         *
         * @param serverId identifier of the answering server
         * @throws IllegalStateException if the barrier node does not exist
         * @throws Exception on ZooKeeper failures
         */
        public synchronized void confirmNo(String serverId) throws Exception {
            boolean barrierExists = client.checkExists().forPath(barrierPath) != null;
            if (!barrierExists) {
                throw new IllegalStateException("Barrier is not initialised");
            }
            String answerNode = barrierPath + "/NO:" + serverId;
            client.create().forPath(answerNode);
        }

        /**
         * Records that the given server has started killing the query by creating
         * a {@code PROGRESS:<serverId>} child under the barrier node.
         *
         * @param serverId identifier of the answering server
         * @throws IllegalStateException if the barrier node does not exist
         * @throws Exception on ZooKeeper failures
         */
        public synchronized void confirmProgress(String serverId) throws Exception {
            boolean barrierExists = client.checkExists().forPath(barrierPath) != null;
            if (!barrierExists) {
                throw new IllegalStateException("Barrier is not initialised");
            }
            String answerNode = barrierPath + "/PROGRESS:" + serverId;
            client.create().forPath(answerNode);
        }

        /**
         * Records a successful kill: removes this server's {@code PROGRESS} child
         * if present and creates a {@code DONE:<serverId>} child.
         *
         * @param serverId identifier of the answering server
         * @throws IllegalStateException if the barrier node does not exist
         * @throws Exception on ZooKeeper failures
         */
        public synchronized void confirmDone(String serverId) throws Exception {
            if (client.checkExists().forPath(barrierPath) == null) {
                throw new IllegalStateException("Barrier is not initialised");
            }
            if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
                client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
            }
            client.create().forPath(barrierPath + "/DONE:" + serverId);
        }

        /**
         * Records a failed kill: removes this server's {@code PROGRESS} child if
         * present and creates a {@code FAILED:<serverId>} child.
         *
         * @param serverId identifier of the answering server
         * @throws IllegalStateException if the barrier node does not exist
         * @throws Exception on ZooKeeper failures
         */
        public synchronized void confirmFailed(String serverId) throws Exception {
            if (client.checkExists().forPath(barrierPath) == null) {
                throw new IllegalStateException("Barrier is not initialised");
            }
            if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
                client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
            }
            client.create().forPath(barrierPath + "/FAILED:" + serverId);
        }

        /**
         * Waits for the answers of the other servers and then removes the barrier
         * node together with its children. The barrier is also removed when waiting
         * fails (for example on interrupt), so that no stale request is left behind
         * in ZooKeeper; a failure of that clean-up is attached to the original
         * exception as a suppressed exception.
         *
         * @param confirmationCount number of child nodes after which, absent any
         *        PROGRESS/DONE/FAILED answer, the request counts as rejected
         * @param maxWaitOnConfirmation how long to wait for answers while no server
         *        has reported PROGRESS
         * @param maxWaitOnKill how long to wait for DONE/FAILED, measured from the
         *        moment a PROGRESS child is first observed
         * @param unit time unit of both timeouts
         * @return {@code true} only if a DONE child was observed
         * @throws Exception on ZooKeeper failures or if the waiting thread is interrupted
         */
        public synchronized boolean waitOnBarrier(int confirmationCount, long maxWaitOnConfirmation, long maxWaitOnKill, TimeUnit unit) throws Exception {
            boolean result;
            try {
                result = this.awaitOutcome(confirmationCount, unit.toMillis(maxWaitOnConfirmation), unit.toMillis(maxWaitOnKill));
            }
            catch (Exception e) {
                try {
                    this.client.delete().deletingChildrenIfNeeded().forPath(this.barrierPath);
                }
                catch (Exception cleanupFailure) {
                    // Never let the clean-up hide the reason the wait failed.
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }
            this.client.delete().deletingChildrenIfNeeded().forPath(this.barrierPath);
            return result;
        }

        /**
         * Polls the barrier's children until an outcome is known or a timeout
         * expires. Between polls it waits on this object's monitor, and every poll
         * re-registers the watcher on the children.
         */
        private synchronized boolean awaitOutcome(int confirmationCount, long maxWaitMs, long maxWaitOnKillMs) throws Exception {
            long startMs = System.currentTimeMillis();
            long startKill = -1L;
            // Sticky: once a PROGRESS child has been seen the kill timeout applies.
            boolean progress = false;
            while (true) {
                List<String> children = this.client.getChildren().usingWatcher(this.watcher).forPath(this.barrierPath);
                for (String child : children) {
                    if (child.startsWith("DONE")) {
                        return true;
                    }
                    if (child.startsWith("FAILED")) {
                        return false;
                    }
                    if (child.startsWith("PROGRESS")) {
                        progress = true;
                    }
                }
                long remainingMs;
                if (progress) {
                    // Some server is killing the query: wait for it to finish.
                    if (startKill < 0L) {
                        startKill = System.currentTimeMillis();
                    }
                    remainingMs = maxWaitOnKillMs - (System.currentTimeMillis() - startKill);
                } else {
                    // Only NO answers so far; stop once the expected number has arrived.
                    if (children.size() == confirmationCount) {
                        return false;
                    }
                    remainingMs = maxWaitMs - (System.currentTimeMillis() - startMs);
                }
                if (remainingMs <= 0L) {
                    return false;
                }
                this.wait(remainingMs);
            }
        }
    }

    /**
     * Kill request as stored in a barrier node. The serialized form is
     * {@code queryId:requestingServer:doAs:doAsAdmin}, which is why none of the
     * text fields may contain a ':' character.
     */
    public static class KillQueryZookeeperData {
        /** Number of ':' separated fields in the serialized form. */
        private static final int FIELD_COUNT = 4;

        private String queryId;
        private String requestingServer;
        private String doAs;
        private boolean doAsAdmin;

        /**
         * Creates a request from its parts. Any ':' in {@code requestingServer}
         * is silently removed.
         *
         * @throws IllegalArgumentException if {@code queryId} or {@code doAs} contains ':'
         */
        public KillQueryZookeeperData(String queryId, String requestingServer, String doAs, boolean doAsAdmin) {
            if (!StringUtils.equals((CharSequence)queryId, (CharSequence)KillQueryZookeeperManager.removeDelimiter(queryId))) {
                throw new IllegalArgumentException("QueryId can not contain any ':' character.");
            }
            this.queryId = queryId;
            this.requestingServer = KillQueryZookeeperManager.removeDelimiter(requestingServer);
            if (!StringUtils.equals((CharSequence)doAs, (CharSequence)KillQueryZookeeperManager.removeDelimiter(doAs))) {
                throw new IllegalArgumentException("doAs can not contain any ':' character.");
            }
            this.doAs = doAs;
            this.doAsAdmin = doAsAdmin;
        }

        /**
         * Parses the serialized form produced by {@link #toString()}. A
         * {@code null} argument yields an object with all fields unset.
         *
         * @param data the serialized request, may be {@code null}
         * @throws IllegalArgumentException if {@code data} has fewer than four
         *         ':' separated fields
         */
        public KillQueryZookeeperData(String data) {
            if (data == null) {
                return;
            }
            String[] elem = data.split(":");
            if (elem.length < FIELD_COUNT) {
                // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException("Malformed kill query data: expected " + FIELD_COUNT
                    + " ':' separated fields but found " + elem.length);
            }
            this.queryId = elem[0];
            this.requestingServer = elem[1];
            this.doAs = elem[2];
            this.doAsAdmin = Boolean.parseBoolean(elem[3]);
        }

        /**
         * @return the serialized form {@code queryId:requestingServer:doAs:doAsAdmin}
         */
        @Override
        public String toString() {
            return this.queryId + ":" + this.requestingServer + ":" + this.doAs + ":" + this.doAsAdmin;
        }

        public String getQueryId() {
            return this.queryId;
        }

        public String getRequestingServer() {
            return this.requestingServer;
        }

        public String getDoAs() {
            return this.doAs;
        }

        public boolean isDoAsAdmin() {
            return this.doAsAdmin;
        }
    }

    private static class ZkConnectionStateListener
    implements ConnectionStateListener {
        private ZkConnectionStateListener() {
        }

        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            LOG.info("Connection state change notification received. State: {}", (Object)connectionState);
        }
    }
}

