/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTrackerTest;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class JobMasterPartitionTrackerImplTest
extends TestLogger {
    @Test
    public void testPipelinedPartitionIsNotTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testBlockingPartitionIsTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testPipelinedApproximatePartitionIsTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED_APPROXIMATE);
    }

    private static void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)new TestingShuffleMaster(), ignored -> Optional.empty());
        ResourceID resourceId = ResourceID.generate();
        ResultPartitionID resultPartitionId = new ResultPartitionID();
        partitionTracker.startTrackingPartition(resourceId, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId, resultPartitionType, true));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)resourceId), (Matcher)Matchers.is((Object)resultPartitionType.isReconnectable()));
    }

    @Test
    public void testReleaseCallsWithLocalResources() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        JobID jobId = new JobID();
        ArrayBlockingQueue releaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(jobId, (ShuffleMaster)shuffleMaster, tmId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(tmId, releaseCalls)));
        ResourceID tmId2 = ResourceID.generate();
        ResultPartitionID resultPartitionId = new ResultPartitionID();
        partitionTracker.startTrackingPartition(tmId2, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId, true));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)tmId2), (Matcher)Matchers.is((Object)true));
        partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));
        Assert.assertEquals((long)1L, (long)releaseCalls.size());
        ReleaseCall releaseCall = (ReleaseCall)releaseCalls.remove();
        Assert.assertEquals((Object)tmId2, (Object)releaseCall.getTaskExecutorId());
        Assert.assertEquals((Object)jobId, (Object)releaseCall.getJobId());
        MatcherAssert.assertThat(releaseCall.getReleasedPartitions(), (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{resultPartitionId}));
        MatcherAssert.assertThat(releaseCall.getPromotedPartitions(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)resultPartitionId, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)tmId2), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testReleaseCallsWithoutLocalResources() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue releaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, tmId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(tmId, releaseCalls)));
        ResourceID tmId2 = ResourceID.generate();
        ResultPartitionID resultPartitionId = new ResultPartitionID();
        partitionTracker.startTrackingPartition(tmId2, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId, false));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)tmId2), (Matcher)Matchers.is((Object)false));
        partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));
        Assert.assertEquals((long)0L, (long)releaseCalls.size());
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        MatcherAssert.assertThat(shuffleMaster.externallyReleasedPartitions, (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{resultPartitionId}));
    }

    @Test
    public void testStopTrackingIssuesNoReleaseCalls() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        partitionTracker.startTrackingPartition(taskExecutorId1, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
        partitionTracker.stopTrackingPartitionsFor((Object)taskExecutorId1);
        Assert.assertEquals((long)0L, (long)taskExecutorReleaseCalls.size());
        Assert.assertEquals((long)0L, (long)shuffleMaster.externallyReleasedPartitions.size());
    }

    @Test
    public void testTrackingInternalAndExternalPartitionsByTmId() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId = ResourceID.generate();
        ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        ResultPartitionID resultPartitionId2 = new ResultPartitionID();
        partitionTracker.startTrackingPartition(taskExecutorId, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId2, false));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId), (Matcher)Matchers.is((Object)false));
        partitionTracker.startTrackingPartition(taskExecutorId, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat(partitionTracker.getAllTrackedPartitions().stream().map(desc -> desc.getShuffleDescriptor().getResultPartitionID()).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])new ResultPartitionID[]{resultPartitionId1, resultPartitionId2}));
        partitionTracker.stopTrackingPartitionsFor((Object)taskExecutorId);
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)partitionTracker.isPartitionTracked(resultPartitionId1), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)partitionTracker.isPartitionTracked(resultPartitionId2), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)((ResultPartitionDeploymentDescriptor)Iterables.getOnlyElement((Iterable)partitionTracker.getAllTrackedPartitions())).getShuffleDescriptor().getResultPartitionID(), (Matcher)Matchers.is((Object)resultPartitionId2));
    }

    @Test
    public void testReleaseOrPromote() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseOrPromoteCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseOrPromoteCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResultPartitionID jobPartitionId0 = new ResultPartitionID();
        ResultPartitionID jobPartitionId1 = new ResultPartitionID();
        ResultPartitionID clusterPartitionId0 = new ResultPartitionID();
        ResultPartitionID clusterPartitionId1 = new ResultPartitionID();
        ResultPartitionDeploymentDescriptor jobPartition0 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(jobPartitionId0, ResultPartitionType.BLOCKING, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, jobPartition0);
        ResultPartitionDeploymentDescriptor jobPartition1 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(jobPartitionId1, ResultPartitionType.BLOCKING, false);
        partitionTracker.startTrackingPartition(taskExecutorId1, jobPartition1);
        ResultPartitionDeploymentDescriptor clusterPartition0 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(clusterPartitionId0, ResultPartitionType.BLOCKING_PERSISTENT, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition0);
        ResultPartitionDeploymentDescriptor clusterPartition1 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(clusterPartitionId1, ResultPartitionType.BLOCKING_PERSISTENT, false);
        partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition1);
        partitionTracker.stopTrackingAndReleaseOrPromotePartitions(Arrays.asList(jobPartitionId0, jobPartitionId1, clusterPartitionId0, clusterPartitionId1));
        Assert.assertEquals((long)1L, (long)taskExecutorReleaseOrPromoteCalls.size());
        ReleaseCall taskExecutorReleaseOrPromoteCall = (ReleaseCall)taskExecutorReleaseOrPromoteCalls.remove();
        Assert.assertEquals((Object)jobPartitionId0, (Object)Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getReleasedPartitions()));
        Assert.assertEquals((Object)clusterPartitionId0, (Object)Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getPromotedPartitions()));
        ArrayList<ResultPartitionID> externallyReleasedPartitions = new ArrayList<ResultPartitionID>(shuffleMaster.externallyReleasedPartitions);
        MatcherAssert.assertThat(externallyReleasedPartitions, (Matcher)Matchers.containsInAnyOrder((Object[])new ResultPartitionID[]{jobPartitionId0, jobPartitionId1}));
    }

    private static TaskExecutorGateway createTaskExecutorGateway(ResourceID taskExecutorId, Collection<ReleaseCall> releaseOrPromoteCalls) {
        return new TestingTaskExecutorGatewayBuilder().setReleaseOrPromotePartitionsConsumer((TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>>)((TriConsumer)(jobId, partitionsToRelease, partitionsToPromote) -> releaseOrPromoteCalls.add(new ReleaseCall(taskExecutorId, (JobID)jobId, (Collection)partitionsToRelease, (Collection)partitionsToPromote)))).createTestingTaskExecutorGateway();
    }

    private static class ReleaseCall {
        private final ResourceID taskExecutorId;
        private final JobID jobId;
        private final Collection<ResultPartitionID> releasedPartitions;
        private final Collection<ResultPartitionID> promotedPartitions;

        private ReleaseCall(ResourceID taskExecutorId, JobID jobId, Collection<ResultPartitionID> releasedPartitions, Collection<ResultPartitionID> promotedPartitions) {
            this.taskExecutorId = taskExecutorId;
            this.jobId = jobId;
            this.releasedPartitions = releasedPartitions;
            this.promotedPartitions = promotedPartitions;
        }

        public ResourceID getTaskExecutorId() {
            return this.taskExecutorId;
        }

        public JobID getJobId() {
            return this.jobId;
        }

        public Collection<ResultPartitionID> getReleasedPartitions() {
            return this.releasedPartitions;
        }

        public Collection<ResultPartitionID> getPromotedPartitions() {
            return this.promotedPartitions;
        }
    }

    private static class TestingShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        final Queue<ResultPartitionID> externallyReleasedPartitions = new ArrayBlockingQueue<ResultPartitionID>(4);

        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
        }
    }
}

