/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link JobSubmitHandler}.
 *
 * <p>Every test template runs once with a plain {@link Configuration} and once per entry of
 * {@code SSLUtilsTest.AVAILABLE_SSL_PROVIDERS} with an internal-SSL configuration.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class JobSubmitHandlerTest {
    @TempDir
    private Path temporaryFolder;
    private final Configuration configuration;
    private BlobServer blobServer;

    /**
     * Supplies the test parameters: one "no SSL" entry followed by one entry per available SSL
     * provider.
     *
     * @return tuples of (SSL enabled, SSL provider name)
     */
    @Parameters(name="SSL enabled: {0}")
    public static Iterable<Tuple2<Boolean, String>> data() {
        List<Tuple2<Boolean, String>> parameters = new ArrayList<>(3);
        parameters.add(Tuple2.of(false, "no SSL"));
        for (String sslProvider : SSLUtilsTest.AVAILABLE_SSL_PROVIDERS) {
            parameters.add(Tuple2.of(true, sslProvider));
        }
        return parameters;
    }

    /**
     * Creates the test instance for one parameter tuple.
     *
     * @param withSsl {@code f0} selects whether an SSL configuration is built, {@code f1} names
     *     the SSL provider passed to {@code SSLUtilsTest}
     */
    public JobSubmitHandlerTest(Tuple2<Boolean, String> withSsl) {
        this.configuration =
                withSsl.f0
                        ? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(withSsl.f1)
                        : new Configuration();
    }

    /** Starts a fresh {@link BlobServer} backed by a new folder inside the temp directory. */
    @BeforeEach
    void setup() throws IOException {
        Configuration config = new Configuration(this.configuration);
        this.blobServer =
                new BlobServer(
                        config, TempDirUtils.newFolder(this.temporaryFolder), new VoidBlobStore());
        this.blobServer.start();
    }

    /** Closes the {@link BlobServer} if {@link #setup()} managed to create it. */
    @AfterEach
    void teardown() throws IOException {
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    /**
     * Submits a request that names a job graph file but attaches no uploaded files, and expects
     * the handler to reject it with {@code BAD_REQUEST}.
     */
    @TestTemplate
    void testSerializationFailureHandling() throws Exception {
        Path jobGraphFile = TempDirUtils.newFile(this.temporaryFolder).toPath();
        TestingDispatcherGateway mockGateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .build();
        JobSubmitHandler handler = createHandler(mockGateway);
        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());

        Assertions.assertThatThrownBy(
                        () ->
                                handler.handleRequest(
                                        HandlerRequest.create(
                                                request, EmptyMessageParameters.getInstance()),
                                        mockGateway))
                .isInstanceOf(RestHandlerException.class)
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ((RestHandlerException) e).getHttpResponseStatus())
                                        .isEqualTo(HttpResponseStatus.BAD_REQUEST));
    }

    /** Submits a serialized empty job graph as the only uploaded file and expects success. */
    @TestTemplate
    void testSuccessfulJobSubmission() throws Exception {
        Path jobGraphFile = writeJobGraph(JobGraphTestUtils.emptyJobGraph());
        TestingDispatcherGateway mockGateway = createAcknowledgingGateway();
        JobSubmitHandler handler = createHandler(mockGateway);
        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        handler.handleRequest(
                        HandlerRequest.create(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Collections.singleton(jobGraphFile.toFile())),
                        mockGateway)
                .get();
    }

    /**
     * Uploads one file more than the request body declares and expects the submission to fail
     * with a {@link RestHandlerException} whose message mentions the count.
     */
    @TestTemplate
    void testRejectionOnCountMismatch() throws Exception {
        Path jobGraphFile = writeJobGraph(JobGraphTestUtils.emptyJobGraph());
        // Extra upload that is not referenced by the request body.
        Path countExceedingFile = TempDirUtils.newFile(this.temporaryFolder).toPath();
        TestingDispatcherGateway mockGateway = createAcknowledgingGateway();
        JobSubmitHandler handler = createHandler(mockGateway);
        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        // The failure may surface synchronously or through the returned future, so the whole
        // call chain is wrapped and the cause chain is searched. Unlike a bare try/catch, this
        // fails the test when nothing is thrown or when the expected exception is absent.
        Assertions.assertThatThrownBy(
                        () ->
                                handler.handleRequest(
                                                HandlerRequest.create(
                                                        request,
                                                        EmptyMessageParameters.getInstance(),
                                                        Arrays.asList(
                                                                jobGraphFile.toFile(),
                                                                countExceedingFile.toFile())),
                                                mockGateway)
                                        .get())
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ExceptionUtils.findThrowable(
                                                        e,
                                                        candidate ->
                                                                candidate
                                                                                instanceof
                                                                                RestHandlerException
                                                                        && candidate
                                                                                .getMessage()
                                                                                .contains("count")))
                                        .as("Expected a RestHandlerException mentioning the count.")
                                        .isPresent());
    }

    /**
     * Submits a job graph together with one jar and one distributed-cache artifact and checks
     * that the submitted plan carries one jar blob key and one artifact with a blob key set.
     */
    @TestTemplate
    void testFileHandling() throws Exception {
        final String dcEntryName = "entry";
        CompletableFuture<ExecutionPlan> submittedExecutionPlanFuture = new CompletableFuture<>();
        TestingDispatcherGateway dispatcherGateway =
                TestingDispatcherGateway.newBuilder()
                        .setBlobServerPort(this.blobServer.getPort())
                        .setSubmitFunction(
                                submittedExecutionPlan -> {
                                    // Capture what the handler hands to the dispatcher.
                                    submittedExecutionPlanFuture.complete(submittedExecutionPlan);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .build();
        JobSubmitHandler handler = createHandler(dispatcherGateway);

        JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        jobGraph.addUserArtifact(
                dcEntryName, new DistributedCache.DistributedCacheEntry("random", false));
        Path jobGraphFile = writeJobGraph(jobGraph);
        Path jarFile = TempDirUtils.newFile(this.temporaryFolder).toPath();
        Path artifactFile = TempDirUtils.newFile(this.temporaryFolder).toPath();

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.singletonList(jarFile.getFileName().toString()),
                        Collections.singleton(
                                new JobSubmitRequestBody.DistributedCacheFile(
                                        dcEntryName, artifactFile.getFileName().toString())));

        handler.handleRequest(
                        HandlerRequest.create(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Arrays.asList(
                                        jobGraphFile.toFile(),
                                        jarFile.toFile(),
                                        artifactFile.toFile())),
                        dispatcherGateway)
                .get();

        Assertions.assertThat(submittedExecutionPlanFuture)
                .as("No ExecutionPlan was submitted.")
                .isCompleted();
        ExecutionPlan submittedPlan = submittedExecutionPlanFuture.get();
        Assertions.assertThat(submittedPlan.getUserJarBlobKeys()).hasSize(1);
        Assertions.assertThat(submittedPlan.getUserArtifacts()).hasSize(1);
        Assertions.assertThat(submittedPlan.getUserArtifacts().get(dcEntryName).blobKey)
                .isNotNull();
    }

    /**
     * Uses a gateway whose submit function fails and expects the handler's future to fail with
     * an exception carrying the same message.
     */
    @TestTemplate
    void testFailedJobSubmission() throws Exception {
        final String errorMessage = "test";
        TestingDispatcherGateway mockGateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                jobgraph ->
                                        FutureUtils.completedExceptionally(
                                                new Exception(errorMessage)))
                        .build();
        JobSubmitHandler handler = createHandler(mockGateway);
        Path jobGraphFile = writeJobGraph(JobGraphTestUtils.emptyJobGraph());
        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        FlinkAssertions.assertThatFuture(
                        handler.handleRequest(
                                HandlerRequest.create(
                                        request,
                                        EmptyMessageParameters.getInstance(),
                                        Collections.singletonList(jobGraphFile.toFile())),
                                mockGateway))
                .eventuallyFailsWith(Exception.class)
                .withMessageContaining(errorMessage);
    }

    /** Builds a handler that always resolves to the given gateway and runs on a direct executor. */
    private JobSubmitHandler createHandler(DispatcherGateway gateway) {
        return new JobSubmitHandler(
                () -> CompletableFuture.completedFuture(gateway),
                RpcUtils.INF_TIMEOUT,
                Collections.emptyMap(),
                Executors.directExecutor(),
                this.configuration);
    }

    /** Builds a gateway on "localhost" that points at the test blob server and acks submissions. */
    private TestingDispatcherGateway createAcknowledgingGateway() {
        TestingDispatcherGateway.Builder builder = TestingDispatcherGateway.newBuilder();
        builder.setBlobServerPort(this.blobServer.getPort())
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .setHostname("localhost");
        return builder.build();
    }

    /** Java-serializes the job graph into a new file in the temp directory and returns its path. */
    private Path writeJobGraph(JobGraph jobGraph) throws IOException {
        Path jobGraphFile = TempDirUtils.newFile(this.temporaryFolder).toPath();
        try (ObjectOutputStream objectOut =
                new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
            objectOut.writeObject(jobGraph);
        }
        return jobGraphFile;
    }
}

