/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpServerClient;
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.spark.SparkFiles;

public class NativeExecutionProcess
implements AutoCloseable {
    private static final Logger log = Logger.get(NativeExecutionProcess.class);
    private static final String NATIVE_EXECUTION_TASK_ERROR_MESSAGE = "Encountered too many errors talking to native process. The process may have crashed or be under too much load.";
    private static final String WORKER_CONFIG_FILE = "/config.properties";
    private static final String WORKER_NODE_CONFIG_FILE = "/node.properties";
    private static final String WORKER_CONNECTOR_CONFIG_FILE = "/catalog/";
    private final Session session;
    private final PrestoSparkHttpServerClient serverClient;
    private final URI location;
    private final int port = NativeExecutionProcess.getAvailableTcpPort();
    private final TaskManagerConfig taskManagerConfig;
    private final ScheduledExecutorService errorRetryScheduledExecutor;
    private final RequestErrorTracker errorTracker;
    private final HttpClient httpClient;
    private final WorkerProperty<?, ?, ?> workerProperty;
    private Process process;

    public NativeExecutionProcess(Session session, URI uri, HttpClient httpClient, ScheduledExecutorService errorRetryScheduledExecutor, JsonCodec<ServerInfo> serverInfoCodec, Duration maxErrorDuration, TaskManagerConfig taskManagerConfig, WorkerProperty<?, ?, ?> workerProperty) throws IOException {
        this.session = Objects.requireNonNull(session, "session is null");
        this.location = NativeExecutionProcess.getBaseUriWithPort(Objects.requireNonNull(uri, "uri is null"), this.port);
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.serverClient = new PrestoSparkHttpServerClient(this.httpClient, this.location, serverInfoCodec);
        this.taskManagerConfig = Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null");
        this.errorRetryScheduledExecutor = Objects.requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
        this.errorTracker = new RequestErrorTracker((Object)"NativeExecution", uri, (ErrorCodeSupplier)StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR, NATIVE_EXECUTION_TASK_ERROR_MESSAGE, maxErrorDuration, errorRetryScheduledExecutor, "getting native process status");
        this.workerProperty = Objects.requireNonNull(workerProperty, "workerProperty is null");
    }

    public void start() throws ExecutionException, InterruptedException, IOException {
        String executablePath = this.getProcessWorkingPath(SystemSessionProperties.getNativeExecutionExecutablePath((Session)this.session));
        String configPath = Paths.get(this.getProcessWorkingPath("./"), String.valueOf(this.port)).toAbsolutePath().toString();
        this.populateConfigurationFiles(configPath);
        ProcessBuilder processBuilder = new ProcessBuilder(executablePath, "--v", "1", "--etc_dir", configPath);
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
        try {
            log.info("Launching %s \nConfig path: %s\n", new Object[]{executablePath, configPath});
            this.process = processBuilder.start();
        }
        catch (IOException e) {
            log.error(String.format("Cannot start %s, error message: %s", processBuilder.command(), e.getMessage()));
            throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR, String.format("Cannot start %s", processBuilder.command()), (Throwable)e);
        }
        this.getServerInfoWithRetry().get();
    }

    @VisibleForTesting
    public SettableFuture<ServerInfo> getServerInfoWithRetry() {
        SettableFuture future = SettableFuture.create();
        this.doGetServerInfo((SettableFuture<ServerInfo>)future);
        return future;
    }

    @Override
    public void close() {
        if (this.process != null && this.process.isAlive()) {
            this.process.destroy();
        }
    }

    public int getPort() {
        return this.port;
    }

    private static URI getBaseUriWithPort(URI baseUri, int port) {
        return HttpUriBuilder.uriBuilderFrom((URI)baseUri).port(port).build();
    }

    private static int getAvailableTcpPort() throws IOException {
        ServerSocket socket = new ServerSocket(0);
        int port = socket.getLocalPort();
        socket.close();
        return port;
    }

    private String getNativeExecutionCatalogName(Session session) {
        Preconditions.checkArgument((boolean)session.getCatalog().isPresent(), (Object)"Catalog isn't set in the session.");
        return (String)session.getCatalog().get();
    }

    private void populateConfigurationFiles(String configBasePath) throws IOException {
        ((NativeExecutionSystemConfig)this.workerProperty.getSystemConfig()).setHttpServerPort(this.port);
        this.workerProperty.populateAllProperties(Paths.get(configBasePath, WORKER_CONFIG_FILE), Paths.get(configBasePath, WORKER_NODE_CONFIG_FILE), Paths.get(configBasePath, String.format("%s%s.properties", WORKER_CONNECTOR_CONFIG_FILE, this.getNativeExecutionCatalogName(this.session))));
    }

    private void doGetServerInfo(final SettableFuture<ServerInfo> future) {
        Futures.addCallback(this.serverClient.getServerInfo(), (FutureCallback)new FutureCallback<BaseResponse<ServerInfo>>(){

            public void onSuccess(@Nullable BaseResponse<ServerInfo> response) {
                if (response.getStatusCode() != HttpStatus.OK.code()) {
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with HTTP status " + response.getStatusCode());
                }
                future.set(response.getValue());
            }

            public void onFailure(Throwable failedReason) {
                if (failedReason instanceof RejectedExecutionException && NativeExecutionProcess.this.httpClient.isClosed()) {
                    log.error(String.format("Unable to start the native process. HTTP client is closed. Reason: %s", failedReason.getMessage()));
                    future.setException(failedReason);
                    return;
                }
                try {
                    NativeExecutionProcess.this.errorTracker.requestFailed(failedReason);
                }
                catch (PrestoException e) {
                    future.setException((Throwable)e);
                    return;
                }
                ListenableFuture errorRateLimit = NativeExecutionProcess.this.errorTracker.acquireRequestPermit();
                if (errorRateLimit.isDone()) {
                    NativeExecutionProcess.this.doGetServerInfo((SettableFuture<ServerInfo>)future);
                } else {
                    errorRateLimit.addListener(() -> NativeExecutionProcess.this.doGetServerInfo((SettableFuture<ServerInfo>)future), (Executor)NativeExecutionProcess.this.errorRetryScheduledExecutor);
                }
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    private String getProcessWorkingPath(String path) {
        File absolutePath = new File(path);
        File workingDir = new File(SparkFiles.getRootDirectory());
        if (!absolutePath.isAbsolute()) {
            absolutePath = new File(workingDir, path);
        }
        if (!absolutePath.exists()) {
            log.error(String.format("File doesn't exist %s", absolutePath.getAbsolutePath()));
            throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.NATIVE_EXECUTION_BINARY_NOT_EXIST, String.format("File doesn't exist %s", absolutePath.getAbsolutePath()));
        }
        return absolutePath.getAbsolutePath();
    }
}

