/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.classloader_interface.PrestoSparkFatalException;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpServerClient;
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFiles;

/**
 * Manages the lifecycle of a native execution worker running as a child process.
 *
 * <p>An instance picks a free local TCP port at construction time, launches the native
 * executable configured in the session, and polls the worker's server-info endpoint until it
 * answers or the error tracker gives up. {@link #close()} terminates the child process.
 *
 * <p>Only {@link #start()} is synchronized; {@link #close()} and {@link #isAlive()} read the
 * process handle without holding the lock.
 */
public class NativeExecutionProcess implements AutoCloseable {
    private static final Logger log = Logger.get(NativeExecutionProcess.class);
    private static final String NATIVE_EXECUTION_TASK_ERROR_MESSAGE = "Native process launch failed with multiple retries.";
    private static final String WORKER_CONFIG_FILE = "/config.properties";
    private static final String WORKER_NODE_CONFIG_FILE = "/node.properties";
    private static final String WORKER_VELOX_CONFIG_FILE = "/velox.properties";
    private static final String WORKER_CONNECTOR_CONFIG_FILE = "/catalog/";
    private static final String ETC_DIR_FLAG = "--etc_dir";
    private final Session session;
    private final PrestoSparkHttpServerClient serverClient;
    private final URI location;
    // Initialized before the constructor body runs, so getPort() is usable inside the constructor.
    private final int port = NativeExecutionProcess.getAvailableTcpPort();
    private final TaskManagerConfig taskManagerConfig;
    private final ScheduledExecutorService errorRetryScheduledExecutor;
    private final RequestErrorTracker errorTracker;
    private final HttpClient httpClient;
    private final WorkerProperty<?, ?, ?, ?> workerProperty;
    private Process process;

    /**
     * Creates a handle for a native worker process; the process itself is not started here.
     *
     * @param session session providing the executable path, program arguments and catalog
     * @param uri base URI of the worker; its port is replaced with the port picked by this instance
     * @param httpClient client used to query the worker's server info
     * @param errorRetryScheduledExecutor executor used for delayed retries of the server-info request
     * @param serverInfoCodec codec handed to the HTTP server client
     * @param maxErrorDuration handed to the error tracker that decides when to stop retrying
     * @param taskManagerConfig task manager configuration
     * @param workerProperty worker properties written to the configuration directory on launch
     * @throws IOException if no free TCP port can be obtained
     */
    public NativeExecutionProcess(Session session, URI uri, HttpClient httpClient, ScheduledExecutorService errorRetryScheduledExecutor, JsonCodec<ServerInfo> serverInfoCodec, Duration maxErrorDuration, TaskManagerConfig taskManagerConfig, WorkerProperty<?, ?, ?, ?> workerProperty) throws IOException {
        this.session = Objects.requireNonNull(session, "session is null");
        this.location = NativeExecutionProcess.getBaseUriWithPort(Objects.requireNonNull(uri, "uri is null"), this.getPort());
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.serverClient = new PrestoSparkHttpServerClient(this.httpClient, this.location, serverInfoCodec);
        this.taskManagerConfig = Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null");
        this.errorRetryScheduledExecutor = Objects.requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
        this.errorTracker = new RequestErrorTracker("NativeExecution", uri, StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR, NATIVE_EXECUTION_TASK_ERROR_MESSAGE, maxErrorDuration, errorRetryScheduledExecutor, "getting native process status");
        this.workerProperty = Objects.requireNonNull(workerProperty, "workerProperty is null");
    }

    /**
     * Launches the native process (unless one is already alive) and blocks until its
     * server-info endpoint responds.
     *
     * @throws PrestoException if the executable cannot be started or the launch arguments are invalid
     * @throws PrestoSparkFatalException if the worker never reports its server info; the
     *         process is closed before this is thrown
     * @throws IOException if the worker configuration files cannot be written
     */
    public synchronized void start() throws ExecutionException, InterruptedException, IOException {
        if (this.process != null && this.process.isAlive()) {
            return;
        }
        ProcessBuilder processBuilder = new ProcessBuilder(this.getLaunchCommand());
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
        try {
            this.process = processBuilder.start();
        }
        catch (IOException e) {
            log.error(String.format("Cannot start %s, error message: %s", processBuilder.command(), e.getMessage()));
            throw new PrestoException(StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR, String.format("Cannot start %s", processBuilder.command()), e);
        }
        try {
            this.getServerInfoWithRetry().get();
        }
        catch (Throwable t) {
            this.close();
            if (t instanceof InterruptedException) {
                // Restore the flag only after close(), so the graceful-shutdown wait is not cut short.
                Thread.currentThread().interrupt();
            }
            // Keep the original throwable in the chain when it does not wrap anything itself.
            Throwable cause = t.getCause() != null ? t.getCause() : t;
            throw new PrestoSparkFatalException(t.getMessage(), cause);
        }
    }

