/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.http.replication;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.coordination.http.replication.UploadRequest;
import org.apache.nifi.cluster.coordination.http.replication.UploadRequestReplicationException;
import org.apache.nifi.cluster.coordination.http.replication.UploadRequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.HttpRequestBodySpec;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.WebClientService;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardUploadRequestReplicator
implements UploadRequestReplicator {
    private static final Logger logger = LoggerFactory.getLogger(StandardUploadRequestReplicator.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private final ClusterCoordinator clusterCoordinator;
    private final WebClientService webClientService;
    private final File uploadWorkingDirectory;

    public StandardUploadRequestReplicator(ClusterCoordinator clusterCoordinator, WebClientService webClientService, NiFiProperties properties) throws IOException {
        this.clusterCoordinator = Objects.requireNonNull(clusterCoordinator, "Cluster Coordinator is required");
        this.webClientService = Objects.requireNonNull(webClientService, "Web Client Service is required");
        this.uploadWorkingDirectory = properties.getUploadWorkingDirectory();
        FileUtils.ensureDirectoryExistAndCanAccess((File)this.uploadWorkingDirectory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> T upload(UploadRequest<T> uploadRequest) throws IOException {
        String filename = uploadRequest.getFilename();
        File tempFile = new File(this.uploadWorkingDirectory, UUID.randomUUID().toString());
        logger.debug("Created temporary file {} to hold contents of upload for {}", (Object)tempFile.getAbsolutePath(), (Object)filename);
        try {
            Files.copy(uploadRequest.getContents(), tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            Set nodeIds = this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0]);
            HashMap<NodeIdentifier, Future<T>> futures = new HashMap<NodeIdentifier, Future<T>>();
            for (NodeIdentifier nodeId : nodeIds) {
                Future<T> future = this.performUploadAsync(nodeId, uploadRequest, tempFile);
                futures.put(nodeId, future);
            }
            Object responseEntity = null;
            for (Map.Entry entry : futures.entrySet()) {
                NodeIdentifier nodeId = (NodeIdentifier)entry.getKey();
                Future future = (Future)entry.getValue();
                try {
                    responseEntity = future.get();
                    logger.debug("Node {} successfully processed upload for {}", (Object)nodeId, (Object)filename);
                }
                catch (ExecutionException ee) {
                    Throwable cause = ee.getCause();
                    if (cause instanceof UploadRequestReplicationException) {
                        throw (UploadRequestReplicationException)cause;
                    }
                    throw new IOException("Failed to replicate upload request to " + String.valueOf(nodeId), ee.getCause());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for upload request to replicate to " + String.valueOf(nodeId), e);
                }
                catch (UploadRequestReplicationException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new IOException("Failed to replicate upload request to " + String.valueOf(nodeId), e);
                }
            }
            Iterator iterator = responseEntity;
            return (T)iterator;
        }
        finally {
            boolean successfulDelete = tempFile.delete();
            if (successfulDelete) {
                logger.debug("Deleted temporary file {} that was created to hold contents of upload for {}", (Object)tempFile.getAbsolutePath(), (Object)filename);
            } else {
                logger.warn("Failed to delete temporary file {}. This file should be cleaned up manually", (Object)tempFile.getAbsolutePath());
            }
        }
    }

    private <T> Future<T> performUploadAsync(NodeIdentifier nodeId, UploadRequest<T> uploadRequest, File contents) {
        CompletableFuture future = new CompletableFuture();
        Thread.ofVirtual().name("Replicate upload to " + nodeId.getApiAddress()).start(() -> {
            try {
                Object response = this.replicateRequest(nodeId, uploadRequest, contents);
                logger.debug("Successfully replicated upload request for {} to {}", (Object)uploadRequest.getFilename(), (Object)nodeId.getApiAddress());
                future.complete(response);
            }
            catch (IOException | UploadRequestReplicationException e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    private <T> T replicateRequest(NodeIdentifier nodeId, UploadRequest<T> uploadRequest, File contents) throws IOException {
        URI exampleRequestUri = uploadRequest.getExampleRequestUri();
        URI requestUri = new StandardHttpUriBuilder().scheme(exampleRequestUri.getScheme()).host(nodeId.getApiAddress()).port(nodeId.getApiPort()).encodedPath(exampleRequestUri.getPath()).build();
        NiFiUser user = uploadRequest.getUser();
        String filename = uploadRequest.getFilename();
        try (FileInputStream inputStream = new FileInputStream(contents);){
            Object object;
            block13: {
                HttpRequestBodySpec request = this.webClientService.post().uri(requestUri).body((InputStream)inputStream, OptionalLong.of(((InputStream)inputStream).available())).header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), Boolean.TRUE.toString()).header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString()).header("X-ProxiedEntitiesChain", ProxiedEntitiesUtils.buildProxiedEntitiesChainString((NiFiUser)user)).header("X-ProxiedEntityGroups", ProxiedEntitiesUtils.buildProxiedEntityGroupsString((Set)user.getIdentityProviderGroups()));
                Map<String, String> additionalHeaders = uploadRequest.getHeaders();
                for (Map.Entry<String, String> headerEntry : additionalHeaders.entrySet()) {
                    request.header(headerEntry.getKey(), headerEntry.getValue());
                }
                logger.debug("Replicating upload request for {} to {}", (Object)filename, (Object)nodeId);
                HttpResponseEntity response = request.retrieve();
                try {
                    int statusCode = response.statusCode();
                    if (uploadRequest.getSuccessfulResponseStatus() != statusCode) {
                        String responseMessage = IOUtils.toString((InputStream)response.body(), (Charset)StandardCharsets.UTF_8);
                        throw new UploadRequestReplicationException("Failed to replicate upload request to [%s] %s".formatted(nodeId, responseMessage), statusCode);
                    }
                    InputStream responseBody = response.body();
                    object = objectMapper.readValue(responseBody, uploadRequest.getResponseClass());
                    if (response == null) break block13;
                }
                catch (Throwable throwable) {
                    if (response != null) {
                        try {
                            response.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                response.close();
            }
            return (T)object;
        }
    }
}

