/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.server.remotetask;

import com.facebook.presto.Session;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.FutureStateChange;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.PageBufferInfo;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.server.RequestHelpers;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.server.remotetask.Backoff;
import com.facebook.presto.server.remotetask.ContinuousTaskStatusFetcher;
import com.facebook.presto.server.remotetask.RemoteTaskStats;
import com.facebook.presto.server.remotetask.RequestErrorTracker;
import com.facebook.presto.server.remotetask.SimpleHttpResponseCallback;
import com.facebook.presto.server.remotetask.SimpleHttpResponseHandler;
import com.facebook.presto.server.remotetask.TaskInfoFetcher;
import com.facebook.presto.server.smile.AdaptingJsonResponseHandler;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.server.smile.Codec;
import com.facebook.presto.server.smile.FullSmileResponseHandler;
import com.facebook.presto.server.smile.JsonCodecWrapper;
import com.facebook.presto.server.smile.SmileCodec;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.util.Failures;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.Request;
import io.airlift.http.client.ResponseHandler;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.http.client.StatusResponseHandler;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.joda.time.DateTime;

public final class HttpRemoteTask
implements RemoteTask {
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final URI taskLocation;
    private final Session session;
    private final String nodeId;
    private final PlanFragment planFragment;
    private final OptionalInt totalPartitions;
    private final AtomicLong nextSplitId = new AtomicLong();
    private final Duration maxErrorDuration;
    private final RemoteTaskStats stats;
    private final TaskInfoFetcher taskInfoFetcher;
    private final ContinuousTaskStatusFetcher taskStatusFetcher;
    @GuardedBy(value="this")
    private Future<?> currentRequest;
    @GuardedBy(value="this")
    private long currentRequestStartNanos;
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();
    @GuardedBy(value="this")
    private volatile int pendingSourceSplitCount;
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, Lifespan> pendingNoMoreSplitsForLifespan = HashMultimap.create();
    @GuardedBy(value="this")
    private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<PlanNodeId, Boolean>();
    @GuardedBy(value="this")
    private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference();
    private final FutureStateChange<?> whenSplitQueueHasSpace = new FutureStateChange();
    @GuardedBy(value="this")
    private boolean splitQueueHasSpace = true;
    @GuardedBy(value="this")
    private OptionalInt whenSplitQueueHasSpaceThreshold = OptionalInt.empty();
    private final boolean summarizeTaskInfo;
    private final HttpClient httpClient;
    private final Executor executor;
    private final ScheduledExecutorService errorScheduledExecutor;
    private final Codec<TaskInfo> taskInfoCodec;
    private final Codec<TaskUpdateRequest> taskUpdateRequestCodec;
    private final RequestErrorTracker updateErrorTracker;
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
    private final AtomicBoolean sendPlan = new AtomicBoolean(true);
    private final NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker;
    private final AtomicBoolean aborting = new AtomicBoolean(false);
    private final boolean isBinaryTransportEnabled;

    public HttpRemoteTask(Session session, TaskId taskId, String nodeId, URI location, PlanFragment planFragment, Multimap<PlanNodeId, Split> initialSplits, OptionalInt totalPartitions, OutputBuffers outputBuffers, HttpClient httpClient, Executor executor, ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorScheduledExecutor, Duration maxErrorDuration, Duration taskStatusRefreshMaxWait, Duration taskInfoUpdateInterval, boolean summarizeTaskInfo, Codec<TaskStatus> taskStatusCodec, Codec<TaskInfo> taskInfoCodec, Codec<TaskUpdateRequest> taskUpdateRequestCodec, NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker, RemoteTaskStats stats, boolean isBinaryTransportEnabled) {
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(nodeId, "nodeId is null");
        Objects.requireNonNull(location, "location is null");
        Objects.requireNonNull(planFragment, "planFragment is null");
        Objects.requireNonNull(totalPartitions, "totalPartitions is null");
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        Objects.requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        Objects.requireNonNull(maxErrorDuration, "maxErrorDuration is null");
        Objects.requireNonNull(stats, "stats is null");
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{taskId});){
            this.taskId = taskId;
            this.taskLocation = location;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.totalPartitions = totalPartitions;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.maxErrorDuration = maxErrorDuration;
            this.stats = stats;
            this.isBinaryTransportEnabled = isBinaryTransportEnabled;
            for (Map.Entry entry : Objects.requireNonNull(initialSplits, "initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(this.nextSplitId.getAndIncrement(), (PlanNodeId)entry.getKey(), (Split)entry.getValue());
                this.pendingSplits.put(entry.getKey(), (Object)scheduledSplit);
            }
            this.pendingSourceSplitCount = planFragment.getPartitionedSources().stream().filter(arg_0 -> initialSplits.containsKey(arg_0)).mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size()).sum();
            List bufferStates = (List)outputBuffers.getBuffers().keySet().stream().map(outputId -> new BufferInfo((OutputBuffers.OutputBufferId)outputId, false, 0, 0L, PageBufferInfo.empty())).collect(ImmutableList.toImmutableList());
            TaskInfo initialTask = TaskInfo.createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(this::failTask, initialTask.getTaskStatus(), taskStatusRefreshMaxWait, taskStatusCodec, executor, httpClient, maxErrorDuration, errorScheduledExecutor, stats, isBinaryTransportEnabled);
            this.taskInfoFetcher = new TaskInfoFetcher(this::failTask, initialTask, httpClient, taskInfoUpdateInterval, taskInfoCodec, maxErrorDuration, summarizeTaskInfo, executor, updateScheduledExecutor, errorScheduledExecutor, stats, isBinaryTransportEnabled);
            this.taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    this.cleanUpTask();
                } else {
                    partitionedSplitCountTracker.setPartitionedSplitCount(this.getPartitionedSplitCount());
                    this.updateSplitQueueSpace();
                }
            });
            partitionedSplitCountTracker.setPartitionedSplitCount(this.getPartitionedSplitCount());
            this.updateSplitQueueSpace();
        }
    }

    @Override
    public TaskId getTaskId() {
        return this.taskId;
    }

    @Override
    public String getNodeId() {
        return this.nodeId;
    }

    @Override
    public TaskInfo getTaskInfo() {
        return this.taskInfoFetcher.getTaskInfo();
    }

    @Override
    public TaskStatus getTaskStatus() {
        return this.taskStatusFetcher.getTaskStatus();
    }

    @Override
    public void start() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.scheduleUpdate();
            this.taskStatusFetcher.start();
            this.taskInfoFetcher.start();
        }
    }

    @Override
    public synchronized void addSplits(Multimap<PlanNodeId, Split> splitsBySource) {
        Objects.requireNonNull(splitsBySource, "splitsBySource is null");
        if (this.getTaskStatus().getState().isDone()) {
            return;
        }
        boolean needsUpdate = false;
        for (Map.Entry entry : splitsBySource.asMap().entrySet()) {
            PlanNodeId sourceId = (PlanNodeId)entry.getKey();
            Collection splits = (Collection)entry.getValue();
            Preconditions.checkState((!this.noMoreSplits.containsKey(sourceId) ? 1 : 0) != 0, (String)"noMoreSplits has already been set for %s", (Object)sourceId);
            int added = 0;
            for (Split split : splits) {
                if (!this.pendingSplits.put((Object)sourceId, (Object)new ScheduledSplit(this.nextSplitId.getAndIncrement(), sourceId, split))) continue;
                ++added;
            }
            if (this.planFragment.isPartitionedSources(sourceId)) {
                this.pendingSourceSplitCount += added;
                this.partitionedSplitCountTracker.setPartitionedSplitCount(this.getPartitionedSplitCount());
            }
            needsUpdate = true;
        }
        this.updateSplitQueueSpace();
        if (needsUpdate) {
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId) {
        if (this.noMoreSplits.containsKey(sourceId)) {
            return;
        }
        this.noMoreSplits.put(sourceId, true);
        this.needsUpdate.set(true);
        this.scheduleUpdate();
    }

    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan) {
        if (this.pendingNoMoreSplitsForLifespan.put((Object)sourceId, (Object)lifespan)) {
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    @Override
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) {
        if (this.getTaskStatus().getState().isDone()) {
            return;
        }
        if (newOutputBuffers.getVersion() > this.outputBuffers.get().getVersion()) {
            this.outputBuffers.set(newOutputBuffers);
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    @Override
    public ListenableFuture<?> removeRemoteSource(TaskId remoteSourceTaskId) {
        URI remoteSourceUri = HttpUriBuilder.uriBuilderFrom((URI)this.taskLocation).appendPath("remote-source").appendPath(remoteSourceTaskId.toString()).build();
        Request request = Request.Builder.prepareDelete().setUri(remoteSourceUri).build();
        RequestErrorTracker errorTracker = new RequestErrorTracker(this.taskId, remoteSourceUri, this.maxErrorDuration, this.errorScheduledExecutor, "Remove exchange remote source");
        SettableFuture future = SettableFuture.create();
        this.doRemoveRemoteSource(errorTracker, request, future);
        return future;
    }

    private void doRemoveRemoteSource(final RequestErrorTracker errorTracker, final Request request, final SettableFuture<?> future) {
        errorTracker.startRequest();
        FutureCallback<StatusResponseHandler.StatusResponse> callback = new FutureCallback<StatusResponseHandler.StatusResponse>(){

            public void onSuccess(@Nullable StatusResponseHandler.StatusResponse response) {
                if (response == null) {
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with null response");
                }
                if (response.getStatusCode() != HttpStatus.OK.code()) {
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with HTTP status " + response.getStatusCode());
                }
                future.set(null);
            }

            public void onFailure(Throwable failedReason) {
                if (failedReason instanceof RejectedExecutionException && HttpRemoteTask.this.httpClient.isClosed()) {
                    log.error("Unable to destroy exchange source at %s. HTTP client is closed", new Object[]{request.getUri()});
                    future.setException(failedReason);
                    return;
                }
                try {
                    errorTracker.requestFailed(failedReason);
                }
                catch (PrestoException e) {
                    future.setException((Throwable)e);
                    return;
                }
                ListenableFuture<?> errorRateLimit = errorTracker.acquireRequestPermit();
                if (errorRateLimit.isDone()) {
                    HttpRemoteTask.this.doRemoveRemoteSource(errorTracker, request, future);
                } else {
                    errorRateLimit.addListener(() -> HttpRemoteTask.this.doRemoveRemoteSource(errorTracker, request, future), (Executor)HttpRemoteTask.this.errorScheduledExecutor);
                }
            }
        };
        Futures.addCallback((ListenableFuture)this.httpClient.executeAsync(request, (ResponseHandler)StatusResponseHandler.createStatusResponseHandler()), (FutureCallback)callback, (Executor)MoreExecutors.directExecutor());
    }

    @Override
    public int getPartitionedSplitCount() {
        TaskStatus taskStatus = this.getTaskStatus();
        if (taskStatus.getState().isDone()) {
            return 0;
        }
        return this.getPendingSourceSplitCount() + taskStatus.getQueuedPartitionedDrivers() + taskStatus.getRunningPartitionedDrivers();
    }

    @Override
    public int getQueuedPartitionedSplitCount() {
        TaskStatus taskStatus = this.getTaskStatus();
        if (taskStatus.getState().isDone()) {
            return 0;
        }
        return this.getPendingSourceSplitCount() + taskStatus.getQueuedPartitionedDrivers();
    }

    private int getPendingSourceSplitCount() {
        return this.pendingSourceSplitCount;
    }

    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus> stateChangeListener) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.taskStatusFetcher.addStateChangeListener(stateChangeListener);
        }
    }

    @Override
    public void addFinalTaskInfoListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        this.taskInfoFetcher.addFinalTaskInfoListener(stateChangeListener);
    }

    @Override
    public synchronized ListenableFuture<?> whenSplitQueueHasSpace(int threshold) {
        if (this.whenSplitQueueHasSpaceThreshold.isPresent()) {
            Preconditions.checkArgument((threshold == this.whenSplitQueueHasSpaceThreshold.getAsInt() ? 1 : 0) != 0, (Object)"Multiple split queue space notification thresholds not supported");
        } else {
            this.whenSplitQueueHasSpaceThreshold = OptionalInt.of(threshold);
            this.updateSplitQueueSpace();
        }
        if (this.splitQueueHasSpace) {
            return Futures.immediateFuture(null);
        }
        return this.whenSplitQueueHasSpace.createNewListener();
    }

    private synchronized void updateSplitQueueSpace() {
        if (!this.whenSplitQueueHasSpaceThreshold.isPresent()) {
            return;
        }
        boolean bl = this.splitQueueHasSpace = this.getQueuedPartitionedSplitCount() < this.whenSplitQueueHasSpaceThreshold.getAsInt();
        if (this.splitQueueHasSpace) {
            this.whenSplitQueueHasSpace.complete(null, this.executor);
        }
    }

    private synchronized void processTaskUpdate(TaskInfo newValue, List<TaskSource> sources) {
        this.updateTaskInfo(newValue);
        for (TaskSource source : sources) {
            PlanNodeId planNodeId = source.getPlanNodeId();
            int removed = 0;
            for (ScheduledSplit split : source.getSplits()) {
                if (!this.pendingSplits.remove((Object)planNodeId, (Object)split)) continue;
                ++removed;
            }
            if (source.isNoMoreSplits()) {
                this.noMoreSplits.put(planNodeId, false);
            }
            for (Lifespan lifespan : source.getNoMoreSplitsForLifespan()) {
                this.pendingNoMoreSplitsForLifespan.remove((Object)planNodeId, (Object)lifespan);
            }
            if (!this.planFragment.isPartitionedSources(planNodeId)) continue;
            this.pendingSourceSplitCount -= removed;
        }
        this.updateSplitQueueSpace();
        this.partitionedSplitCountTracker.setPartitionedSplitCount(this.getPartitionedSplitCount());
    }

    private void updateTaskInfo(TaskInfo taskInfo) {
        this.taskStatusFetcher.updateTaskStatus(taskInfo.getTaskStatus());
        this.taskInfoFetcher.updateTaskInfo(taskInfo);
    }

    private void scheduleUpdate() {
        this.executor.execute(this::sendUpdate);
    }

    private synchronized void sendUpdate() {
        HttpClient.HttpResponseFuture future;
        TaskStatus taskStatus = this.getTaskStatus();
        if (!this.needsUpdate.get() || taskStatus.getState().isDone()) {
            return;
        }
        if (this.currentRequest != null && !this.currentRequest.isDone()) {
            return;
        }
        ListenableFuture<?> errorRateLimit = this.updateErrorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::sendUpdate, this.executor);
            return;
        }
        List<TaskSource> sources = this.getSources();
        Optional<PlanFragment> fragment = this.sendPlan.get() ? Optional.of(this.planFragment) : Optional.empty();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(this.session.toSessionRepresentation(), fragment, sources, this.outputBuffers.get(), this.totalPartitions);
        byte[] taskUpdateRequestJson = this.taskUpdateRequestCodec.toBytes(updateRequest);
        if (fragment.isPresent()) {
            this.stats.updateWithPlanBytes(taskUpdateRequestJson.length);
        }
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(taskStatus);
        Request request = RequestHelpers.setContentTypeHeaders(this.isBinaryTransportEnabled, Request.Builder.preparePost()).setUri(uriBuilder.build()).setBodyGenerator((BodyGenerator)StaticBodyGenerator.createStaticBodyGenerator((byte[])taskUpdateRequestJson)).build();
        Object responseHandler = this.isBinaryTransportEnabled ? FullSmileResponseHandler.createFullSmileResponseHandler((SmileCodec)this.taskInfoCodec) : AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler(JsonCodecWrapper.unwrapJsonCodec(this.taskInfoCodec));
        this.updateErrorTracker.startRequest();
        this.currentRequest = future = this.httpClient.executeAsync(request, responseHandler);
        this.currentRequestStartNanos = System.nanoTime();
        this.needsUpdate.set(false);
        Futures.addCallback((ListenableFuture)future, new SimpleHttpResponseHandler<TaskInfo>(new UpdateResponseHandler(sources), request.getUri(), this.stats), (Executor)this.executor);
    }

    private synchronized List<TaskSource> getSources() {
        return (List)Stream.concat(this.planFragment.getPartitionedSourceNodes().stream(), this.planFragment.getRemoteSourceNodes().stream()).filter(Objects::nonNull).map(PlanNode::getId).map(this::getSource).filter(Objects::nonNull).collect(ImmutableList.toImmutableList());
    }

    private synchronized TaskSource getSource(PlanNodeId planNodeId) {
        Set splits = this.pendingSplits.get((Object)planNodeId);
        boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
        boolean noMoreSplits = this.noMoreSplits.containsKey(planNodeId);
        Set noMoreSplitsForLifespan = this.pendingNoMoreSplitsForLifespan.get((Object)planNodeId);
        TaskSource element = null;
        if (!splits.isEmpty() || !noMoreSplitsForLifespan.isEmpty() || pendingNoMoreSplits) {
            element = new TaskSource(planNodeId, splits, noMoreSplitsForLifespan, noMoreSplits);
        }
        return element;
    }

    @Override
    public synchronized void cancel() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            TaskStatus taskStatus = this.getTaskStatus();
            if (taskStatus.getState().isDone()) {
                return;
            }
            HttpUriBuilder uriBuilder = this.getHttpUriBuilder(taskStatus).addParameter("abort", new String[]{"false"});
            Request request = RequestHelpers.setContentTypeHeaders(this.isBinaryTransportEnabled, Request.Builder.prepareDelete()).setUri(uriBuilder.build()).build();
            this.scheduleAsyncCleanupRequest(HttpRemoteTask.createCleanupBackoff(), request, "cancel");
        }
    }

    private synchronized void cleanUpTask() {
        Preconditions.checkState((boolean)this.getTaskStatus().getState().isDone(), (Object)"attempt to clean up a task that is not done yet");
        this.pendingSplits.clear();
        this.pendingSourceSplitCount = 0;
        this.partitionedSplitCountTracker.setPartitionedSplitCount(this.getPartitionedSplitCount());
        this.splitQueueHasSpace = true;
        this.whenSplitQueueHasSpace.complete(null, this.executor);
        if (this.currentRequest != null) {
            this.currentRequest.cancel(true);
            this.currentRequest = null;
            this.currentRequestStartNanos = 0L;
        }
        this.taskStatusFetcher.stop();
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(this.getTaskStatus());
        Request request = RequestHelpers.setContentTypeHeaders(this.isBinaryTransportEnabled, Request.Builder.prepareDelete()).setUri(uriBuilder.build()).build();
        this.scheduleAsyncCleanupRequest(HttpRemoteTask.createCleanupBackoff(), request, "cleanup");
    }

    @Override
    public synchronized void abort() {
        if (this.getTaskStatus().getState().isDone()) {
            return;
        }
        this.abort(TaskStatus.failWith(this.getTaskStatus(), TaskState.ABORTED, (List<ExecutionFailureInfo>)ImmutableList.of()));
    }

    private synchronized void abort(TaskStatus status) {
        Preconditions.checkState((boolean)status.getState().isDone(), (Object)"cannot abort task with an incomplete status");
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.taskStatusFetcher.updateTaskStatus(status);
            HttpUriBuilder uriBuilder = this.getHttpUriBuilder(this.getTaskStatus());
            Request request = RequestHelpers.setContentTypeHeaders(this.isBinaryTransportEnabled, Request.Builder.prepareDelete()).setUri(uriBuilder.build()).build();
            this.scheduleAsyncCleanupRequest(HttpRemoteTask.createCleanupBackoff(), request, "abort");
        }
    }

    private void scheduleAsyncCleanupRequest(Backoff cleanupBackoff, Request request, String action) {
        if (!this.aborting.compareAndSet(false, true)) {
            return;
        }
        this.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action);
    }

    private void doScheduleAsyncCleanupRequest(final Backoff cleanupBackoff, final Request request, final String action) {
        Object responseHandler = this.isBinaryTransportEnabled ? FullSmileResponseHandler.createFullSmileResponseHandler((SmileCodec)this.taskInfoCodec) : AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler(JsonCodecWrapper.unwrapJsonCodec(this.taskInfoCodec));
        Futures.addCallback((ListenableFuture)this.httpClient.executeAsync(request, responseHandler), (FutureCallback)new FutureCallback<BaseResponse<TaskInfo>>(){

            public void onSuccess(BaseResponse<TaskInfo> result) {
                try {
                    HttpRemoteTask.this.updateTaskInfo(result.getValue());
                }
                finally {
                    if (!HttpRemoteTask.this.getTaskInfo().getTaskStatus().getState().isDone()) {
                        this.cleanUpLocally();
                    }
                }
            }

            public void onFailure(Throwable t) {
                if (t instanceof RejectedExecutionException && HttpRemoteTask.this.httpClient.isClosed()) {
                    RequestErrorTracker.logError(t, "Unable to %s task at %s. HTTP client is closed.", action, request.getUri());
                    this.cleanUpLocally();
                    return;
                }
                if (cleanupBackoff.failure()) {
                    RequestErrorTracker.logError(t, "Unable to %s task at %s. Back off depleted.", action, request.getUri());
                    this.cleanUpLocally();
                    return;
                }
                long delayNanos = cleanupBackoff.getBackoffDelayNanos();
                if (delayNanos == 0L) {
                    HttpRemoteTask.this.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action);
                } else {
                    HttpRemoteTask.this.errorScheduledExecutor.schedule(() -> HttpRemoteTask.this.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action), delayNanos, TimeUnit.NANOSECONDS);
                }
            }

            private void cleanUpLocally() {
                HttpRemoteTask.this.updateTaskInfo(HttpRemoteTask.this.getTaskInfo().withTaskStatus(HttpRemoteTask.this.getTaskStatus()));
            }
        }, (Executor)this.executor);
    }

    private void failTask(Throwable cause) {
        TaskStatus taskStatus = this.getTaskStatus();
        if (!taskStatus.getState().isDone()) {
            log.debug(cause, "Remote task %s failed with %s", new Object[]{taskStatus.getSelf(), cause});
        }
        this.abort(TaskStatus.failWith(this.getTaskStatus(), TaskState.FAILED, (List<ExecutionFailureInfo>)ImmutableList.of((Object)Failures.toFailure(cause))));
    }

    private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus) {
        HttpUriBuilder uriBuilder = HttpUriBuilder.uriBuilderFrom((URI)taskStatus.getSelf());
        if (this.summarizeTaskInfo) {
            uriBuilder.addParameter("summarize", new String[0]);
        }
        return uriBuilder;
    }

    private static Backoff createCleanupBackoff() {
        return new Backoff(10, new Duration(10.0, TimeUnit.MINUTES), Ticker.systemTicker(), (List<Duration>)ImmutableList.builder().add((Object)new Duration(0.0, TimeUnit.MILLISECONDS)).add((Object)new Duration(100.0, TimeUnit.MILLISECONDS)).add((Object)new Duration(500.0, TimeUnit.MILLISECONDS)).add((Object)new Duration(1.0, TimeUnit.SECONDS)).add((Object)new Duration(10.0, TimeUnit.SECONDS)).build());
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).addValue((Object)this.getTaskInfo()).toString();
    }

    private class UpdateResponseHandler
    implements SimpleHttpResponseCallback<TaskInfo> {
        private final List<TaskSource> sources;

        private UpdateResponseHandler(List<TaskSource> sources) {
            this.sources = ImmutableList.copyOf((Collection)Objects.requireNonNull(sources, "sources is null"));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void success(TaskInfo value) {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                try {
                    long currentRequestStartNanos;
                    HttpRemoteTask httpRemoteTask = HttpRemoteTask.this;
                    synchronized (httpRemoteTask) {
                        HttpRemoteTask.this.currentRequest = null;
                        HttpRemoteTask.this.sendPlan.set(value.isNeedsPlan());
                        currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
                    }
                    this.updateStats(currentRequestStartNanos);
                    HttpRemoteTask.this.processTaskUpdate(value, this.sources);
                    HttpRemoteTask.this.updateErrorTracker.requestSucceeded();
                }
                finally {
                    HttpRemoteTask.this.sendUpdate();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void failed(Throwable cause) {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                try {
                    long currentRequestStartNanos;
                    HttpRemoteTask httpRemoteTask = HttpRemoteTask.this;
                    synchronized (httpRemoteTask) {
                        HttpRemoteTask.this.currentRequest = null;
                        currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
                    }
                    this.updateStats(currentRequestStartNanos);
                    HttpRemoteTask.this.needsUpdate.set(true);
                    TaskStatus taskStatus = HttpRemoteTask.this.getTaskStatus();
                    if (!taskStatus.getState().isDone()) {
                        HttpRemoteTask.this.updateErrorTracker.requestFailed(cause);
                    }
                }
                catch (Error e) {
                    HttpRemoteTask.this.failTask(e);
                    throw e;
                }
                catch (RuntimeException e) {
                    HttpRemoteTask.this.failTask(e);
                }
                finally {
                    HttpRemoteTask.this.sendUpdate();
                }
            }
        }

        @Override
        public void fatal(Throwable cause) {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                HttpRemoteTask.this.failTask(cause);
            }
        }

        private void updateStats(long currentRequestStartNanos) {
            Duration requestRoundTrip = Duration.nanosSince((long)currentRequestStartNanos);
            HttpRemoteTask.this.stats.updateRoundTripMillis(requestRoundTrip.toMillis());
        }
    }
}

