/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
 * Base REST handler that serves a file belonging to a TaskManager.
 *
 * <p>For each {@code (TaskManager id, file name)} pair the handler asks the concrete subclass (via
 * {@link #requestFileUpload}) for a future {@link TransientBlobKey}, caches that future for a
 * configurable duration, resolves the key to a local file through the {@link
 * TransientBlobService}, and hands the file to {@link HandlerUtils#transferFile}.
 *
 * @param <M> type of the message parameters accepted by the handler
 */
public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessageParameters>
        extends AbstractHandler<RestfulGateway, EmptyRequestBody, M> {

    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    private final TransientBlobService transientBlobService;

    /** Cache from (TaskManager id, file name) to the future blob key of the requested file. */
    private final LoadingCache<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>>
            fileBlobKeys;

    /**
     * Creates the handler and its blob-key cache.
     *
     * @param leaderRetriever forwarded to the super constructor
     * @param timeout forwarded to the super constructor
     * @param responseHeaders forwarded to the super constructor
     * @param untypedResponseMessageHeaders forwarded to the super constructor
     * @param resourceManagerGatewayRetriever retriever for the ResourceManager gateway; must not be
     *     null
     * @param transientBlobService blob service used to resolve and delete cached files; must not be
     *     null
     * @param cacheEntryDuration time after which a cache entry expires (counted from its write)
     */
    protected AbstractTaskManagerFileHandler(
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, M> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders);

        this.resourceManagerGatewayRetriever =
                Preconditions.checkNotNull(resourceManagerGatewayRetriever);
        this.transientBlobService = Preconditions.checkNotNull(transientBlobService);

        // Entries expire after the configured duration; every removal (expiry or explicit
        // invalidation) is routed to removeBlob so the backing blob is deleted as well.
        this.fileBlobKeys =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(
                                cacheEntryDuration.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .removalListener(this::removeBlob)
                        .build(
                                new CacheLoader<
                                        Tuple2<ResourceID, String>,
                                        CompletableFuture<TransientBlobKey>>() {
                                    @Override
                                    public CompletableFuture<TransientBlobKey> load(
                                            Tuple2<ResourceID, String> taskManagerIdAndFileName)
                                            throws Exception {
                                        return loadTaskManagerFile(taskManagerIdAndFileName);
                                    }
                                });
    }

    /**
     * Looks up (or triggers loading of) the blob key for the requested file and transfers the
     * file to the client once the key is available.
     *
     * @param ctx channel context; its executor runs the file transfer
     * @param httpRequest the original HTTP request
     * @param handlerRequest parsed request providing the TaskManager id path parameter
     * @param gateway unused by this implementation
     * @return future that completes when the transfer was started, or fails with the exception
     *     produced by {@link #handleException}
     * @throws RestHandlerException with status 500 if the cache loader fails
     */
    @Override
    protected CompletableFuture<Void> respondToRequest(
            ChannelHandlerContext ctx,
            HttpRequest httpRequest,
            HandlerRequest<EmptyRequestBody> handlerRequest,
            RestfulGateway gateway)
            throws RestHandlerException {
        final ResourceID taskManagerId =
                handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
        final String filename = getFileName(handlerRequest);
        final Tuple2<ResourceID, String> taskManagerIdAndFileName =
                new Tuple2<>(taskManagerId, filename);

        final CompletableFuture<TransientBlobKey> blobKeyFuture;
        try {
            blobKeyFuture = fileBlobKeys.get(taskManagerIdAndFileName);
        } catch (ExecutionException e) {
            final Throwable cause = ExceptionUtils.stripExecutionException(e);
            throw new RestHandlerException(
                    "Could not retrieve file blob key future.",
                    HttpResponseStatus.INTERNAL_SERVER_ERROR,
                    cause);
        }

        final CompletableFuture<Void> resultFuture =
                blobKeyFuture.thenAcceptAsync(
                        blobKey -> {
                            final File file;
                            try {
                                file = transientBlobService.getFile(blobKey);
                            } catch (IOException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Could not retrieve file from transient blob store.",
                                                e));
                            }

                            try {
                                HandlerUtils.transferFile(ctx, file, httpRequest);
                            } catch (FlinkException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Could not transfer file to client.", e));
                            }
                        },
                        ctx.executor());

        // handle(...) yields a future of a future; flatten it with thenCompose.
        return resultFuture
                .handle(
                        (Void ignored, Throwable throwable) -> {
                            if (throwable != null) {
                                return handleException(
                                        ctx, httpRequest, throwable, taskManagerId);
                            }
                            return CompletableFuture.<Void>completedFuture(null);
                        })
                .thenCompose(Function.identity());
    }

    /**
     * Cache loader body: obtains the ResourceManager gateway and asks the subclass to request the
     * file upload.
     *
     * @param taskManagerIdAndFileName TaskManager id ({@code f0}) and file name ({@code f1})
     * @return future blob key of the uploaded file
     * @throws RestHandlerException with status 404 if no ResourceManager gateway is currently
     *     available
     */
    private CompletableFuture<TransientBlobKey> loadTaskManagerFile(
            Tuple2<ResourceID, String> taskManagerIdAndFileName) throws RestHandlerException {
        log.debug("Load file from TaskManager {}.", taskManagerIdAndFileName.f0);

        final ResourceManagerGateway resourceManagerGateway =
                resourceManagerGatewayRetriever
                        .getNow()
                        .orElseThrow(
                                () -> {
                                    log.debug("Could not connect to ResourceManager right now.");
                                    return new RestHandlerException(
                                            "Cannot connect to ResourceManager right now. Please try to refresh.",
                                            HttpResponseStatus.NOT_FOUND);
                                });

        return requestFileUpload(resourceManagerGateway, taskManagerIdAndFileName);
    }

    /**
     * Requests the upload of the given TaskManager file.
     *
     * @param var1 ResourceManager gateway to send the request through
     * @param var2 TaskManager id ({@code f0}) and file name ({@code f1})
     * @return future blob key under which the file can be fetched from the transient blob service
     */
    protected abstract CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway var1, Tuple2<ResourceID, String> var2);

    /**
     * Removal listener of the cache: deletes the blob behind a removed entry from the transient
     * blob service once (and if) its key future completes normally.
     */
    private void removeBlob(
            RemovalNotification<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>>
                    removalNotification) {
        log.debug("Remove cached file for TaskExecutor {}.", removalNotification.getKey());

        final CompletableFuture<TransientBlobKey> value = removalNotification.getValue();
        if (value != null) {
            value.thenAccept(transientBlobService::deleteFromCache);
        }
    }

    /**
     * Returns the name of the requested file. This default implementation returns {@code null};
     * subclasses can override it to extract a name from the request.
     *
     * @param handlerRequest the parsed request
     * @return the file name, or {@code null} in this default implementation
     */
    protected String getFileName(HandlerRequest<EmptyRequestBody> handlerRequest) {
        return null;
    }

    /**
     * Logs a failed transfer, evicts the cached blob keys of the affected TaskManager and
     * translates the failure into a {@link CompletionException}. This method never returns
     * normally.
     *
     * @param channelHandlerContext channel context of the failed request (unused here)
     * @param httpRequest the failed request (unused here)
     * @param throwable the failure
     * @param taskManagerId id of the TaskManager whose file was requested
     * @return never returns a value
     * @throws CompletionException wrapping a {@link NotFoundException} if the TaskManager is
     *     unknown, otherwise wrapping a {@link FlinkException}
     */
    protected CompletableFuture<Void> handleException(
            ChannelHandlerContext channelHandlerContext,
            HttpRequest httpRequest,
            Throwable throwable,
            ResourceID taskManagerId) {
        log.error("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);

        // The cache is keyed by (TaskManager id, file name), so invalidating with the bare
        // ResourceID never matches an entry. Only the TaskManager id is known here, so evict
        // every cached file of that TaskManager; otherwise a failed future stays cached until it
        // expires. The asMap() view is weakly consistent, so invalidating while iterating is safe.
        for (Tuple2<ResourceID, String> cachedKey : fileBlobKeys.asMap().keySet()) {
            if (taskManagerId.equals(cachedKey.f0)) {
                fileBlobKeys.invalidate(cachedKey);
            }
        }

        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

        if (strippedThrowable instanceof UnknownTaskExecutorException) {
            throw new CompletionException(
                    new NotFoundException(
                            String.format(
                                    "Failed to transfer file from TaskExecutor %s because it was unknown.",
                                    taskManagerId),
                            strippedThrowable));
        }

        throw new CompletionException(
                new FlinkException(
                        String.format(
                                "Failed to transfer file from TaskExecutor %s.", taskManagerId),
                        strippedThrowable));
    }
}

