/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests for {@link TaskDeploymentDescriptorFactory}, focused on the shuffle descriptors that are
 * cached on the consumed {@link IntermediateResult} when a {@link TaskDeploymentDescriptor} is
 * created for a consumer vertex.
 */
public class TaskDeploymentDescriptorFactoryTest
extends TestLogger {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    /** Parallelism used for every job vertex created by this test. */
    private static final int PARALLELISM = 4;

    /**
     * Runs the cache check with a blob writer constructed with {@code Integer.MAX_VALUE}.
     *
     * <p>NOTE(review): presumably the constructor argument is the size threshold above which
     * values get offloaded, so this case keeps the descriptors inline - confirm against
     * {@link TestingBlobWriter}.
     */
    @Test
    public void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
        this.testCacheShuffleDescriptor(new TestingBlobWriter(Integer.MAX_VALUE));
    }

    /**
     * Runs the cache check with a blob writer constructed with {@code 0}.
     *
     * <p>NOTE(review): presumably a threshold of zero forces every serialized value to be
     * offloaded to the blob writer - confirm against {@link TestingBlobWriter}.
     */
    @Test
    public void testCacheShuffleDescriptorAsOffloaded() throws Exception {
        this.testCacheShuffleDescriptor(new TestingBlobWriter(0));
    }

    /**
     * Creates a deployment descriptor for the first consumer subtask and verifies that the
     * shuffle descriptors cached on the consumed result match, in order, the partitions of the
     * subtask's first consumed partition group.
     *
     * @param blobWriter blob writer handed to the execution graph and later used to read back
     *     offloaded descriptors
     */
    private void testCacheShuffleDescriptor(TestingBlobWriter blobWriter) throws Exception {
        JobID jobId = new JobID();
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices = this.setupExecutionGraphAndGetVertices(jobId, blobWriter);
        ExecutionJobVertex consumerVertex = executionJobVertices.f1;
        ExecutionVertex ev21 = consumerVertex.getTaskVertices()[0];

        // Creating the descriptor is what populates the cache inspected below.
        TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev21);

        IntermediateResult consumedResult = consumerVertex.getInputs().get(0);
        List<TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>> maybeOffloaded =
                consumedResult.getCachedShuffleDescriptors(ev21.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptors();
        ShuffleDescriptor[] cachedShuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(maybeOffloaded, jobId, blobWriter);

        Assert.assertEquals((long) ev21.getConsumedPartitionGroup(0).size(), (long) cachedShuffleDescriptors.length);
        int idx = 0;
        for (IntermediateResultPartitionID consumedPartitionId : ev21.getConsumedPartitionGroup(0)) {
            Assert.assertEquals(consumedPartitionId, cachedShuffleDescriptors[idx++].getResultPartitionID().getPartitionId());
        }
    }

    /**
     * Builds a hybrid-result graph, finishes producer partitions step by step and checks how many
     * serialized shuffle descriptor entries are cached after each consumer descriptor creation:
     * two entries after two producers finished, three after a third one finished.
     */
    @Test
    public void testHybridVertexFinish() throws Exception {
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices = this.buildExecutionGraph();
        ExecutionJobVertex ejv1 = executionJobVertices.f0;
        ExecutionJobVertex ejv2 = executionJobVertices.f1;

        ExecutionVertex ev21 = ejv2.getTaskVertices()[0];
        TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev21);

        ExecutionVertex ev11 = ejv1.getTaskVertices()[0];
        ExecutionVertex ev12 = ejv1.getTaskVertices()[1];
        ev11.finishPartitionsIfNeeded();
        ev12.finishPartitionsIfNeeded();

        ExecutionVertex ev22 = ejv2.getTaskVertices()[1];
        TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev22);
        IntermediateResult consumedResult = ejv2.getInputs().get(0);
        List<TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>> maybeOffloaded =
                consumedResult.getCachedShuffleDescriptors(ev22.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptors();
        // Expected value first, so that a failure message reads correctly.
        Assert.assertEquals(2L, (long) maybeOffloaded.size());

        ExecutionVertex ev13 = ejv1.getTaskVertices()[2];
        ev13.finishPartitionsIfNeeded();

        ExecutionVertex ev23 = ejv2.getTaskVertices()[2];
        TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev23);
        consumedResult = ejv2.getInputs().get(0);
        maybeOffloaded = consumedResult.getCachedShuffleDescriptors(ev23.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptors();
        Assert.assertEquals(3L, (long) maybeOffloaded.size());
    }

    /**
     * Verifies that reading the shuffle descriptors of an input gate fails with an
     * {@link IllegalStateException} when the descriptor was created with a blob writer
     * constructed with {@code 0} and nothing has been loaded yet.
     *
     * <p>The expectation is scoped to the {@code getShuffleDescriptors()} call only, so that an
     * {@link IllegalStateException} thrown while setting up the graph is reported as a failure
     * instead of silently satisfying the test.
     */
    @Test
    public void testGetOffloadedShuffleDescriptorBeforeLoading() throws Exception {
        TestingBlobWriter blobWriter = new TestingBlobWriter(0);
        JobID jobId = new JobID();
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices = this.setupExecutionGraphAndGetVertices(jobId, blobWriter);
        ExecutionVertex ev21 = executionJobVertices.f1.getTaskVertices()[0];
        TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev21);
        InputGateDeploymentDescriptor inputGate = tdd.getInputGates().get(0);

        try {
            inputGate.getShuffleDescriptors();
        } catch (IllegalStateException expected) {
            // This is the outcome the test is looking for.
            return;
        }
        Assert.fail("Expected an IllegalStateException when reading shuffle descriptors before loading them.");
    }

    /**
     * Builds a two-vertex graph connected by a {@link ResultPartitionType#BLOCKING} result, using
     * {@link ResultPartitionType#isBlockingOrBlockingPersistentResultPartition()} as the
     * partition-finished strategy.
     *
     * @return the producer ({@code f0}) and consumer ({@code f1}) execution job vertices
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(JobID jobId, BlobWriter blobWriter) throws Exception {
        return this.setupExecutionGraphAndGetVertices(jobId, blobWriter, ResultPartitionType.BLOCKING, ResultPartitionType::isBlockingOrBlockingPersistentResultPartition);
    }

    /**
     * Builds an execution graph of two vertices {@code v1 -> v2} connected all-to-all with the
     * given result partition type.
     *
     * @param jobId id assigned to the job graph
     * @param blobWriter blob writer installed on the execution graph
     * @param resultPartitionType type of the intermediate result between the two vertices
     * @param markPartitionFinishedStrategy strategy installed on the execution graph
     * @return the producer ({@code f0}) and consumer ({@code f1}) execution job vertices
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(JobID jobId, BlobWriter blobWriter, ResultPartitionType resultPartitionType, MarkPartitionFinishedStrategy markPartitionFinishedStrategy) throws Exception {
        JobVertex v1 = TaskDeploymentDescriptorFactoryTest.createJobVertex("v1", PARALLELISM);
        JobVertex v2 = TaskDeploymentDescriptorFactoryTest.createJobVertex("v2", PARALLELISM);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
        ExecutionGraph executionGraph = TaskDeploymentDescriptorFactoryTest.createExecutionGraph(jobId, ordered, blobWriter, markPartitionFinishedStrategy);

        // No (Object) casts here: they would make Tuple2.of infer Tuple2<Object, Object>.
        return Tuple2.of(executionGraph.getJobVertex(v1.getID()), executionGraph.getJobVertex(v2.getID()));
    }

    /**
     * Builds a two-vertex batch job connected all-to-all by a
     * {@link ResultPartitionType#HYBRID_FULL} result, starts an {@link AdaptiveBatchScheduler}
     * configured with {@code ONLY_FINISHED_PRODUCERS}, and returns the vertices of the
     * scheduler's execution graph.
     *
     * @return the producer ({@code f0}) and consumer ({@code f1}) execution job vertices
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> buildExecutionGraph() throws Exception {
        JobVertex producer = TaskDeploymentDescriptorFactoryTest.createJobVertex("v1", PARALLELISM);
        JobVertex consumer = TaskDeploymentDescriptorFactoryTest.createJobVertex("v2", PARALLELISM);
        consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL);

        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
        AdaptiveBatchScheduler scheduler =
                new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor())
                        .setHybridPartitionDataConsumeConstraint(JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS)
                        .buildAdaptiveBatchJobScheduler();
        scheduler.startScheduling();

        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        return Tuple2.of(executionGraph.getJobVertex(producer.getID()), executionGraph.getJobVertex(consumer.getID()));
    }

    /**
     * Creates a job vertex with the given name and parallelism, using {@link AbstractInvokable}
     * as its invokable class.
     */
    private static JobVertex createJobVertex(String vertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    /**
     * Builds a batch job graph from the given vertices and turns it into an execution graph that
     * uses the supplied blob writer and partition-finished strategy.
     */
    private static ExecutionGraph createExecutionGraph(JobID jobId, List<JobVertex> jobVertices, BlobWriter blobWriter, MarkPartitionFinishedStrategy markPartitionFinishedStrategy) throws JobException, JobExecutionException {
        JobGraph jobGraph = JobGraphBuilder.newBatchJobGraphBuilder()
                .setJobId(jobId)
                .addJobVertices(jobVertices)
                .build();
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .setBlobWriter(blobWriter)
                .setMarkPartitionFinishedStrategy(markPartitionFinishedStrategy)
                .build(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Creates the deployment descriptor for the current execution attempt of the given vertex,
     * using a fresh {@link AllocationID}, a {@code null} second argument and an empty list as the
     * third argument of {@code createDeploymentDescriptor}.
     */
    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev) throws IOException, ClusterDatasetCorruptedException {
        return TaskDeploymentDescriptorFactory.fromExecution((Execution) ev.getCurrentExecutionAttempt())
                .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
    }

    /**
     * Deserializes the given shuffle descriptor entries and lays them out in an array by their
     * recorded index.
     *
     * <p>Entries that are {@link TaskDeploymentDescriptor.NonOffloaded} are deserialized directly;
     * all other entries are treated as {@link TaskDeploymentDescriptor.Offloaded} and read back
     * from {@code blobWriter} using their blob key.
     *
     * @param maybeOffloaded serialized descriptor groups, inline or offloaded
     * @param jobId job id used to look up offloaded blobs
     * @param blobWriter blob writer from which offloaded blobs are fetched
     * @return an array of length {@code maxIndex + 1} with each descriptor stored at its index;
     *     positions without a descriptor are {@code null}, and an input without any descriptor
     *     yields an empty array
     */
    public static ShuffleDescriptor[] deserializeShuffleDescriptors(List<TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>> maybeOffloaded, JobID jobId, TestingBlobWriter blobWriter) throws IOException, ClassNotFoundException {
        HashMap<Integer, ShuffleDescriptor> shuffleDescriptorsMap = new HashMap<>();
        // Start below zero so that an input without descriptors produces an empty array.
        int maxIndex = -1;
        for (TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]> sd : maybeOffloaded) {
            TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndices;
            if (sd instanceof TaskDeploymentDescriptor.NonOffloaded) {
                shuffleDescriptorAndIndices =
                        ((TaskDeploymentDescriptor.NonOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>) sd)
                                .serializedValue.deserializeValue(ClassLoader.getSystemClassLoader());
            } else {
                CompressedSerializedValue<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                        CompressedSerializedValue.fromBytes(
                                blobWriter.getBlob(
                                        jobId,
                                        ((TaskDeploymentDescriptor.Offloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>) sd).serializedValueKey));
                shuffleDescriptorAndIndices = compressedSerializedValue.deserializeValue(ClassLoader.getSystemClassLoader());
            }
            for (TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex shuffleDescriptorAndIndex : shuffleDescriptorAndIndices) {
                int index = shuffleDescriptorAndIndex.getIndex();
                maxIndex = Math.max(maxIndex, index);
                shuffleDescriptorsMap.put(index, shuffleDescriptorAndIndex.getShuffleDescriptor());
            }
        }
        ShuffleDescriptor[] shuffleDescriptors = new ShuffleDescriptor[maxIndex + 1];
        shuffleDescriptorsMap.forEach((index, descriptor) -> shuffleDescriptors[index] = descriptor);
        return shuffleDescriptors;
    }
}

