/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTrackerTest;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class JobMasterPartitionTrackerImplTest {
    /** No-argument constructor; it performs no setup, each test builds its own fixtures. */
    JobMasterPartitionTrackerImplTest() {}

    /**
     * Runs the shared release-on-consumption check for {@link ResultPartitionType#PIPELINED}.
     *
     * <p>The helper compares the tracker state with {@code isReleaseByScheduler()} of the type
     * (presumably {@code false} for this type, judging by the test name).
     */
    @Test
    void testPipelinedPartitionIsNotTracked() {
        final ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
        testReleaseOnConsumptionHandling(partitionType);
    }

    /**
     * Runs the shared release-on-consumption check for {@link ResultPartitionType#BLOCKING}.
     *
     * <p>The helper compares the tracker state with {@code isReleaseByScheduler()} of the type
     * (presumably {@code true} for this type, judging by the test name).
     */
    @Test
    void testBlockingPartitionIsTracked() {
        final ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
        testReleaseOnConsumptionHandling(partitionType);
    }

    /**
     * Runs the shared release-on-consumption check for
     * {@link ResultPartitionType#PIPELINED_APPROXIMATE}.
     *
     * <p>The helper compares the tracker state with {@code isReleaseByScheduler()} of the type
     * (presumably {@code true} for this type, judging by the test name).
     */
    @Test
    void testPipelinedApproximatePartitionIsTracked() {
        final ResultPartitionType partitionType = ResultPartitionType.PIPELINED_APPROXIMATE;
        testReleaseOnConsumptionHandling(partitionType);
    }

    /**
     * Starts tracking one partition of the given type and asserts that the tracker reports
     * partitions for the producing resource exactly when the type is released by the scheduler.
     *
     * @param resultPartitionType partition type whose tracking behaviour is checked
     */
    private static void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        // The gateway lookup never resolves a task executor; this check does not need one.
        final JobMasterPartitionTrackerImpl tracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(), shuffleMaster, unused -> Optional.empty());

        final ResourceID producerId = ResourceID.generate();
        final ResultPartitionID partitionId = new ResultPartitionID();
        // NOTE(review): the trailing `true` looks like the "has local resources" flag, going by
        // the names of the sibling tests - confirm against AbstractPartitionTrackerTest.
        final ResultPartitionDeploymentDescriptor descriptor =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        partitionId, resultPartitionType, true);
        tracker.startTrackingPartition(producerId, descriptor);

        final boolean tracked = tracker.isTrackingPartitionsFor(producerId);
        Assertions.assertThat(tracked).isEqualTo(resultPartitionType.isReleaseByScheduler());
    }

    /**
     * Tracks one partition (descriptor created with flag {@code true}), releases it, and checks
     * that exactly one release call carrying the expected task executor id, job id and partition
     * id reached the task executor gateway, that the shuffle master released the same partition
     * externally, and that the tracker no longer tracks partitions for the task executor.
     */
    @Test
    void testReleaseCallsWithLocalResources() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        final JobID jobId = new JobID();

        // Typed queue instead of a raw ArrayBlockingQueue, so remove() needs no downcast.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        jobId,
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId,
                                                releaseCalls,
                                                // promote calls are not inspected by this test
                                                new ArrayBlockingQueue<PromoteCall>(4))));

        final ResourceID taskExecutorId = ResourceID.generate();
        final ResultPartitionID resultPartitionId = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId,
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId, true));
        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isTrue();

        partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));

        Assertions.assertThat(releaseCalls).hasSize(1);
        final ReleaseCall releaseCall = releaseCalls.remove();
        Assertions.assertThat(releaseCall.getTaskExecutorId()).isEqualTo(taskExecutorId);
        Assertions.assertThat(releaseCall.getJobId()).isEqualTo(jobId);
        Assertions.assertThat(releaseCall.getReleasedPartitions()).contains(resultPartitionId);

        Assertions.assertThat(shuffleMaster.externallyReleasedPartitions).hasSize(1);
        Assertions.assertThat(shuffleMaster.externallyReleasedPartitions.remove())
                .isEqualTo(resultPartitionId);
        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isFalse();
    }

    /**
     * Tracks one partition (descriptor created with flag {@code false}), releases it, and checks
     * that the tracker never reported partitions for the task executor, that no release or
     * promote call reached the task executor gateway, and that the shuffle master released
     * exactly that partition externally.
     */
    @Test
    void testReleaseCallsWithoutLocalResources() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        final ResourceID taskExecutorId = ResourceID.generate();
        final ResultPartitionID resultPartitionId = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId,
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId, false));
        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isFalse();

        partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));

        Assertions.assertThat(releaseCalls).isEmpty();
        Assertions.assertThat(promoteCalls).isEmpty();
        Assertions.assertThat(shuffleMaster.externallyReleasedPartitions)
                .containsOnly(resultPartitionId);
    }

    /**
     * Tracks one partition for a task executor, then stops tracking for that task executor and
     * checks that neither the task executor gateway (release/promote) nor the shuffle master
     * (external release) received any call.
     */
    @Test
    void testStopTrackingIssuesNoReleaseCalls() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        final ResourceID taskExecutorId1 = ResourceID.generate();
        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId1,
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId1, true));

        partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);

        Assertions.assertThat(releaseCalls).isEmpty();
        Assertions.assertThat(promoteCalls).isEmpty();
        Assertions.assertThat(shuffleMaster.externallyReleasedPartitions).isEmpty();
    }

    /**
     * Tracks two partitions on the same task executor, one created with flag {@code false} and
     * one with flag {@code true}, and checks that:
     *
     * <ul>
     *   <li>the tracker reports partitions for the task executor only after the {@code true}
     *       partition was added,
     *   <li>both partitions are listed by {@code getAllTrackedPartitions()},
     *   <li>after {@code stopTrackingPartitionsFor} only the {@code false} partition is still
     *       tracked.
     * </ul>
     */
    @Test
    void testTrackingInternalAndExternalPartitionsByTmId() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        final ResourceID taskExecutorId = ResourceID.generate();
        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        final ResultPartitionID resultPartitionId2 = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId,
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId2, false));
        // A partition created with flag `false` does not count for the per-task-executor view.
        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isFalse();

        partitionTracker.startTrackingPartition(
                taskExecutorId,
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId1, true));
        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isTrue();

        Assertions.assertThat(
                        partitionTracker.getAllTrackedPartitions().stream()
                                .map(desc -> desc.getShuffleDescriptor().getResultPartitionID())
                                .collect(Collectors.toList()))
                .containsExactlyInAnyOrder(resultPartitionId1, resultPartitionId2);

        partitionTracker.stopTrackingPartitionsFor(taskExecutorId);

        Assertions.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId)).isFalse();
        Assertions.assertThat(partitionTracker.isPartitionTracked(resultPartitionId1)).isFalse();
        Assertions.assertThat(partitionTracker.isPartitionTracked(resultPartitionId2)).isTrue();
        // No raw (Iterable) cast: the element type is inferred, so no downcast is required.
        Assertions.assertThat(
                        Iterables.getOnlyElement(partitionTracker.getAllTrackedPartitions())
                                .getShuffleDescriptor()
                                .getResultPartitionID())
                .isEqualTo(resultPartitionId2);
    }

    /**
     * Tracks one {@link ResultPartitionType#BLOCKING_PERSISTENT} partition and one partition
     * created through the two-argument descriptor helper, and checks that the former is the only
     * entry of {@code getAllTrackedClusterPartitions()} and the latter the only entry of
     * {@code getAllTrackedNonClusterPartitions()}.
     */
    @Test
    void testGetJobPartitionClusterPartition() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        final ResourceID taskExecutorId = ResourceID.generate();
        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        final ResultPartitionID resultPartitionId2 = new ResultPartitionID();

        final ResultPartitionDeploymentDescriptor clusterPartition =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId1, ResultPartitionType.BLOCKING_PERSISTENT, false);
        final ResultPartitionDeploymentDescriptor jobPartition =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        resultPartitionId2, false);

        partitionTracker.startTrackingPartition(taskExecutorId, clusterPartition);
        partitionTracker.startTrackingPartition(taskExecutorId, jobPartition);

        // No raw (Collection) casts: the assertions keep the descriptor element type.
        Assertions.assertThat(partitionTracker.getAllTrackedNonClusterPartitions())
                .containsExactly(jobPartition);
        Assertions.assertThat(partitionTracker.getAllTrackedClusterPartitions())
                .containsExactly(clusterPartition);
    }

    /**
     * Connects the tracker to a recording resource manager gateway, asks for the cluster
     * partition shuffle descriptors of a data set, and checks that the gateway received a
     * request for that data set id.
     */
    @Test
    void testGetShuffleDescriptors() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        final IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        final TestingResourceManagerGateway resourceManagerGateway =
                new TestingResourceManagerGateway();
        partitionTracker.connectToResourceManager(resourceManagerGateway);
        // Only the request recorded by the gateway matters here; the return value is ignored.
        partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);

        Assertions.assertThat(resourceManagerGateway.requestedIntermediateDataSetIds)
                .contains(intermediateDataSetId);
    }

    /**
     * Checks that asking for cluster partition shuffle descriptors without a prior
     * {@code connectToResourceManager} call fails with a {@link NullPointerException}.
     */
    @Test
    void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        final IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();

        // Typed queues instead of raw ArrayBlockingQueue instances.
        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId, releaseCalls, promoteCalls)));

        Assertions.assertThatThrownBy(
                        () ->
                                partitionTracker.getClusterPartitionShuffleDescriptors(
                                        intermediateDataSetId))
                .isInstanceOf(NullPointerException.class);
    }

    /**
     * Tracks four partitions on one task executor: two {@link ResultPartitionType#BLOCKING}
     * ("job") partitions and two {@link ResultPartitionType#BLOCKING_PERSISTENT} ("cluster")
     * partitions, each pair created once with flag {@code true} and once with flag {@code false}.
     * The job partitions are then released and the cluster partitions promoted.
     *
     * <p>Expected outcome: one release call containing only the {@code true} job partition, one
     * promote call containing only the {@code true} cluster partition, and an external release
     * on the shuffle master for exactly the two job partitions.
     */
    @Test
    void testReleaseJobPartitionPromoteClusterPartition() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Typed queues instead of raw ArrayBlockingQueue, so remove() needs no downcast.
        final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4);
        final Queue<PromoteCall> taskExecutorPromoteCalls = new ArrayBlockingQueue<>(4);
        final JobMasterPartitionTrackerImpl partitionTracker =
                new JobMasterPartitionTrackerImpl(
                        new JobID(),
                        shuffleMaster,
                        resourceId ->
                                Optional.of(
                                        createTaskExecutorGateway(
                                                resourceId,
                                                taskExecutorReleaseCalls,
                                                taskExecutorPromoteCalls)));

        final ResourceID taskExecutorId1 = ResourceID.generate();
        final ResultPartitionID jobPartitionId0 = new ResultPartitionID();
        final ResultPartitionID jobPartitionId1 = new ResultPartitionID();
        final ResultPartitionID clusterPartitionId0 = new ResultPartitionID();
        final ResultPartitionID clusterPartitionId1 = new ResultPartitionID();

        final ResultPartitionDeploymentDescriptor jobPartition0 =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        jobPartitionId0, ResultPartitionType.BLOCKING, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, jobPartition0);

        final ResultPartitionDeploymentDescriptor jobPartition1 =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        jobPartitionId1, ResultPartitionType.BLOCKING, false);
        partitionTracker.startTrackingPartition(taskExecutorId1, jobPartition1);

        final ResultPartitionDeploymentDescriptor clusterPartition0 =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        clusterPartitionId0, ResultPartitionType.BLOCKING_PERSISTENT, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition0);

        final ResultPartitionDeploymentDescriptor clusterPartition1 =
                AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
                        clusterPartitionId1, ResultPartitionType.BLOCKING_PERSISTENT, false);
        partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition1);

        partitionTracker.stopTrackingAndReleasePartitions(
                Arrays.asList(jobPartitionId0, jobPartitionId1));
        partitionTracker.stopTrackingAndPromotePartitions(
                Arrays.asList(clusterPartitionId0, clusterPartitionId1));

        Assertions.assertThat(taskExecutorReleaseCalls).hasSize(1);
        Assertions.assertThat(taskExecutorPromoteCalls).hasSize(1);

        final ReleaseCall releaseCall = taskExecutorReleaseCalls.remove();
        final PromoteCall promoteCall = taskExecutorPromoteCalls.remove();

        // Only the partitions created with flag `true` reach the task executor.
        Assertions.assertThat(Iterables.getOnlyElement(releaseCall.getReleasedPartitions()))
                .isEqualTo(jobPartitionId0);
        Assertions.assertThat(Iterables.getOnlyElement(promoteCall.getPromotedPartitions()))
                .isEqualTo(clusterPartitionId0);

        // The cluster partitions must not show up among the externally released ones.
        final List<ResultPartitionID> externallyReleasedPartitions =
                new ArrayList<>(shuffleMaster.externallyReleasedPartitions);
        Assertions.assertThat(externallyReleasedPartitions)
                .containsExactlyInAnyOrder(jobPartitionId0, jobPartitionId1);
    }

    /**
     * Builds a testing task executor gateway that records every release and promote request.
     *
     * @param taskExecutorId id stored in each recorded call
     * @param releaseCalls collection that receives one {@link ReleaseCall} per release request
     * @param promoteCalls collection that receives one {@link PromoteCall} per promote request
     * @return the gateway created by {@link TestingTaskExecutorGatewayBuilder}
     */
    private static TaskExecutorGateway createTaskExecutorGateway(
            ResourceID taskExecutorId,
            Collection<ReleaseCall> releaseCalls,
            Collection<PromoteCall> promoteCalls) {
        return new TestingTaskExecutorGatewayBuilder()
                .setReleasePartitionsConsumer(
                        (jobId, partitions) ->
                                releaseCalls.add(
                                        new ReleaseCall(taskExecutorId, jobId, partitions)))
                .setPromotePartitionsConsumer(
                        (jobId, partitions) ->
                                promoteCalls.add(
                                        new PromoteCall(taskExecutorId, jobId, partitions)))
                .createTestingTaskExecutorGateway();
    }

    /** Immutable record of one promote-partitions request seen by a testing task executor. */
    private static class PromoteCall {

        /** Id of the task executor whose gateway received the request. */
        private final ResourceID taskExecutorId;

        /** Job id passed along with the request. */
        private final JobID jobId;

        /** Partitions passed along with the request; stored as given, without copying. */
        private final Collection<ResultPartitionID> promotedPartitions;

        private PromoteCall(
                ResourceID taskExecutorId,
                JobID jobId,
                Collection<ResultPartitionID> promotedPartitions) {
            this.taskExecutorId = taskExecutorId;
            this.jobId = jobId;
            this.promotedPartitions = promotedPartitions;
        }

        /** Returns the id of the task executor that received the request. */
        public ResourceID getTaskExecutorId() {
            return taskExecutorId;
        }

        /** Returns the job id of the request. */
        public JobID getJobId() {
            return jobId;
        }

        /** Returns the partitions that were requested to be promoted. */
        public Collection<ResultPartitionID> getPromotedPartitions() {
            return promotedPartitions;
        }
    }

    /** Immutable record of one release-partitions request seen by a testing task executor. */
    private static class ReleaseCall {

        /** Id of the task executor whose gateway received the request. */
        private final ResourceID taskExecutorId;

        /** Job id passed along with the request. */
        private final JobID jobId;

        /** Partitions passed along with the request; stored as given, without copying. */
        private final Collection<ResultPartitionID> releasedPartitions;

        private ReleaseCall(
                ResourceID taskExecutorId,
                JobID jobId,
                Collection<ResultPartitionID> releasedPartitions) {
            this.taskExecutorId = taskExecutorId;
            this.jobId = jobId;
            this.releasedPartitions = releasedPartitions;
        }

        /** Returns the id of the task executor that received the request. */
        public ResourceID getTaskExecutorId() {
            return taskExecutorId;
        }

        /** Returns the job id of the request. */
        public JobID getJobId() {
            return jobId;
        }

        /** Returns the partitions that were requested to be released. */
        public Collection<ResultPartitionID> getReleasedPartitions() {
            return releasedPartitions;
        }
    }

    /**
     * Resource manager gateway stub that records every data set id for which cluster partition
     * shuffle descriptors are requested and answers with an empty, already completed result.
     */
    private static class TestingResourceManagerGateway
            extends org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway {

        /** Data set ids received by {@code getClusterPartitionsShuffleDescriptors}, in call order. */
        private final List<IntermediateDataSetID> requestedIntermediateDataSetIds =
                new ArrayList<>();

        private TestingResourceManagerGateway() {}

        /**
         * Records the requested id and returns a completed future holding an empty list.
         *
         * @param intermediateDataSetID id of the requested data set
         * @return completed future with no shuffle descriptors
         */
        @Override
        public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(
                IntermediateDataSetID intermediateDataSetID) {
            requestedIntermediateDataSetIds.add(intermediateDataSetID);
            final List<ShuffleDescriptor> noDescriptors = Collections.emptyList();
            return CompletableFuture.completedFuture(noDescriptors);
        }
    }

    private static class TestingShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        final Queue<ResultPartitionID> externallyReleasedPartitions = new ArrayBlockingQueue<ResultPartitionID>(4);

        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
        }
    }
}

