/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.server;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.airlift.units.DataSize;
import com.facebook.airlift.units.Duration;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.PageBufferInfo;
import com.facebook.presto.operator.ExchangeClientConfig;
import com.facebook.presto.server.ForAsyncRpc;
import com.facebook.presto.server.SerializedPageWriteListener;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.util.TaskUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Enumeration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@RolesAllowed(value={"internal"})
public class AsyncPageTransportServlet
extends HttpServlet {
    private static final Logger log = Logger.get(AsyncPageTransportServlet.class);
    private final Duration pageTransportTimeout;
    private final TaskManager taskManager;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final TimeStat readFromOutputBufferTime = new TimeStat();
    private final TimeStat resultsRequestTime = new TimeStat();

    @Inject
    public AsyncPageTransportServlet(TaskManager taskManager, ExchangeClientConfig exchangeClientConfig, @ForAsyncRpc BoundedExecutor responseExecutor, @ForAsyncRpc ScheduledExecutorService timeoutExecutor) {
        this.taskManager = Objects.requireNonNull(taskManager, "taskManager is null");
        this.pageTransportTimeout = Objects.requireNonNull(exchangeClientConfig.getAsyncPageTransportTimeout(), "asyncPageTransportTimeout is null");
        this.responseExecutor = (Executor)Objects.requireNonNull(responseExecutor, "responseExecutor is null");
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor, "timeoutExecutor is null");
    }

    @VisibleForTesting
    protected AsyncPageTransportServlet() {
        this.taskManager = null;
        this.pageTransportTimeout = null;
        this.responseExecutor = null;
        this.timeoutExecutor = null;
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
        this.parseURI(request.getRequestURI(), request, response);
    }

    protected void reportFailure(HttpServletResponse response, String message) throws IOException {
        response.sendError(400, message);
    }

    protected void parseURI(String requestURI, HttpServletRequest request, HttpServletResponse response) throws IOException {
        TaskId taskId = null;
        OutputBuffers.OutputBufferId bufferId = null;
        long token = 0L;
        if (request != null) {
            Enumeration headerNames = request.getHeaderNames();
            while (headerNames.hasMoreElements()) {
                String headerName = (String)headerNames.nextElement();
                String headerValue = request.getHeader(headerName);
                if (headerName.contains("\r") || headerName.contains("\n")) {
                    throw new IllegalArgumentException(String.format("Invalid header name: %s", headerName));
                }
                if (!headerValue.contains("\r") && !headerValue.contains("\n")) continue;
                throw new IllegalArgumentException(String.format("Invalid header value: %s", headerValue));
            }
        }
        int previousIndex = -1;
        for (int part = 0; part < 8; ++part) {
            int nextIndex = requestURI.indexOf(47, previousIndex + 1);
            if (nextIndex == -1 && part != 7 || nextIndex != -1 && part == 7) {
                this.reportFailure(response, String.format("Unexpected URI for task result request in async mode: %s", requestURI));
                return;
            }
            switch (part) {
                case 4: {
                    taskId = TaskId.valueOf((String)requestURI.substring(previousIndex + 1, nextIndex));
                    break;
                }
                case 6: {
                    bufferId = OutputBuffers.OutputBufferId.fromString((String)requestURI.substring(previousIndex + 1, nextIndex));
                    break;
                }
                case 7: {
                    token = Long.parseLong(requestURI.substring(previousIndex + 1));
                }
            }
            previousIndex = nextIndex;
        }
        this.processRequest(requestURI, taskId, bufferId, token, request, response);
    }

    protected void processRequest(final String requestURI, TaskId taskId, OutputBuffers.OutputBufferId bufferId, long token, HttpServletRequest request, final HttpServletResponse response) throws IOException {
        final long start = System.nanoTime();
        DataSize maxSize = DataSize.valueOf((String)request.getHeader("X-Presto-Max-Size"));
        final AsyncContext asyncContext = request.startAsync((ServletRequest)request, (ServletResponse)response);
        Duration waitTime = TaskUtils.randomizeWaitTime((Duration)TaskUtils.DEFAULT_MAX_WAIT_TIME);
        asyncContext.setTimeout(waitTime.toMillis() + this.pageTransportTimeout.toMillis());
        asyncContext.addListener(new AsyncListener(){

            public void onComplete(AsyncEvent event) {
                AsyncPageTransportServlet.this.resultsRequestTime.add(Duration.nanosSince((long)start));
            }

            public void onError(AsyncEvent event) throws IOException {
                String errorMessage = String.format("Server error to process task result request %s : %s", requestURI, event.getThrowable().getMessage());
                log.error(event.getThrowable(), errorMessage);
                response.sendError(500, errorMessage);
            }

            public void onStartAsync(AsyncEvent event) {
            }

            public void onTimeout(AsyncEvent event) throws IOException {
                String errorMessage = String.format("Server timeout to process task result request: %s", requestURI);
                log.error(event.getThrowable(), errorMessage);
                response.sendError(500, errorMessage);
            }
        });
        ListenableFuture bufferResultFuture = this.taskManager.getTaskResults(taskId, bufferId, token, maxSize.toBytes());
        bufferResultFuture = MoreFutures.addTimeout((ListenableFuture)bufferResultFuture, () -> BufferResult.emptyResults((String)this.taskManager.getTaskInstanceId(taskId), (long)token, (long)this.taskManager.getOutputBufferInfo(taskId).getBuffers().stream().filter(bufferInfo -> bufferInfo.getBufferId().equals((Object)bufferId)).map(BufferInfo::getPageBufferInfo).map(PageBufferInfo::getBufferedBytes).findFirst().orElse(0L), (boolean)false), (Duration)waitTime, (ScheduledExecutorService)this.timeoutExecutor);
        bufferResultFuture.addListener(() -> this.readFromOutputBufferTime.add(Duration.nanosSince((long)start)), MoreExecutors.directExecutor());
        final ServletOutputStream out = response.getOutputStream();
        Futures.addCallback((ListenableFuture)bufferResultFuture, (FutureCallback)new FutureCallback<BufferResult>(){

            public void onSuccess(BufferResult bufferResult) {
                response.setHeader("Content-Type", "application/X-presto-pages");
                response.setHeader("X-Presto-Task-Instance-Id", bufferResult.getTaskInstanceId());
                response.setHeader("X-Presto-Page-Sequence-Id", String.valueOf(bufferResult.getToken()));
                response.setHeader("X-Presto-Page-End-Sequence-Id", String.valueOf(bufferResult.getNextToken()));
                response.setHeader("X-Presto-Buffer-Complete", String.valueOf(bufferResult.isBufferComplete()));
                response.setHeader("X-Presto-Buffer-Remaining-Bytes", String.valueOf(bufferResult.getBufferedBytes()));
                List serializedPages = bufferResult.getSerializedPages();
                if (serializedPages.isEmpty()) {
                    response.setStatus(204);
                    asyncContext.complete();
                } else {
                    int contentLength = serializedPages.size() * 21 + serializedPages.stream().mapToInt(SerializedPage::getSizeInBytes).sum();
                    response.setHeader("Content-Length", String.valueOf(contentLength));
                    out.setWriteListener((WriteListener)new SerializedPageWriteListener(serializedPages, asyncContext, out));
                }
            }

            public void onFailure(Throwable thrown) {
                String errorMessage = String.format("Error getting task result from TaskManager for request %s : %s", requestURI, thrown.getMessage());
                log.error(thrown, errorMessage);
                try {
                    response.sendError(500, errorMessage);
                }
                catch (IOException e) {
                    log.error((Throwable)e, "Failed to send response with error code: %s", new Object[]{e.getMessage()});
                }
                asyncContext.complete();
            }
        }, (Executor)this.responseExecutor);
    }

    @Managed
    @Nested
    public TimeStat getReadFromOutputBufferTime() {
        return this.readFromOutputBufferTime;
    }

    @Managed
    @Nested
    public TimeStat getResultsRequestTime() {
        return this.resultsRequestTime;
    }
}

