/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.server.remotetask;

import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.http.client.thrift.ThriftRequestUtils;
import com.facebook.airlift.http.client.thrift.ThriftResponseHandler;
import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.units.Duration;
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.RequestHelpers;
import com.facebook.presto.server.SimpleHttpResponseCallback;
import com.facebook.presto.server.SimpleHttpResponseHandler;
import com.facebook.presto.server.remotetask.RemoteTaskStats;
import com.facebook.presto.server.smile.AdaptingJsonResponseHandler;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.server.smile.FullSmileResponseHandler;
import com.facebook.presto.server.thrift.ThriftCodecWrapper;
import com.facebook.presto.server.thrift.ThriftHttpResponseHandler;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Repeatedly requests the {@code status} resource of a remote task and publishes every
 * accepted {@link TaskStatus} through a {@link StateMachine}.
 *
 * <p>Once {@link #start()} has been called, a single HTTP GET is kept in flight at a time.
 * Each completed request (successful or failed) schedules the next one until either
 * {@link #stop()} is called or the last known task state reports {@code isDone()}.
 * Request failures are handed to a {@link RequestErrorTracker}; fatal problems and task
 * instance mismatches are reported to the {@code onFail} consumer.
 *
 * <p>The mutable fields {@code running} and {@code future} are guarded by the monitor of
 * this instance.
 */
class ContinuousTaskStatusFetcher
implements SimpleHttpResponseCallback<TaskStatus> {
    private static final Logger log = Logger.get(ContinuousTaskStatusFetcher.class);

    /** Thread-name pattern installed while a response callback is executing. */
    private static final String THREAD_NAME_FORMAT = "ContinuousTaskStatusFetcher-%s";

    /** User-facing message reported when the remote task instance id changes. */
    private static final String REMOTE_TASK_MISMATCH_MESSAGE = "Could not communicate with the remote task. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes.";

    private final TaskId taskId;
    private final Consumer<Throwable> onFail;
    private final StateMachine<TaskStatus> taskStatus;
    private final Codec<TaskStatus> taskStatusCodec;
    private final Duration refreshMaxWait;
    private final Executor executor;
    private final HttpClient httpClient;
    private final RequestErrorTracker errorTracker;
    private final RemoteTaskStats stats;
    private final boolean binaryTransportEnabled;
    private final boolean thriftTransportEnabled;
    private final Protocol thriftProtocol;
    /** {@link System#nanoTime()} value captured when the most recent request was issued. */
    private final AtomicLong currentRequestStartNanos = new AtomicLong();
    @GuardedBy("this")
    private boolean running;
    @GuardedBy("this")
    private ListenableFuture<BaseResponse<TaskStatus>> future;

    /**
     * Creates a fetcher that is initially stopped; call {@link #start()} to begin polling.
     *
     * @param onFail receives fatal errors and task-mismatch failures
     * @param taskId id of the task being tracked (used for naming and error tracking)
     * @param initialTaskStatus first value published by the state machine; its
     *        {@code getSelf()} URI is the one given to the error tracker
     * @param refreshMaxWait value sent in the {@code X-Presto-Max-Wait} request header
     * @param taskStatusCodec codec for the response body; it is cast to {@link SmileCodec}
     *        or {@link JsonCodec}, or unwrapped as a Thrift codec, depending on the
     *        transport flags
     * @param executor runs state-machine listeners and HTTP response callbacks
     * @param httpClient client used to issue the status requests
     * @param maxErrorDuration passed to the {@link RequestErrorTracker}
     * @param errorScheduledExecutor passed to the {@link RequestErrorTracker}
     * @param stats receives round-trip timings and HTTP response statistics
     * @param binaryTransportEnabled use the binary (SMILE) transport when Thrift is disabled
     * @param thriftTransportEnabled use the Thrift transport; takes precedence over binary
     * @param thriftProtocol protocol used to prepare Thrift requests
     * @throws NullPointerException if any reference argument checked below is {@code null}
     */
    public ContinuousTaskStatusFetcher(Consumer<Throwable> onFail, TaskId taskId, TaskStatus initialTaskStatus, Duration refreshMaxWait, Codec<TaskStatus> taskStatusCodec, Executor executor, HttpClient httpClient, Duration maxErrorDuration, ScheduledExecutorService errorScheduledExecutor, RemoteTaskStats stats, boolean binaryTransportEnabled, boolean thriftTransportEnabled, Protocol thriftProtocol) {
        Objects.requireNonNull(initialTaskStatus, "initialTaskStatus is null");
        this.taskId = Objects.requireNonNull(taskId, "taskId is null");
        this.onFail = Objects.requireNonNull(onFail, "onFail is null");
        // Validate the executor before handing it to the state machine so a null argument
        // is reported with this class's own message.
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.taskStatus = new StateMachine<>("task-" + taskId, this.executor, initialTaskStatus);
        this.refreshMaxWait = Objects.requireNonNull(refreshMaxWait, "refreshMaxWait is null");
        this.taskStatusCodec = Objects.requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.errorTracker = RequestErrorTracker.taskRequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
        this.stats = Objects.requireNonNull(stats, "stats is null");
        this.binaryTransportEnabled = binaryTransportEnabled;
        this.thriftTransportEnabled = thriftTransportEnabled;
        this.thriftProtocol = Objects.requireNonNull(thriftProtocol, "thriftProtocol is null");
    }

    /**
     * Starts polling. Calling this while already running has no effect.
     */
    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.scheduleNextRequest();
    }

    /**
     * Stops polling and cancels the outstanding request, if any.
     */
    public synchronized void stop() {
        this.running = false;
        if (this.future != null) {
            // mayInterruptIfRunning is false: a request that is already executing is not interrupted.
            this.future.cancel(false);
            this.future = null;
        }
    }

    /**
     * Issues the next status request unless the fetcher is stopped, the task is done,
     * a request is still outstanding, or the error tracker has not yet granted a permit
     * (in which case this method is re-invoked when the permit future completes).
     */
    // Raw types are deliberate: the response/callback type differs between the Thrift
    // handlers and the BaseResponse-based handlers, while 'future' has a single declared type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private synchronized void scheduleNextRequest() {
        TaskStatus taskStatus = this.getTaskStatus();
        if (!this.running || taskStatus.getState().isDone()) {
            return;
        }
        if (this.future != null && !this.future.isDone()) {
            log.error("Can not reschedule update because an update is already running");
            return;
        }
        ListenableFuture<?> errorRateLimit = this.errorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::scheduleNextRequest, this.executor);
            return;
        }

        // Select request builder and response handler for the configured transport.
        Request.Builder requestBuilder;
        ResponseHandler responseHandler;
        if (this.thriftTransportEnabled) {
            requestBuilder = ThriftRequestUtils.prepareThriftGet(this.thriftProtocol);
            responseHandler = new ThriftResponseHandler(ThriftCodecWrapper.unwrapThriftCodec(this.taskStatusCodec));
        } else if (this.binaryTransportEnabled) {
            requestBuilder = RequestHelpers.getBinaryTransportBuilder(Request.Builder.prepareGet());
            responseHandler = FullSmileResponseHandler.createFullSmileResponseHandler((SmileCodec<TaskStatus>) this.taskStatusCodec);
        } else {
            requestBuilder = RequestHelpers.getJsonTransportBuilder(Request.Builder.prepareGet());
            responseHandler = AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler((JsonCodec<TaskStatus>) this.taskStatusCodec);
        }

        // The request carries the last known state and the maximum wait as headers.
        URI statusUri = HttpUriBuilder.uriBuilderFrom(taskStatus.getSelf()).appendPath("status").build();
        Request request = requestBuilder
                .setUri(statusUri)
                .setHeader("X-Presto-Current-State", taskStatus.getState().toString())
                .setHeader("X-Presto-Max-Wait", this.refreshMaxWait.toString())
                .build();

        this.errorTracker.startRequest();
        this.future = this.httpClient.executeAsync(request, responseHandler);
        this.currentRequestStartNanos.set(System.nanoTime());

        FutureCallback callback;
        if (this.thriftTransportEnabled) {
            callback = new ThriftHttpResponseHandler<TaskStatus>(this, request.getUri(), this.stats.getHttpResponseStats(), StandardErrorCode.REMOTE_TASK_ERROR);
        } else {
            callback = new SimpleHttpResponseHandler<TaskStatus>(this, request.getUri(), this.stats.getHttpResponseStats(), StandardErrorCode.REMOTE_TASK_ERROR);
        }
        Futures.addCallback(this.future, callback, this.executor);
    }

    /**
     * Returns the most recently accepted task status.
     */
    TaskStatus getTaskStatus() {
        return this.taskStatus.get();
    }

    /**
     * Handles a successfully decoded status response: records timing, applies the new
     * status, marks the request as succeeded, and always schedules the next request.
     *
     * @param value the status returned by the remote task
     */
    @Override
    public void success(TaskStatus value) {
        try (SetThreadName ignored = new SetThreadName(THREAD_NAME_FORMAT, this.taskId)) {
            this.updateStats(this.currentRequestStartNanos.get());
            try {
                this.updateTaskStatus(value);
                this.errorTracker.requestSucceeded();
            }
            finally {
                this.scheduleNextRequest();
            }
        }
    }

    /**
     * Handles a failed request. The failure is reported to the error tracker unless the
     * task is already done; anything the tracker throws is forwarded to {@code onFail}
     * ({@link Error}s are additionally rethrown). The next request is always scheduled.
     *
     * @param cause the failure reported for the request
     */
    @Override
    public void failed(Throwable cause) {
        try (SetThreadName ignored = new SetThreadName(THREAD_NAME_FORMAT, this.taskId)) {
            this.updateStats(this.currentRequestStartNanos.get());
            try {
                // Only record the error while the task is still live.
                TaskStatus taskStatus = this.getTaskStatus();
                if (!taskStatus.getState().isDone()) {
                    this.errorTracker.requestFailed(cause);
                }
            }
            catch (Error e) {
                this.onFail.accept(e);
                throw e;
            }
            catch (RuntimeException e) {
                this.onFail.accept(e);
            }
            finally {
                this.scheduleNextRequest();
            }
        }
    }

    /**
     * Handles an unrecoverable failure: records timing and forwards the cause to
     * {@code onFail}. No further request is scheduled from here.
     *
     * @param cause the fatal failure
     */
    @Override
    public void fatal(Throwable cause) {
        try (SetThreadName ignored = new SetThreadName(THREAD_NAME_FORMAT, this.taskId)) {
            this.updateStats(this.currentRequestStartNanos.get());
            this.onFail.accept(cause);
        }
    }

    /**
     * Publishes {@code newValue} if it belongs to the same task instance as the current
     * status, the current status is not done, and its version is not older than the
     * current one. If the task instance id differs from a non-empty current id, the
     * value is rejected and a {@code REMOTE_TASK_MISMATCH} failure is reported to
     * {@code onFail}.
     *
     * @param newValue candidate status received from the remote task
     */
    void updateTaskStatus(TaskStatus newValue) {
        AtomicBoolean taskMismatch = new AtomicBoolean();
        this.taskStatus.setIf(newValue, oldValue -> {
            // An all-zero instance id on the current status is treated as "not yet known".
            boolean isEmpty = oldValue.getTaskInstanceIdLeastSignificantBits() == 0L
                    && oldValue.getTaskInstanceIdMostSignificantBits() == 0L;
            boolean sameInstance = oldValue.getTaskInstanceIdLeastSignificantBits() == newValue.getTaskInstanceIdLeastSignificantBits()
                    && oldValue.getTaskInstanceIdMostSignificantBits() == newValue.getTaskInstanceIdMostSignificantBits();
            if (!isEmpty && !sameInstance) {
                taskMismatch.set(true);
                return false;
            }
            if (oldValue.getState().isDone()) {
                // Never overwrite a done status.
                return false;
            }
            return newValue.getVersion() >= oldValue.getVersion();
        });
        if (taskMismatch.get()) {
            this.onFail.accept(new PrestoException(StandardErrorCode.REMOTE_TASK_MISMATCH, String.format("%s (%s)", REMOTE_TASK_MISMATCH_MESSAGE, HostAddress.fromUri(this.getTaskStatus().getSelf()))));
        }
    }

    /**
     * Returns whether {@link #start()} has been called without a subsequent {@link #stop()}.
     */
    public synchronized boolean isRunning() {
        return this.running;
    }

    /**
     * Registers a listener on the underlying task-status state machine.
     *
     * @param stateChangeListener listener to register
     */
    public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus> stateChangeListener) {
        this.taskStatus.addStateChangeListener(stateChangeListener);
    }

    /**
     * Records the elapsed time since {@code currentRequestStartNanos} as a status round trip.
     */
    private void updateStats(long currentRequestStartNanos) {
        this.stats.statusRoundTripMillis(Duration.nanosSince(currentRequestStartNanos).toMillis());
    }
}

