/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution.nativeprocess;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class HttpNativeExecutionTaskResultFetcher {
    private static final Logger log = Logger.get(HttpNativeExecutionTaskResultFetcher.class);
    private static final Duration FETCH_INTERVAL = new Duration(200.0, TimeUnit.MILLISECONDS);
    private static final Duration POLL_TIMEOUT = new Duration(100.0, TimeUnit.MILLISECONDS);
    private static final DataSize MAX_RESPONSE_SIZE = new DataSize(32.0, DataSize.Unit.MEGABYTE);
    private static final DataSize MAX_BUFFER_SIZE = new DataSize(128.0, DataSize.Unit.MEGABYTE);
    private static final String TASK_ERROR_MESSAGE = "TaskResultsFetcher encountered too many errors talking to native process.";
    private final Executor executor;
    private final ScheduledExecutorService scheduler;
    private final PrestoSparkHttpTaskClient workerClient;
    private final LinkedBlockingDeque<SerializedPage> pageBuffer = new LinkedBlockingDeque();
    private final AtomicLong bufferMemoryBytes;
    private final Object taskHasResult;
    private final Duration maxErrorDuration;
    private final RequestErrorTracker errorTracker;
    private final AtomicReference<RuntimeException> lastException = new AtomicReference();
    private ScheduledFuture<?> scheduledFuture;
    private long token;

    public HttpNativeExecutionTaskResultFetcher(ScheduledExecutorService scheduler, ScheduledExecutorService errorRetryScheduledExecutor, PrestoSparkHttpTaskClient workerClient, Executor executor, Duration maxErrorDuration, Object taskHasResult) {
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler is null");
        this.workerClient = Objects.requireNonNull(workerClient, "workerClient is null");
        this.bufferMemoryBytes = new AtomicLong();
        this.taskHasResult = Objects.requireNonNull(taskHasResult, "taskHasResult is null");
        this.maxErrorDuration = Objects.requireNonNull(maxErrorDuration, "maxErrorDuration is null");
        this.errorTracker = new RequestErrorTracker((Object)"NativeExecution", workerClient.getLocation(), (ErrorCodeSupplier)StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR, TASK_ERROR_MESSAGE, maxErrorDuration, Objects.requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null"), "getting results from native process");
    }

    public void start() {
        this.scheduledFuture = this.scheduler.scheduleAtFixedRate(this::doGetResults, 0L, (long)FETCH_INTERVAL.getValue(), FETCH_INTERVAL.getUnit());
    }

    public void stop(boolean success) {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
        if (success && !this.pageBuffer.isEmpty()) {
            throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("TaskResultFetcher is closed with %s pages left in the buffer", this.pageBuffer.size()));
        }
    }

    public Optional<SerializedPage> pollPage() throws InterruptedException {
        if (this.scheduledFuture != null && this.scheduledFuture.isCancelled() && this.lastException.get() != null) {
            throw this.lastException.get();
        }
        SerializedPage page = this.pageBuffer.poll((long)POLL_TIMEOUT.getValue(), POLL_TIMEOUT.getUnit());
        if (page != null) {
            this.bufferMemoryBytes.addAndGet(-page.getSizeInBytes());
            return Optional.of(page);
        }
        return Optional.empty();
    }

    public boolean hasPage() {
        if (this.scheduledFuture != null && this.scheduledFuture.isCancelled() && this.lastException.get() != null) {
            throw this.lastException.get();
        }
        return !this.pageBuffer.isEmpty();
    }

    private void doGetResults() {
        if (this.bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) {
            return;
        }
        try {
            PageBufferClient.PagesResponse pagesResponse = (PageBufferClient.PagesResponse)this.workerClient.getResults(this.token, MAX_RESPONSE_SIZE).get();
            this.onSuccess(pagesResponse);
        }
        catch (Throwable t) {
            this.onFailure(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onSuccess(PageBufferClient.PagesResponse pagesResponse) {
        this.errorTracker.requestSucceeded();
        List pages = pagesResponse.getPages();
        long bytes = 0L;
        long positionCount = 0L;
        for (SerializedPage page : pages) {
            if (!PagesSerdeUtil.isChecksumValid((SerializedPage)page)) {
                throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.SERIALIZED_PAGE_CHECKSUM_ERROR, String.format("Received corrupted serialized page from host %s", HostAddress.fromUri((URI)this.workerClient.getLocation())));
            }
            bytes += (long)page.getSizeInBytes();
            positionCount += (long)page.getPositionCount();
        }
        log.info("Received %s rows in %s pages from %s", new Object[]{positionCount, pages.size(), this.workerClient.getTaskUri()});
        this.pageBuffer.addAll(pages);
        this.bufferMemoryBytes.addAndGet(bytes);
        long nextToken = pagesResponse.getNextToken();
        if (pages.size() > 0) {
            this.workerClient.acknowledgeResultsAsync(nextToken);
        }
        this.token = nextToken;
        if (pagesResponse.isClientComplete()) {
            this.workerClient.abortResults();
            this.scheduledFuture.cancel(false);
        }
        if (!pages.isEmpty()) {
            Object object = this.taskHasResult;
            synchronized (object) {
                this.taskHasResult.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onFailure(Throwable t) {
        try {
            this.errorTracker.requestFailed(t);
        }
        catch (PrestoException e) {
            this.workerClient.abortResults();
            this.stop(false);
            this.lastException.set((RuntimeException)((Object)e));
            Object object = this.taskHasResult;
            synchronized (object) {
                this.taskHasResult.notifyAll();
            }
            return;
        }
        ListenableFuture errorRateLimit = this.errorTracker.acquireRequestPermit();
        try {
            errorRateLimit.get(this.maxErrorDuration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.debug(e.getMessage());
        }
    }
}

