/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop service that hosts the LLAP daemon's two protobuf RPC endpoints: one
 * bound to {@link LlapProtocolBlockingPB} (work submission, source state
 * updates, query completion, fragment termination) and one bound to
 * {@link LlapManagementProtocolPB}.
 *
 * <p>Daemon-protocol calls are delegated to the supplied
 * {@link ContainerRunner}. When Hadoop security is enabled, the servers are
 * started under a Kerberos login and delegation tokens can be issued through
 * {@link #getDelegationToken}.
 */
public class LlapProtocolServerImpl
extends AbstractService
implements LlapProtocolBlockingPB,
LlapManagementProtocolPB {
    private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);

    /** Handler count used for the management server (the daemon server uses {@link #numHandlers}). */
    private static final int MANAGEMENT_NUM_HANDLERS = 2;

    private final int numHandlers;
    private final ContainerRunner containerRunner;
    private final int srvPort;
    private final int mngPort;
    private RPC.Server server;
    private RPC.Server mngServer;
    /** Filled in with the daemon server's connect address once it has started. */
    private final AtomicReference<InetSocketAddress> srvAddress;
    /** Filled in with the management server's connect address once it has started. */
    private final AtomicReference<InetSocketAddress> mngAddress;
    /** Only created when Hadoop security is enabled; {@code null} otherwise. */
    private org.apache.hadoop.hive.llap.security.SecretManager zkSecretManager;
    /** Short name of the user running the daemon; only set when security is enabled. */
    private String clusterUser = null;
    /** Set when the management ACL is fully permissive and ACL validation is on. */
    private boolean isRestrictedToClusterUser = false;
    private final DaemonId daemonId;
    private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE;

    /**
     * @param numHandlers     number of RPC handlers for the daemon protocol server
     * @param containerRunner target of all daemon-protocol calls
     * @param srvAddress      receives the daemon server address after start
     * @param mngAddress      receives the management server address after start
     * @param srvPort         port for the daemon protocol server
     * @param mngPort         port for the management protocol server
     * @param daemonId        identity of this daemon; its cluster string is passed to the secret manager
     */
    public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress, int srvPort, int mngPort, DaemonId daemonId) {
        super("LlapDaemonProtocolServerImpl");
        this.numHandlers = numHandlers;
        this.containerRunner = containerRunner;
        this.srvAddress = srvAddress;
        this.srvPort = srvPort;
        this.mngAddress = mngAddress;
        this.mngPort = mngPort;
        this.daemonId = daemonId;
        LOG.info("Creating: {} with port configured to: {}", LlapProtocolServerImpl.class.getSimpleName(), srvPort);
    }

    /**
     * Forwards a work submission to the container runner.
     *
     * @throws ServiceException wrapping any {@link IOException} raised by the runner
     */
    public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller, LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException {
        try {
            return this.containerRunner.submitWork(request);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    /**
     * Forwards a source-state update to the container runner.
     *
     * @throws ServiceException wrapping any {@link IOException} raised by the runner
     */
    public LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) throws ServiceException {
        try {
            return this.containerRunner.sourceStateUpdated(request);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    /**
     * Forwards a query-complete notification to the container runner.
     *
     * @throws ServiceException wrapping any {@link IOException} raised by the runner
     */
    public LlapDaemonProtocolProtos.QueryCompleteResponseProto queryComplete(RpcController controller, LlapDaemonProtocolProtos.QueryCompleteRequestProto request) throws ServiceException {
        try {
            return this.containerRunner.queryComplete(request);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    /**
     * Forwards a fragment termination request to the container runner.
     *
     * @throws ServiceException wrapping any {@link IOException} raised by the runner
     */
    public LlapDaemonProtocolProtos.TerminateFragmentResponseProto terminateFragment(RpcController controller, LlapDaemonProtocolProtos.TerminateFragmentRequestProto request) throws ServiceException {
        try {
            return this.containerRunner.terminateFragment(request);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    /**
     * Starts both RPC servers.
     *
     * <p>Without Hadoop security the servers are started directly. With
     * security enabled this records the current user as the cluster user,
     * decides whether management access must be restricted to that user,
     * creates the secret manager, logs in with the configured LLAP Kerberos
     * principal/keytab and starts the servers as that login.
     *
     * @throws RuntimeException if the current user cannot be determined, the
     *         Kerberos login fails, the signing setting is invalid, or a server
     *         cannot be started
     */
    public void serviceStart() {
        final Configuration conf = this.getConfig();
        this.isSigningRequiredConfig = LlapProtocolServerImpl.getSigningConfig(conf);
        final BlockingService daemonImpl = LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
        final BlockingService managementImpl = LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
        if (!UserGroupInformation.isSecurityEnabled()) {
            this.startProtocolServers(conf, daemonImpl, managementImpl);
            return;
        }

        try {
            // Captured before the Kerberos login below, i.e. the user the process runs as.
            this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (LlapProtocolServerImpl.isPermissiveManagementAcl(conf)) {
            LOG.warn("Management protocol has a '*' ACL.");
            this.isRestrictedToClusterUser = true;
        }

        String llapPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL);
        String llapKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
        this.zkSecretManager = org.apache.hadoop.hive.llap.security.SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, this.daemonId.getClusterString());

        UserGroupInformation daemonUgi;
        try {
            daemonUgi = LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        // Anonymous class (not a lambda) keeps the doAs overload unambiguous.
        daemonUgi.doAs(new PrivilegedAction<Void>(){

            @Override
            public Void run() {
                LlapProtocolServerImpl.this.startProtocolServers(conf, daemonImpl, managementImpl);
                return null;
            }
        });
    }

    /**
     * Parses {@code LLAP_REMOTE_TOKEN_REQUIRES_SIGNING} (case-insensitively)
     * into a {@link TokenRequiresSigning} value.
     *
     * @throws RuntimeException if the configured value is not one of
     *         {@code true}, {@code false} or {@code except_llap_owner}
     */
    private static TokenRequiresSigning getSigningConfig(Configuration conf) {
        String signSetting = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING).toLowerCase();
        switch (signSetting) {
            case "true": {
                return TokenRequiresSigning.TRUE;
            }
            case "except_llap_owner": {
                return TokenRequiresSigning.EXCEPT_OWNER;
            }
            case "false": {
                return TokenRequiresSigning.FALSE;
            }
            default: {
                throw new RuntimeException("Invalid value for " + HiveConf.ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING.varname + ": " + signSetting);
            }
        }
    }

    /**
     * Returns {@code true} when ACL validation is enabled and the management
     * ACL allows everyone ({@code "*"}) with an empty deny list.
     */
    private static boolean isPermissiveManagementAcl(Configuration conf) {
        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_VALIDATE_ACLS)) {
            return false;
        }
        return "*".equals(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_ACL))
            && "".equals(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_ACL_DENY));
    }

    /** Starts the daemon protocol server, then the management protocol server. */
    private void startProtocolServers(Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
        this.server = this.startProtocolServer(this.srvPort, this.numHandlers, this.srvAddress, conf, daemonImpl, LlapProtocolBlockingPB.class, HiveConf.ConfVars.LLAP_SECURITY_ACL, HiveConf.ConfVars.LLAP_SECURITY_ACL_DENY);
        this.mngServer = this.startProtocolServer(this.mngPort, MANAGEMENT_NUM_HANDLERS, this.mngAddress, conf, managementImpl, LlapManagementProtocolPB.class, HiveConf.ConfVars.LLAP_MANAGEMENT_ACL, HiveConf.ConfVars.LLAP_MANAGEMENT_ACL_DENY);
    }

    /**
     * Creates and starts one RPC server and publishes its connect address
     * (canonical host name plus actual port) into {@code bindAddress}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the server
     *         cannot be created
     */
    private RPC.Server startProtocolServer(int port, int handlerCount, AtomicReference<InetSocketAddress> bindAddress, Configuration conf, BlockingService impl, Class<?> protocolClass, HiveConf.ConfVars ... aclVars) {
        RPC.Server started;
        InetSocketAddress requested = new InetSocketAddress(port);
        try {
            started = this.createServer(protocolClass, requested, conf, handlerCount, impl, aclVars);
            started.start();
        }
        catch (IOException e) {
            LOG.error("Failed to run RPC Server on port: " + port, e);
            throw new RuntimeException(e);
        }
        InetSocketAddress connectAddress = NetUtils.getConnectAddress(started);
        bindAddress.set(NetUtils.createSocketAddrForHost(connectAddress.getAddress().getCanonicalHostName(), connectAddress.getPort()));
        LOG.info("Instantiated {} at {}", protocolClass.getSimpleName(), bindAddress);
        return started;
    }

    /**
     * Stops both RPC servers. The management server is stopped even if
     * stopping the daemon server throws, so neither is left running.
     */
    public void serviceStop() {
        try {
            if (this.server != null) {
                this.server.stop();
            }
        }
        finally {
            if (this.mngServer != null) {
                this.mngServer.stop();
            }
        }
    }

    /** @return the daemon server address published at start, or {@code null} if not yet set */
    @InterfaceAudience.Private
    InetSocketAddress getBindAddress() {
        return this.srvAddress.get();
    }

    /** @return the management server address published at start, or {@code null} if not yet set */
    @InterfaceAudience.Private
    InetSocketAddress getManagementBindAddress() {
        return this.mngAddress.get();
    }

    /**
     * Builds (but does not start) a protobuf RPC server for {@code pbProtocol}.
     *
     * <p>When {@code hadoop.security.authorization} is on, any ACL variable
     * that has no explicit value in {@code conf} is written, with the value
     * {@link HiveConf} resolves for it, into a private copy of the
     * configuration, and the service ACLs are refreshed from that copy.
     * The caller's configuration object is never modified by that step.
     */
    private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService, HiveConf.ConfVars ... aclVars) throws IOException {
        Configuration serverConf = conf;
        boolean isAuthorizationEnabled = conf.getBoolean("hadoop.security.authorization", false);
        if (isAuthorizationEnabled) {
            for (HiveConf.ConfVars acl : aclVars) {
                if (conf.get(acl.varname) != null) {
                    continue;
                }
                // Copy lazily, only once something actually has to be added.
                if (serverConf == conf) {
                    serverConf = new Configuration(conf);
                }
                serverConf.set(acl.varname, HiveConf.getVar(serverConf, acl));
            }
        }
        RPC.setProtocolEngine(serverConf, pbProtocol, ProtobufRpcEngine.class);
        RPC.Builder builder = new RPC.Builder(serverConf)
            .setProtocol(pbProtocol)
            .setInstance(blockingService)
            .setBindAddress(addr.getHostName())
            .setPort(addr.getPort())
            .setNumHandlers(numHandlers);
        if (this.zkSecretManager != null) {
            builder = builder.setSecretManager(this.zkSecretManager);
        }
        RPC.Server built = builder.build();
        if (isAuthorizationEnabled) {
            built.refreshServiceAcl(serverConf, new LlapDaemonPolicyProvider());
        }
        return built;
    }

    /**
     * Issues an LLAP token for the calling user and returns it serialized in
     * the response.
     *
     * <p>The caller is authorized <em>before</em> a token is created, so a
     * rejected caller never causes a token to be created.
     *
     * @throws ServiceException if no secret manager exists (unsecure cluster),
     *         if access is restricted to the cluster user and the caller is
     *         someone else, or if resolving the caller, creating the token or
     *         serializing it fails with an {@link IOException}
     */
    public LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(RpcController controller, LlapDaemonProtocolProtos.GetTokenRequestProto request) throws ServiceException {
        if (this.zkSecretManager == null) {
            throw new ServiceException("Operation not supported on unsecure cluster");
        }
        Token<?> token;
        try {
            UserGroupInformation callingUser = UserGroupInformation.getCurrentUser();
            // Reject unauthorized callers first; token creation must not happen for them.
            this.checkManagementAccess(callingUser);
            boolean isSigningRequired = this.determineIfSigningIsRequired(callingUser);
            token = this.zkSecretManager.createLlapToken(request.hasAppId() ? request.getAppId() : null, null, isSigningRequired);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        try {
            token.write(out);
        }
        catch (IOException e) {
            throw new ServiceException(e);
        }
        ByteString serialized = ByteString.copyFrom(out.toByteArray());
        return LlapDaemonProtocolProtos.GetTokenResponseProto.newBuilder().setToken(serialized).build();
    }

    /**
     * Enforces the automatic cluster-user restriction that is switched on when
     * the management ACL was found to be fully permissive.
     *
     * @throws ServiceException if the restriction is active and the caller is
     *         not the cluster user
     */
    private void checkManagementAccess(UserGroupInformation callingUser) throws ServiceException {
        if (!this.isRestrictedToClusterUser) {
            return;
        }
        String callerName = callingUser.getShortUserName();
        if (!this.clusterUser.equals(callerName)) {
            throw new ServiceException("Management protocol ACL is too permissive. The access has been automatically restricted to " + this.clusterUser + "; " + callerName + " is denied acccess. Please set " + HiveConf.ConfVars.LLAP_VALIDATE_ACLS.varname + " to false," + " or adjust " + HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname + " and " + HiveConf.ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL.");
        }
    }

    /**
     * Decides whether a token issued to {@code callingUser} must require
     * signing, based on the setting read at service start. With
     * {@code EXCEPT_OWNER}, signing is required for everyone except the
     * cluster user.
     */
    private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) {
        switch (this.isSigningRequiredConfig) {
            case FALSE: {
                return false;
            }
            case TRUE: {
                return true;
            }
            case EXCEPT_OWNER: {
                return !this.clusterUser.equals(callingUser.getShortUserName());
            }
            default: {
                throw new AssertionError("Unknown value " + this.isSigningRequiredConfig);
            }
        }
    }

    /** Parsed form of the {@code LLAP_REMOTE_TOKEN_REQUIRES_SIGNING} setting. */
    private static enum TokenRequiresSigning {
        TRUE,
        FALSE,
        EXCEPT_OWNER;

    }
}

