/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMessage;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request handler that serves the log file or the stdout file of a TaskManager.
 *
 * <p>For every request the handler asks the JobManager for the TaskManager instance, asks that
 * TaskManager for the blob key of the requested file, resolves the key to a local file through a
 * {@link TransientBlobCache} and finally writes the file to the channel.
 *
 * <p>Only one request per TaskManager id is processed at a time; while one is pending, further
 * requests for the same id are answered with the text {@code loading...}.
 */
@ChannelHandler.Sharable
public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> implements WebHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class);

    private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log";
    private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout";

    /** Chunk size in bytes used when the file is streamed as chunked input (SSL pipelines). */
    private static final int CHUNK_SIZE = 8192;

    /*
     * Blob key of the file served last, per TaskManager id. These maps are mutated from callbacks
     * that run on the executor, hence the concurrent implementation.
     */
    private final ConcurrentHashMap<String, TransientBlobKey> lastSubmittedLog = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, TransientBlobKey> lastSubmittedStdout = new ConcurrentHashMap<>();

    /** TaskManager ids for which a request is currently being processed. */
    private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>();

    private final Configuration config;
    private final FileMode fileMode;
    private final Executor executor;

    /**
     * Lazily created blob cache; see {@link #getOrCreateBlobCache(JobManagerGateway)}.
     * NOTE(review): the field is written without synchronization - confirm that callers do not
     * rely on a single cache instance under concurrent first requests.
     */
    private CompletableFuture<TransientBlobCache> cache;

    /**
     * Creates a handler for either the log or the stdout file.
     *
     * @param retriever retriever for the leading JobManager gateway
     * @param executor executor on which blob cache creation and blob retrieval run; must not be null
     * @param localJobManagerAddressPromise future of the local JobManager address, passed to the super class
     * @param timeout timeout passed to the super class and used for the gateway requests
     * @param fileMode selects whether the log or the stdout file is served
     * @param config configuration handed to the {@link TransientBlobCache}
     */
    public TaskManagerLogHandler(GatewayRetriever<JobManagerGateway> retriever, Executor executor, CompletableFuture<String> localJobManagerAddressPromise, Time timeout, FileMode fileMode, Configuration config) {
        super(localJobManagerAddressPromise, retriever, timeout);
        this.executor = Preconditions.checkNotNull(executor);
        this.config = config;
        this.fileMode = fileMode;
    }

    /**
     * Returns the single REST path served by this handler, depending on its {@link FileMode}.
     */
    @Override
    public String[] getPaths() {
        switch (this.fileMode) {
            case LOG:
                return new String[]{TASKMANAGER_LOG_REST_PATH};
            case STDOUT:
            default:
                return new String[]{TASKMANAGER_OUT_REST_PATH};
        }
    }

    /**
     * Serves the requested TaskManager file, or a plain-text message if the file cannot be served
     * or another request for the same TaskManager is still pending.
     */
    @Override
    protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
        final CompletableFuture<TransientBlobCache> blobCacheFuture = getOrCreateBlobCache(jobManagerGateway);

        final String taskManagerId = routed.pathParams().get("taskmanagerid");
        final HttpRequest request = routed.request();

        // Fetch the file only if no other request for this TaskManager is in flight.
        if (this.lastRequestPending.putIfAbsent(taskManagerId, true) != null) {
            display(ctx, request, "loading...");
            return;
        }

        try {
            final String unescapedString;
            try {
                unescapedString = URLDecoder.decode(taskManagerId, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new FlinkException("Could not decode task manager id: " + taskManagerId + '.', e);
            }

            final ResourceID resourceId = new ResourceID(unescapedString);
            final CompletableFuture<Optional<Instance>> taskManagerFuture =
                jobManagerGateway.requestTaskManagerInstance(resourceId, this.timeout);

            final CompletableFuture<TransientBlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
                (Optional<Instance> optTMInstance) -> {
                    Instance taskManagerInstance = optTMInstance.orElseThrow(
                        () -> new CompletionException(new FlinkException("Could not find instance with " + resourceId + '.')));
                    switch (this.fileMode) {
                        case LOG:
                            return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(this.timeout);
                        case STDOUT:
                        default:
                            return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(this.timeout);
                    }
                });

            final CompletableFuture<String> logPathFuture = blobKeyFuture.thenCombineAsync(
                blobCacheFuture,
                (blobKey, blobCache) -> resolveFilePath(taskManagerId, blobKey, blobCache),
                this.executor);

            logPathFuture.exceptionally(failure -> {
                LOG.error("Fetching TaskManager log failed.", failure);
                failRequest(ctx, request, taskManagerId, "Fetching TaskManager log failed.");
                return null;
            });

            logPathFuture.thenAccept(filePath -> sendFile(ctx, request, taskManagerId, filePath));
        } catch (Exception e) {
            LOG.error("Fetching TaskManager log failed.", e);
            failRequest(ctx, request, taskManagerId, "Error: " + e.getMessage());
        }
    }

    /**
     * Returns the blob cache future, creating it on first use. A future that completed
     * exceptionally is replaced so that one failed attempt does not break all later requests.
     */
    private CompletableFuture<TransientBlobCache> getOrCreateBlobCache(JobManagerGateway jobManagerGateway) {
        CompletableFuture<TransientBlobCache> current = this.cache;
        if (current == null || current.isCompletedExceptionally()) {
            final CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(this.timeout);
            current = blobPortFuture.thenApplyAsync(port -> {
                try {
                    return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), this.config);
                } catch (IOException e) {
                    throw new CompletionException(new FlinkException("Could not create TransientBlobCache.", e));
                }
            }, this.executor);
            this.cache = current;
        }
        return current;
    }

    /**
     * Records {@code blobKey} as the file last served for the TaskManager, deleting the previously
     * served file from the cache if its key differs, and returns the absolute path of the new file.
     *
     * @throws CompletionException if the previous file cannot be deleted or the blob cannot be retrieved
     */
    private String resolveFilePath(String taskManagerId, TransientBlobKey blobKey, TransientBlobCache blobCache) {
        final ConcurrentHashMap<String, TransientBlobKey> lastSubmittedFile =
            this.fileMode == FileMode.LOG ? this.lastSubmittedLog : this.lastSubmittedStdout;

        final TransientBlobKey previousKey = lastSubmittedFile.get(taskManagerId);
        if (previousKey == null) {
            lastSubmittedFile.put(taskManagerId, blobKey);
        } else if (!Objects.equals(blobKey, previousKey)) {
            // The file changed: drop the stale copy before remembering the new key.
            if (!blobCache.deleteFromCache(previousKey)) {
                throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerId + '.'));
            }
            lastSubmittedFile.put(taskManagerId, blobKey);
        }

        try {
            return blobCache.getFile(blobKey).getAbsolutePath();
        } catch (IOException e) {
            throw new CompletionException(new FlinkException("Could not retrieve blob for " + blobKey + '.', e));
        }
    }

    /**
     * Writes the file at {@code filePath} to the channel as a {@code text/plain} response. Every
     * failure path closes the file and clears the pending marker of the TaskManager.
     */
    private void sendFile(ChannelHandlerContext ctx, HttpRequest request, String taskManagerId, String filePath) {
        final RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(new File(filePath), "r");
        } catch (FileNotFoundException e) {
            LOG.error("Displaying TaskManager log failed.", e);
            failRequest(ctx, request, taskManagerId, "Displaying TaskManager log failed.");
            return;
        }

        final long fileLength;
        try {
            fileLength = raf.length();
        } catch (IOException ioe) {
            LOG.error("Displaying TaskManager log failed.", ioe);
            closeQuietly(raf);
            failRequest(ctx, request, taskManagerId, "Displaying TaskManager log failed.");
            return;
        }

        // With an SslHandler in the pipeline the file is sent as chunked input instead of a file
        // region. Build the chunked input before any header is written, so that a failure here can
        // still be answered with a single, well-formed error response.
        HttpChunkedInput chunkedContent = null;
        if (ctx.pipeline().get(SslHandler.class) != null) {
            try {
                chunkedContent = new HttpChunkedInput(new ChunkedFile(raf, 0L, fileLength, CHUNK_SIZE));
            } catch (IOException e) {
                LOG.warn("Could not write http data.", e);
                closeQuietly(raf);
                failRequest(ctx, request, taskManagerId, "Displaying TaskManager log failed.");
                return;
            }
        }

        final FileChannel fc = raf.getChannel();

        final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set("Content-Type", "text/plain");
        if (HttpHeaders.isKeepAlive(request)) {
            response.headers().set("Connection", "keep-alive");
        }
        HttpHeaders.setContentLength(response, fileLength);
        ctx.write(response);

        // Runs once the content write has finished: allow new requests and release the file.
        final ChannelFutureListener completionListener = future -> {
            this.lastRequestPending.remove(taskManagerId);
            try {
                fc.close();
            } finally {
                raf.close();
            }
        };

        final ChannelFuture lastContentFuture;
        if (chunkedContent == null) {
            ctx.write(new DefaultFileRegion(fc, 0L, fileLength), ctx.newProgressivePromise()).addListener(completionListener);
            lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } else {
            // No explicit LastHttpContent is written in this branch.
            lastContentFuture = ctx.writeAndFlush(chunkedContent, ctx.newProgressivePromise()).addListener(completionListener);
        }

        if (!HttpHeaders.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sends {@code message} to the client and clears the pending marker of the TaskManager, even
     * if writing the message fails.
     */
    private void failRequest(ChannelHandlerContext ctx, HttpRequest request, String taskManagerId, String message) {
        try {
            display(ctx, request, message);
        } finally {
            this.lastRequestPending.remove(taskManagerId);
        }
    }

    /** Closes the file, logging instead of propagating an {@link IOException}. */
    private static void closeQuietly(RandomAccessFile raf) {
        try {
            raf.close();
        } catch (IOException e) {
            LOG.error("Could not close random access file.", e);
        }
    }

    /**
     * Writes {@code message} as a complete {@code text/plain} response with status 200 and closes
     * the connection afterwards unless the request asked for keep-alive.
     */
    private void display(ChannelHandlerContext ctx, HttpRequest request, String message) {
        final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set("Content-Type", "text/plain");
        if (HttpHeaders.isKeepAlive(request)) {
            response.headers().set("Connection", "keep-alive");
        }

        final byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
        final ByteBuf b = Unpooled.copiedBuffer(buf);
        HttpHeaders.setContentLength(response, buf.length);

        ctx.write(response);
        ctx.write(b);
        final ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!HttpHeaders.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Selects which TaskManager file a handler instance serves. */
    public static enum FileMode {
        /** Serve the file obtained via {@code requestTaskManagerLog}. */
        LOG,
        /** Serve the file obtained via {@code requestTaskManagerStdout}. */
        STDOUT;

    }
}

