/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.server;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.ScheduledSplit;
import com.facebook.presto.TaskSource;
import com.facebook.presto.execution.BufferInfo;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SharedBuffer;
import com.facebook.presto.execution.SharedBufferInfo;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.metadata.Node;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.spi.Split;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.analyzer.Session;
import com.facebook.presto.sql.planner.OutputReceiver;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.tuple.TupleInfo;
import com.facebook.presto.util.Failures;
import com.facebook.presto.util.SetThreadName;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
import io.airlift.http.client.AsyncHttpClient;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.JsonBodyGenerator;
import io.airlift.http.client.Request;
import io.airlift.http.client.ResponseHandler;
import io.airlift.http.client.StatusResponseHandler;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.joda.time.DateTime;

public class HttpRemoteTask
implements RemoteTask {
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final Session session;
    private final String nodeId;
    private final PlanFragment planFragment;
    private final int maxConsecutiveErrorCount;
    private final Duration minErrorDuration;
    private final AtomicLong nextSplitId = new AtomicLong();
    private final StateMachine<TaskInfo> taskInfo;
    @GuardedBy(value="this")
    private Future<?> currentRequest;
    @GuardedBy(value="this")
    private long currentRequestStartNanos;
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();
    @GuardedBy(value="this")
    private boolean noMoreSplits;
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, URI> exchangeLocations = HashMultimap.create();
    @GuardedBy(value="this")
    private boolean noMoreExchangeLocations;
    @GuardedBy(value="this")
    private final Set<String> outputIds = new TreeSet<String>();
    @GuardedBy(value="this")
    private boolean noMoreOutputIds;
    @GuardedBy(value="this")
    private ContinuousTaskInfoFetcher continuousTaskInfoFetcher;
    private final AsyncHttpClient httpClient;
    private final Executor executor;
    private final JsonCodec<TaskInfo> taskInfoCodec;
    private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;
    private final List<TupleInfo> tupleInfos;
    private final Map<PlanNodeId, OutputReceiver> outputReceivers;
    private final RateLimiter errorRequestRateLimiter = RateLimiter.create((double)0.1);
    private final AtomicLong lastSuccessfulRequest = new AtomicLong(System.nanoTime());
    private final AtomicLong errorCount = new AtomicLong();
    private final Queue<Throwable> errorsSinceLastSuccess = new ConcurrentLinkedQueue<Throwable>();
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);

    public HttpRemoteTask(Session session, TaskId taskId, Node node, URI location, PlanFragment planFragment, Split initialSplit, Map<PlanNodeId, OutputReceiver> outputReceivers, Multimap<PlanNodeId, URI> initialExchangeLocations, Set<String> initialOutputIds, AsyncHttpClient httpClient, Executor executor, int maxConsecutiveErrorCount, Duration minErrorDuration, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec) {
        Preconditions.checkNotNull((Object)session, (Object)"session is null");
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)location, (Object)"location is null");
        Preconditions.checkNotNull((Object)planFragment, (Object)"planFragment1 is null");
        Preconditions.checkNotNull(outputReceivers, (Object)"outputReceivers is null");
        Preconditions.checkNotNull(initialOutputIds, (Object)"initialOutputIds is null");
        Preconditions.checkNotNull((Object)httpClient, (Object)"httpClient is null");
        Preconditions.checkNotNull((Object)executor, (Object)"executor is null");
        Preconditions.checkNotNull(taskInfoCodec, (Object)"taskInfoCodec is null");
        Preconditions.checkNotNull(taskUpdateRequestCodec, (Object)"taskUpdateRequestCodec is null");
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{taskId});){
            this.taskId = taskId;
            this.session = session;
            this.nodeId = node.getNodeIdentifier();
            this.planFragment = planFragment;
            this.outputReceivers = ImmutableMap.copyOf(outputReceivers);
            this.outputIds.addAll(initialOutputIds);
            this.httpClient = httpClient;
            this.executor = executor;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.tupleInfos = planFragment.getTupleInfos();
            this.maxConsecutiveErrorCount = maxConsecutiveErrorCount;
            this.minErrorDuration = minErrorDuration;
            for (Map.Entry entry : initialExchangeLocations.entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(this.nextSplitId.getAndIncrement(), (Split)this.createRemoteSplitFor((URI)entry.getValue()));
                this.pendingSplits.put(entry.getKey(), (Object)scheduledSplit);
            }
            this.exchangeLocations.putAll(initialExchangeLocations);
            ImmutableList bufferStates = ImmutableList.copyOf((Iterable)Iterables.transform(initialOutputIds, (Function)new Function<String, BufferInfo>(){

                public BufferInfo apply(String outputId) {
                    return new BufferInfo(outputId, false, 0, 0L);
                }
            }));
            if (initialSplit != null) {
                Preconditions.checkState((boolean)planFragment.isPartitioned(), (Object)"Plan is not partitioned");
                this.pendingSplits.put((Object)planFragment.getPartitionedSource(), (Object)new ScheduledSplit(this.nextSplitId.getAndIncrement(), initialSplit));
            }
            TaskStats taskStats = new TaskContext(taskId, executor, session).getTaskStats();
            this.taskInfo = new StateMachine("task " + taskId, executor, (Object)new TaskInfo(taskId, 0L, TaskState.PLANNED, location, DateTime.now(), new SharedBufferInfo(SharedBuffer.QueueState.OPEN, 0L, 0L, (List)bufferStates), (Set)ImmutableSet.of(), taskStats, (List)ImmutableList.of(), (Map)ImmutableMap.of()));
        }
    }

    public TaskInfo getTaskInfo() {
        return (TaskInfo)this.taskInfo.get();
    }

    public void start() {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.scheduleUpdate();
        }
    }

    public synchronized void addSplit(Split split) {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            Preconditions.checkNotNull((Object)split, (Object)"split is null");
            Preconditions.checkState((!this.noMoreSplits ? 1 : 0) != 0, (Object)"noMoreSplits has already been set");
            Preconditions.checkState((boolean)this.planFragment.isPartitioned(), (Object)"Plan is not partitioned");
            if (!this.getTaskInfo().getState().isDone()) {
                this.pendingSplits.put((Object)this.planFragment.getPartitionedSource(), (Object)new ScheduledSplit(this.nextSplitId.getAndIncrement(), split));
                this.needsUpdate.set(true);
            }
            this.scheduleUpdate();
        }
    }

    public synchronized void noMoreSplits() {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            Preconditions.checkState((!this.noMoreSplits ? 1 : 0) != 0, (Object)"noMoreSplits has already been set");
            this.noMoreSplits = true;
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    public synchronized void addExchangeLocations(Multimap<PlanNodeId, URI> additionalLocations, boolean noMore) {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            if (this.getTaskInfo().getState().isDone()) {
                return;
            }
            if (this.noMoreExchangeLocations == noMore && this.exchangeLocations.entries().containsAll(additionalLocations.entries())) {
                return;
            }
            Preconditions.checkState((!this.noMoreExchangeLocations ? 1 : 0) != 0, (Object)"Locations can not be added after noMoreExchangeLocations has been set");
            HashMultimap newExchangeLocations = HashMultimap.create(additionalLocations);
            newExchangeLocations.entries().removeAll(this.exchangeLocations.entries());
            for (Map.Entry entry : newExchangeLocations.entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(this.nextSplitId.getAndIncrement(), (Split)this.createRemoteSplitFor((URI)entry.getValue()));
                this.pendingSplits.put(entry.getKey(), (Object)scheduledSplit);
            }
            this.exchangeLocations.putAll(additionalLocations);
            this.noMoreExchangeLocations = noMore;
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    public synchronized void addOutputBuffers(Set<String> outputBuffers, boolean noMore) {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            if (this.getTaskInfo().getState().isDone()) {
                return;
            }
            if (this.noMoreOutputIds == noMore && this.outputIds.containsAll(outputBuffers)) {
                return;
            }
            Preconditions.checkState((!this.noMoreOutputIds ? 1 : 0) != 0, (Object)"New buffers can not be added after noMoreOutputIds has been set");
            this.outputIds.addAll(outputBuffers);
            this.noMoreOutputIds = noMore;
            this.needsUpdate.set(true);
            this.scheduleUpdate();
        }
    }

    public synchronized int getQueuedSplits() {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            int pendingSplitCount = 0;
            if (this.planFragment.isPartitioned()) {
                pendingSplitCount = this.pendingSplits.get((Object)this.planFragment.getPartitionedSource()).size();
            }
            int n = pendingSplitCount + ((TaskInfo)this.taskInfo.get()).getStats().getQueuedDrivers();
            return n;
        }
    }

    public void addStateChangeListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.taskInfo.addStateChangeListener(stateChangeListener);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Duration waitForTaskToFinish(Duration maxWait) throws InterruptedException {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            while (true) {
                TaskInfo currentState = (TaskInfo)this.taskInfo.get();
                if (maxWait.toMillis() <= 1L || currentState.getState().isDone()) {
                    Duration duration = maxWait;
                    return duration;
                }
                maxWait = this.taskInfo.waitForStateChange((Object)currentState, maxWait);
                continue;
                break;
            }
        }
    }

    private synchronized void updateTaskInfo(final TaskInfo newValue) {
        for (Map.Entry entry : newValue.getOutputs().entrySet()) {
            OutputReceiver outputReceiver = this.outputReceivers.get(entry.getKey());
            Preconditions.checkState((outputReceiver != null ? 1 : 0) != 0, (String)"Got Result for node %s which is not an output receiver!", (Object[])new Object[]{entry.getKey()});
            for (Object result : (Set)entry.getValue()) {
                outputReceiver.updateOutput(result);
            }
        }
        if (newValue.getState().isDone()) {
            this.pendingSplits.clear();
        }
        this.taskInfo.setIf((Object)newValue, (Predicate)new Predicate<TaskInfo>(){

            public boolean apply(TaskInfo oldValue) {
                if (oldValue.getState().isDone()) {
                    return false;
                }
                return newValue.getVersion() >= oldValue.getVersion();
            }
        });
    }

    private synchronized void scheduleUpdate() {
        AsyncHttpClient.AsyncHttpResponseFuture future;
        if (!this.needsUpdate.get() || ((TaskInfo)this.taskInfo.get()).getState().isDone()) {
            return;
        }
        if (this.currentRequest != null && Duration.nanosSince((long)this.currentRequestStartNanos).compareTo(new Duration(2.0, TimeUnit.SECONDS)) >= 0) {
            this.needsUpdate.set(true);
            this.currentRequest.cancel(true);
            this.currentRequest = null;
            this.currentRequestStartNanos = 0L;
        }
        if (this.currentRequest != null && !this.currentRequest.isDone()) {
            return;
        }
        if (this.errorCount.get() > 0L) {
            this.errorRequestRateLimiter.acquire();
        }
        List<TaskSource> sources = this.getSources();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(this.session, this.planFragment, sources, new OutputBuffers(this.outputIds, this.noMoreOutputIds));
        Request request = Request.Builder.preparePost().setUri(HttpUriBuilder.uriBuilderFrom((URI)((TaskInfo)this.taskInfo.get()).getSelf()).build()).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setBodyGenerator((BodyGenerator)JsonBodyGenerator.jsonBodyGenerator(this.taskUpdateRequestCodec, (Object)updateRequest)).build();
        this.currentRequest = future = this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec));
        this.currentRequestStartNanos = System.nanoTime();
        Futures.addCallback((ListenableFuture)future, new SimpleHttpResponseHandler<TaskInfo>(new UpdateResponseHandler(sources), request.getUri()), (Executor)this.executor);
        this.needsUpdate.set(false);
    }

    private synchronized List<TaskSource> getSources() {
        Set splits;
        ImmutableList.Builder sources = ImmutableList.builder();
        if (this.planFragment.isPartitioned() && (!(splits = this.pendingSplits.get((Object)this.planFragment.getPartitionedSource())).isEmpty() || this.noMoreSplits)) {
            sources.add((Object)new TaskSource(this.planFragment.getPartitionedSource(), splits, this.noMoreSplits));
        }
        for (PlanNode planNode : this.planFragment.getSources()) {
            Set splits2;
            PlanNodeId planNodeId = planNode.getId();
            if (planNodeId.equals((Object)this.planFragment.getPartitionedSource()) || (splits2 = this.pendingSplits.get((Object)planNodeId)).isEmpty() && !this.noMoreExchangeLocations) continue;
            sources.add((Object)new TaskSource(planNodeId, splits2, this.noMoreExchangeLocations));
        }
        return sources.build();
    }

    public synchronized void cancel() {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.pendingSplits.clear();
            if (this.currentRequest != null) {
                this.currentRequest.cancel(true);
                this.currentRequest = null;
                this.currentRequestStartNanos = 0L;
            }
            TaskInfo taskInfo = this.getTaskInfo();
            this.updateTaskInfo(new TaskInfo(taskInfo.getTaskId(), Long.MAX_VALUE, TaskState.CANCELED, taskInfo.getSelf(), taskInfo.getLastHeartbeat(), taskInfo.getOutputBuffers(), taskInfo.getNoMoreSplits(), taskInfo.getStats(), (List)ImmutableList.of(), (Map)ImmutableMap.of()));
            if (taskInfo.getSelf() != null) {
                final long start = System.nanoTime();
                final Request request = Request.Builder.prepareDelete().setUri(taskInfo.getSelf()).build();
                Futures.addCallback((ListenableFuture)this.httpClient.executeAsync(request, (ResponseHandler)StatusResponseHandler.createStatusResponseHandler()), (FutureCallback)new FutureCallback<StatusResponseHandler.StatusResponse>(){

                    public void onSuccess(StatusResponseHandler.StatusResponse result) {
                    }

                    public void onFailure(Throwable t) {
                        if (t instanceof RejectedExecutionException) {
                            return;
                        }
                        if (Duration.nanosSince((long)start).compareTo(new Duration(2.0, TimeUnit.MINUTES)) < 0) {
                            Futures.addCallback((ListenableFuture)HttpRemoteTask.this.httpClient.executeAsync(request, (ResponseHandler)StatusResponseHandler.createStatusResponseHandler()), (FutureCallback)this, (Executor)HttpRemoteTask.this.executor);
                        } else {
                            log.error(t, "Unable to cancel task at %s", new Object[]{request.getUri()});
                        }
                    }
                }, (Executor)this.executor);
            }
        }
    }

    private synchronized void requestSucceeded(TaskInfo newValue, List<TaskSource> sources) {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId});){
            this.updateTaskInfo(newValue);
            this.lastSuccessfulRequest.set(System.nanoTime());
            this.errorCount.set(0L);
            this.errorsSinceLastSuccess.clear();
            for (TaskSource source : sources) {
                PlanNodeId planNodeId = source.getPlanNodeId();
                for (ScheduledSplit split : source.getSplits()) {
                    this.pendingSplits.remove((Object)planNodeId, (Object)split);
                }
            }
            if (this.continuousTaskInfoFetcher == null) {
                this.continuousTaskInfoFetcher = new ContinuousTaskInfoFetcher();
                this.continuousTaskInfoFetcher.start();
            }
        }
    }

    private synchronized void requestFailed(Throwable reason) {
        if (reason instanceof CancellationException) {
            return;
        }
        TaskInfo taskInfo = this.getTaskInfo();
        if (HttpRemoteTask.isSocketError(reason)) {
            log.warn("Error updating task %s: %s: %s", new Object[]{taskInfo.getTaskId(), reason.getMessage(), taskInfo.getSelf()});
        } else {
            log.warn(reason, "Error updating task %s: %s", new Object[]{taskInfo.getTaskId(), taskInfo.getSelf()});
        }
        if (this.errorsSinceLastSuccess.size() < 10) {
            this.errorsSinceLastSuccess.add(reason);
        }
        long errorCount = this.errorCount.incrementAndGet();
        Duration timeSinceLastSuccess = Duration.nanosSince((long)this.lastSuccessfulRequest.get());
        if (errorCount > (long)this.maxConsecutiveErrorCount && timeSinceLastSuccess.compareTo(this.minErrorDuration) > 0) {
            RuntimeException exception = new RuntimeException(String.format("Too many requests to %s failed: %s failures: Time since last success %s", taskInfo.getSelf(), errorCount, timeSinceLastSuccess));
            for (Throwable error : this.errorsSinceLastSuccess) {
                exception.addSuppressed(error);
            }
            this.failTask(exception);
            this.cancel();
        }
    }

    private void failTask(Throwable cause) {
        TaskInfo taskInfo = this.getTaskInfo();
        if (!taskInfo.getState().isDone()) {
            log.debug(cause, "Remote task failed: %s", new Object[]{taskInfo.getSelf()});
        }
        this.updateTaskInfo(new TaskInfo(taskInfo.getTaskId(), Long.MAX_VALUE, TaskState.FAILED, taskInfo.getSelf(), taskInfo.getLastHeartbeat(), taskInfo.getOutputBuffers(), taskInfo.getNoMoreSplits(), taskInfo.getStats(), (List)ImmutableList.of((Object)Failures.toFailure((Throwable)cause)), taskInfo.getOutputs()));
    }

    private RemoteSplit createRemoteSplitFor(URI taskLocation) {
        URI splitLocation = HttpUriBuilder.uriBuilderFrom((URI)taskLocation).appendPath("results").appendPath(this.nodeId).build();
        return new RemoteSplit(splitLocation, this.tupleInfos);
    }

    public String toString() {
        return Objects.toStringHelper((Object)this).addValue((Object)this.getTaskInfo()).toString();
    }

    private static boolean isSocketError(Throwable t) {
        while (t != null) {
            if (t instanceof SocketException || t instanceof SocketTimeoutException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static interface SimpleHttpResponseCallback<T> {
        public void success(T var1);

        public void failed(Throwable var1);

        public void fatal(Throwable var1);
    }

    public static class SimpleHttpResponseHandler<T>
    implements FutureCallback<FullJsonResponseHandler.JsonResponse<T>> {
        private final SimpleHttpResponseCallback<T> callback;
        private final URI uri;

        public SimpleHttpResponseHandler(SimpleHttpResponseCallback<T> callback, URI uri) {
            this.callback = callback;
            this.uri = uri;
        }

        public void onSuccess(FullJsonResponseHandler.JsonResponse<T> response) {
            try {
                if (response.getStatusCode() == HttpStatus.OK.code() && response.hasValue()) {
                    this.callback.success(response.getValue());
                } else if (response.getStatusCode() == HttpStatus.SERVICE_UNAVAILABLE.code()) {
                    this.callback.failed(new RuntimeException("Server at %s returned SERVICE_UNAVAILABLE"));
                } else {
                    RuntimeException cause = response.getException();
                    if (cause == null) {
                        cause = response.getStatusCode() == HttpStatus.OK.code() ? new RuntimeException(String.format("Expected response from %s is empty", this.uri)) : new RuntimeException(String.format("Expected response code from %s to be %s, but was %s: %s", this.uri, HttpStatus.OK.code(), response.getStatusCode(), response.getStatusMessage()));
                    }
                    this.callback.fatal(cause);
                }
            }
            catch (Throwable t) {
                this.callback.failed(t);
            }
        }

        public void onFailure(Throwable t) {
            this.callback.failed(t);
        }
    }

    private class ContinuousTaskInfoFetcher
    implements SimpleHttpResponseCallback<TaskInfo> {
        @GuardedBy(value="this")
        private boolean running;
        @GuardedBy(value="this")
        private ListenableFuture<FullJsonResponseHandler.JsonResponse<TaskInfo>> future;

        private ContinuousTaskInfoFetcher() {
        }

        public synchronized void start() {
            if (this.running) {
                return;
            }
            this.running = true;
            this.scheduleNextRequest();
        }

        public synchronized void stop() {
            this.running = false;
            if (this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }
        }

        private synchronized void scheduleNextRequest() {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s", new Object[]{HttpRemoteTask.this.taskId});){
                TaskInfo taskInfo = (TaskInfo)HttpRemoteTask.this.taskInfo.get();
                if (!this.running || taskInfo.getState().isDone()) {
                    return;
                }
                if (this.future != null && !this.future.isDone()) {
                    log.error("Can not reschedule update because an update is already running");
                    return;
                }
                Request request = Request.Builder.prepareGet().setUri(taskInfo.getSelf()).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setHeader("X-Presto-Current-State", taskInfo.getState().toString()).setHeader("X-Presto-Max-Wait", "200ms").build();
                this.future = HttpRemoteTask.this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler((JsonCodec)HttpRemoteTask.this.taskInfoCodec));
                Futures.addCallback(this.future, new SimpleHttpResponseHandler<TaskInfo>(this, request.getUri()), (Executor)HttpRemoteTask.this.executor);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void success(TaskInfo value) {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s", new Object[]{HttpRemoteTask.this.taskId});){
                ContinuousTaskInfoFetcher continuousTaskInfoFetcher = this;
                synchronized (continuousTaskInfoFetcher) {
                    this.future = null;
                }
                try {
                    HttpRemoteTask.this.requestSucceeded(value, (List)ImmutableList.of());
                }
                finally {
                    this.scheduleNextRequest();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void failed(Throwable cause) {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s", new Object[]{HttpRemoteTask.this.taskId});){
                ContinuousTaskInfoFetcher continuousTaskInfoFetcher = this;
                synchronized (continuousTaskInfoFetcher) {
                    this.future = null;
                }
                try {
                    HttpRemoteTask.this.requestFailed(cause);
                }
                finally {
                    this.scheduleNextRequest();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void fatal(Throwable cause) {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s", new Object[]{HttpRemoteTask.this.taskId});){
                ContinuousTaskInfoFetcher continuousTaskInfoFetcher = this;
                synchronized (continuousTaskInfoFetcher) {
                    this.future = null;
                }
                HttpRemoteTask.this.failTask(cause);
            }
        }
    }

    private class UpdateResponseHandler
    implements SimpleHttpResponseCallback<TaskInfo> {
        private final List<TaskSource> sources;

        private UpdateResponseHandler(List<TaskSource> sources) {
            this.sources = ImmutableList.copyOf((Collection)((Collection)Preconditions.checkNotNull(sources, (Object)"sources is null")));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void success(TaskInfo value) {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                try {
                    HttpRemoteTask.this.requestSucceeded(value, this.sources);
                }
                finally {
                    HttpRemoteTask.this.scheduleUpdate();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void failed(Throwable cause) {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                try {
                    HttpRemoteTask.this.needsUpdate.set(true);
                    HttpRemoteTask.this.requestFailed(cause);
                }
                finally {
                    HttpRemoteTask.this.scheduleUpdate();
                }
            }
        }

        @Override
        public void fatal(Throwable cause) {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});){
                HttpRemoteTask.this.failTask(cause);
            }
        }
    }
}

