/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.CachedShuffleDescriptors;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class CachedShuffleDescriptorsTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    CachedShuffleDescriptorsTest() {
    }

    @Test
    void testCreateAndGet() throws Exception {
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        ExecutionGraph eg = this.buildExecutionGraph(v1, v2, 1, 1, DistributionPattern.ALL_TO_ALL);
        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
        Assertions.assertThat((Object)ejv1).isNotNull();
        ExecutionVertex ev11 = ejv1.getTaskVertices()[0];
        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
        Assertions.assertThat((Object)ejv2).isNotNull();
        ExecutionVertex ev21 = ejv2.getTaskVertices()[0];
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition)ev11.getProducedPartitions().values().stream().findAny().get();
        ConsumedPartitionGroup consumedPartitionGroup = ev21.getConsumedPartitionGroup(0);
        ShuffleDescriptor shuffleDescriptor = this.getShuffleDescriptor(intermediateResultPartition);
        CachedShuffleDescriptors cachedShuffleDescriptors = new CachedShuffleDescriptors(consumedPartitionGroup, CachedShuffleDescriptorsTest.createSingleShuffleDescriptorAndIndex(shuffleDescriptor, 0));
        Assertions.assertThat((List)cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).isEmpty();
        cachedShuffleDescriptors.serializeShuffleDescriptors(CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
        Assertions.assertThat((List)cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(1);
        TaskDeploymentDescriptor.MaybeOffloaded maybeOffloadedShuffleDescriptor = (TaskDeploymentDescriptor.MaybeOffloaded)cachedShuffleDescriptors.getAllSerializedShuffleDescriptors().get(0);
        this.assertNonOffloadedShuffleDescriptorAndIndexEquals((TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>)maybeOffloadedShuffleDescriptor, Collections.singletonList(shuffleDescriptor), Collections.singletonList(0));
    }

    @Test
    void testMarkPartitionFinishAndSerialize() throws Exception {
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        ExecutionGraph eg = this.buildExecutionGraph(v1, v2, 2, 1, DistributionPattern.ALL_TO_ALL);
        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
        Assertions.assertThat((Object)ejv1).isNotNull();
        ExecutionVertex ev11 = ejv1.getTaskVertices()[0];
        ExecutionVertex ev12 = ejv1.getTaskVertices()[1];
        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
        Assertions.assertThat((Object)ejv2).isNotNull();
        ExecutionVertex ev21 = ejv2.getTaskVertices()[0];
        IntermediateResultPartition intermediateResultPartition1 = (IntermediateResultPartition)ev11.getProducedPartitions().values().stream().findAny().get();
        IntermediateResultPartition intermediateResultPartition2 = (IntermediateResultPartition)ev12.getProducedPartitions().values().stream().findAny().get();
        ConsumedPartitionGroup consumedPartitionGroup1 = ev21.getConsumedPartitionGroup(0);
        ShuffleDescriptor shuffleDescriptor = this.getShuffleDescriptor(intermediateResultPartition1);
        CachedShuffleDescriptors cachedShuffleDescriptors = new CachedShuffleDescriptors(consumedPartitionGroup1, CachedShuffleDescriptorsTest.createSingleShuffleDescriptorAndIndex(shuffleDescriptor, 0));
        cachedShuffleDescriptors.serializeShuffleDescriptors(CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
        cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition1);
        cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition2);
        cachedShuffleDescriptors.serializeShuffleDescriptors(CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
        Assertions.assertThat((List)cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(2);
        TaskDeploymentDescriptor.MaybeOffloaded maybeOffloaded = (TaskDeploymentDescriptor.MaybeOffloaded)cachedShuffleDescriptors.getAllSerializedShuffleDescriptors().get(1);
        ShuffleDescriptor expectedShuffleDescriptor1 = TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor((IntermediateResultPartition)intermediateResultPartition1, (TaskDeploymentDescriptorFactory.PartitionLocationConstraint)TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, (boolean)false);
        ShuffleDescriptor expectedShuffleDescriptor2 = TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor((IntermediateResultPartition)intermediateResultPartition2, (TaskDeploymentDescriptorFactory.PartitionLocationConstraint)TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, (boolean)false);
        this.assertNonOffloadedShuffleDescriptorAndIndexEquals((TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]>)maybeOffloaded, Arrays.asList(expectedShuffleDescriptor1, expectedShuffleDescriptor2), Arrays.asList(0, 1));
    }

    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]> nonOffloadedShuffleDescriptor(TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] toBeSerialized) throws IOException {
        return new TaskDeploymentDescriptor.NonOffloaded((SerializedValue)CompressedSerializedValue.fromObject((Object)toBeSerialized));
    }

    private void assertNonOffloadedShuffleDescriptorAndIndexEquals(TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]> maybeOffloaded, List<ShuffleDescriptor> expectedDescriptors, List<Integer> expectedIndices) throws Exception {
        Assertions.assertThat(expectedDescriptors).hasSameSizeAs(expectedIndices);
        Assertions.assertThat(maybeOffloaded).isInstanceOf(TaskDeploymentDescriptor.NonOffloaded.class);
        TaskDeploymentDescriptor.NonOffloaded nonOffloaded = (TaskDeploymentDescriptor.NonOffloaded)maybeOffloaded;
        Object[] shuffleDescriptorAndIndices = (TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[])nonOffloaded.serializedValue.deserializeValue(this.getClass().getClassLoader());
        Assertions.assertThat((Object[])shuffleDescriptorAndIndices).hasSameSizeAs(expectedDescriptors);
        for (int i = 0; i < shuffleDescriptorAndIndices.length; ++i) {
            Assertions.assertThat((int)shuffleDescriptorAndIndices[i].getIndex()).isEqualTo((Object)expectedIndices.get(i));
            Assertions.assertThat((Object)shuffleDescriptorAndIndices[i].getShuffleDescriptor().getResultPartitionID()).isEqualTo((Object)expectedDescriptors.get(i).getResultPartitionID());
        }
    }

    private ShuffleDescriptor getShuffleDescriptor(IntermediateResultPartition intermediateResultPartition) {
        return TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor((IntermediateResultPartition)intermediateResultPartition, (TaskDeploymentDescriptorFactory.PartitionLocationConstraint)TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, (boolean)true);
    }

    private static TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] createSingleShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int index) {
        return new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]{new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex(shuffleDescriptor, index)};
    }

    private ExecutionGraph buildExecutionGraph(JobVertex producer, JobVertex consumer, int producerParallelism, int consumerParallelism, DistributionPattern distributionPattern) throws Exception {
        producer.setParallelism(producerParallelism);
        consumer.setParallelism(consumerParallelism);
        producer.setInvokableClass(NoOpInvokable.class);
        consumer.setInvokableClass(NoOpInvokable.class);
        consumer.connectNewDataSetAsInput(producer, distributionPattern, ResultPartitionType.HYBRID_FULL);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        scheduler.startScheduling();
        return scheduler.getExecutionGraph();
    }
}