    /**
     * Requests the worker's server info, retrying failed requests as permitted by the error tracker.
     *
     * @return a future completed with the server info, or failed once retries are exhausted,
     *         the HTTP client is closed, or the worker answers with a non-OK status
     */
    @VisibleForTesting
    public SettableFuture<ServerInfo> getServerInfoWithRetry() {
        SettableFuture<ServerInfo> future = SettableFuture.create();
        this.doGetServerInfo(future);
        return future;
    }

    /**
     * Stops the native process if it is running: asks it to terminate, waits up to one second,
     * and force-kills it if it is still alive afterwards.
     */
    @Override
    public void close() {
        if (this.process != null && this.process.isAlive()) {
            this.process.destroy();
            try {
                this.process.waitFor(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers instead of swallowing it.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for native execution process to shut down.");
            }
            finally {
                if (this.process.isAlive()) {
                    log.warn("Graceful shutdown of native execution process failed. Force killing it.");
                    this.process.destroyForcibly();
                }
            }
        }
        this.process = null;
    }

    /**
     * @return {@code true} if a process has been started and is still running
     */
    public boolean isAlive() {
        return this.process != null && this.process.isAlive();
    }

    /**
     * @return the TCP port picked for the native worker's HTTP server
     */
    public int getPort() {
        return this.port;
    }

    /** Returns {@code baseUri} with its port replaced by {@code port}. */
    private static URI getBaseUriWithPort(URI baseUri, int port) {
        return HttpUriBuilder.uriBuilderFrom(baseUri).port(port).build();
    }

    /**
     * Finds a currently free TCP port by binding an ephemeral server socket and releasing it.
     * The port is not reserved after this method returns.
     */
    private static int getAvailableTcpPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    /** Returns the session's catalog name, failing if the session has no catalog. */
    private String getNativeExecutionCatalogName(Session session) {
        Preconditions.checkArgument(session.getCatalog().isPresent(), (Object)"Catalog isn't set in the session.");
        return session.getCatalog().get();
    }

    /**
     * Sets the worker's HTTP port in the system config and writes the velox, system, node and
     * catalog property files under {@code configBasePath}.
     */
    private void populateConfigurationFiles(String configBasePath) throws IOException {
        ((NativeExecutionSystemConfig)this.workerProperty.getSystemConfig()).setHttpServerPort(this.port);
        this.workerProperty.populateAllProperties(
                Paths.get(configBasePath, WORKER_VELOX_CONFIG_FILE),
                Paths.get(configBasePath, WORKER_CONFIG_FILE),
                Paths.get(configBasePath, WORKER_NODE_CONFIG_FILE),
                Paths.get(configBasePath, String.format("%s%s.properties", WORKER_CONNECTOR_CONFIG_FILE, this.getNativeExecutionCatalogName(this.session))));
    }

    /**
     * Issues one server-info request and completes {@code future} from its outcome. Failed
     * requests are reported to the error tracker and re-issued, either immediately or once
     * the tracker's permit future completes.
     */
    private void doGetServerInfo(final SettableFuture<ServerInfo> future) {
        Futures.addCallback(this.serverClient.getServerInfo(), new FutureCallback<BaseResponse<ServerInfo>>() {
            @Override
            public void onSuccess(@Nullable BaseResponse<ServerInfo> response) {
                // Never throw from this callback: an escaping exception would leave the future
                // incomplete and the caller blocked in start() forever.
                try {
                    if (response == null) {
                        future.setException(new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Server info request returned no response"));
                        return;
                    }
                    if (response.getStatusCode() != HttpStatus.OK.code()) {
                        future.setException(new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with HTTP status " + response.getStatusCode()));
                        return;
                    }
                    future.set(response.getValue());
                }
                catch (RuntimeException e) {
                    future.setException(e);
                }
            }

            @Override
            public void onFailure(Throwable failedReason) {
                if (failedReason instanceof RejectedExecutionException && NativeExecutionProcess.this.httpClient.isClosed()) {
                    log.error(String.format("Unable to start the native process. HTTP client is closed. Reason: %s", failedReason.getMessage()));
                    future.setException(failedReason);
                    return;
                }
                try {
                    // A PrestoException from the tracker means no further retry is allowed.
                    NativeExecutionProcess.this.errorTracker.requestFailed(failedReason);
                }
                catch (PrestoException e) {
                    future.setException(e);
                    return;
                }
                ListenableFuture<?> errorRateLimit = NativeExecutionProcess.this.errorTracker.acquireRequestPermit();
                if (errorRateLimit.isDone()) {
                    NativeExecutionProcess.this.doGetServerInfo(future);
                }
                else {
                    errorRateLimit.addListener(() -> NativeExecutionProcess.this.doGetServerInfo(future), NativeExecutionProcess.this.errorRetryScheduledExecutor);
                }
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Resolves {@code path} to an absolute path. Relative paths are resolved against the Spark
     * files root directory when a SparkEnv is present, otherwise against {@code "."}.
     *
     * @throws PrestoException if the resolved path does not exist
     */
    private String getProcessWorkingPath(String path) {
        File absolutePath = new File(path);
        String rootDirectory = SparkEnv$.MODULE$.get() != null ? SparkFiles.getRootDirectory() : ".";
        File workingDir = new File(rootDirectory);
        if (!absolutePath.isAbsolute()) {
            absolutePath = new File(workingDir, path);
        }
        if (!absolutePath.exists()) {
            log.error(String.format("File doesn't exist %s", absolutePath.getAbsolutePath()));
            throw new PrestoException(StandardErrorCode.NATIVE_EXECUTION_BINARY_NOT_EXIST, String.format("File doesn't exist %s", absolutePath.getAbsolutePath()));
        }
        return absolutePath.getAbsolutePath();
    }

    /**
     * Builds the command line for the native process: the executable, the whitespace-separated
     * program arguments from the session, and, unless the arguments already contain
     * {@code --etc_dir}, an {@code --etc_dir} pointing at a per-port directory whose
     * configuration files are generated here.
     *
     * @throws PrestoException if the executable does not exist or {@code --etc_dir} is given without a value
     * @throws IOException if the configuration files cannot be written
     */
    private List<String> getLaunchCommand() throws IOException {
        String executablePath = this.getProcessWorkingPath(SystemSessionProperties.getNativeExecutionExecutablePath(this.session));
        String programArgs = SystemSessionProperties.getNativeExecutionProgramArguments(this.session);
        String configPath = Paths.get(this.getProcessWorkingPath("./"), String.valueOf(this.port)).toAbsolutePath().toString();

        // split() yields an empty token for empty input or leading whitespace; drop those so the
        // child process is not handed empty arguments.
        ImmutableList.Builder<String> argsBuilder = ImmutableList.builder();
        for (String token : programArgs.split("\\s+")) {
            if (!token.isEmpty()) {
                argsBuilder.add(token);
            }
        }
        List<String> argsList = argsBuilder.build();

        boolean etcDirSet = false;
        int etcDirIndex = argsList.indexOf(ETC_DIR_FLAG);
        if (etcDirIndex >= 0) {
            if (etcDirIndex + 1 >= argsList.size()) {
                throw new PrestoException(StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR, String.format("Program argument %s requires a value", ETC_DIR_FLAG));
            }
            etcDirSet = true;
            configPath = argsList.get(etcDirIndex + 1);
        }

        ImmutableList.Builder<String> command = ImmutableList.builder();
        command.add(executablePath).addAll(argsList);
        if (!etcDirSet) {
            // Configuration files are generated only for the directory chosen by this class;
            // a caller-supplied --etc_dir is used as-is.
            command.add(ETC_DIR_FLAG).add(configPath);
            this.populateConfigurationFiles(configPath);
        }
        List<String> commandList = command.build();
        // The joined command already starts with the executable path.
        log.info("Launching native process using command: %s", String.join(" ", commandList));
        return commandList;
    }
}

