/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTrackerTest;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class JobMasterPartitionTrackerImplTest
extends TestLogger {
    @Test
    public void testPipelinedPartitionIsNotTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testBlockingPartitionIsTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testPipelinedApproximatePartitionIsTracked() {
        JobMasterPartitionTrackerImplTest.testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED_APPROXIMATE);
    }

    private static void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)new TestingShuffleMaster(), ignored -> Optional.empty());
        ResourceID resourceId = ResourceID.generate();
        ResultPartitionID resultPartitionId = new ResultPartitionID();
        partitionTracker.startTrackingPartition(resourceId, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId, resultPartitionType, false));
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)resourceId), (Matcher)CoreMatchers.is((Object)resultPartitionType.isReconnectable()));
    }

    @Test
    public void testReleaseCallsWithLocalResources() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        JobID jobId = new JobID();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(jobId, (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResourceID taskExecutorId2 = ResourceID.generate();
        ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        ResultPartitionID resultPartitionId2 = new ResultPartitionID();
        partitionTracker.startTrackingPartition(taskExecutorId1, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
        partitionTracker.startTrackingPartition(taskExecutorId2, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId2, true));
        partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
        Assert.assertEquals((long)1L, (long)taskExecutorReleaseCalls.size());
        ReleaseCall taskExecutorReleaseCall = (ReleaseCall)taskExecutorReleaseCalls.remove();
        Assert.assertEquals((Object)taskExecutorId1, (Object)taskExecutorReleaseCall.getTaskExecutorId());
        Assert.assertEquals((Object)jobId, (Object)taskExecutorReleaseCall.getJobId());
        MatcherAssert.assertThat(taskExecutorReleaseCall.getReleasedPartitions(), (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{resultPartitionId1}));
        MatcherAssert.assertThat(taskExecutorReleaseCall.getPromotedPartitions(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)resultPartitionId1, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId1), (Matcher)CoreMatchers.is((Object)false));
        partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));
        Assert.assertEquals((long)1L, (long)taskExecutorReleaseCalls.size());
        ReleaseCall releaseCall = (ReleaseCall)taskExecutorReleaseCalls.remove();
        Assert.assertEquals((Object)taskExecutorId2, (Object)releaseCall.getTaskExecutorId());
        Assert.assertEquals((Object)jobId, (Object)releaseCall.getJobId());
        MatcherAssert.assertThat(releaseCall.getReleasedPartitions(), (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{resultPartitionId2}));
        MatcherAssert.assertThat(releaseCall.getPromotedPartitions(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)resultPartitionId2, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId2), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testReleaseCallsWithoutLocalResources() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResourceID taskExecutorId2 = ResourceID.generate();
        ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        ResultPartitionID resultPartitionId2 = new ResultPartitionID();
        partitionTracker.startTrackingPartition(taskExecutorId1, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId1, false));
        partitionTracker.startTrackingPartition(taskExecutorId2, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId2, false));
        partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
        Assert.assertEquals((long)0L, (long)taskExecutorReleaseCalls.size());
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)resultPartitionId1, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId1), (Matcher)CoreMatchers.is((Object)false));
        partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));
        Assert.assertEquals((long)0L, (long)taskExecutorReleaseCalls.size());
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)resultPartitionId2, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat((Object)partitionTracker.isTrackingPartitionsFor((Object)taskExecutorId2), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testStopTrackingIssuesNoReleaseCalls() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        partitionTracker.startTrackingPartition(taskExecutorId1, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
        partitionTracker.stopTrackingPartitionsFor((Object)taskExecutorId1);
        Assert.assertEquals((long)0L, (long)taskExecutorReleaseCalls.size());
        Assert.assertEquals((long)0L, (long)shuffleMaster.externallyReleasedPartitions.size());
    }

    @Test
    public void testReleaseOrPromote() {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue taskExecutorReleaseCalls = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl(new JobID(), (ShuffleMaster)shuffleMaster, resourceId -> Optional.of(JobMasterPartitionTrackerImplTest.createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));
        ResourceID taskExecutorId1 = ResourceID.generate();
        ResultPartitionID jobPartitionId = new ResultPartitionID();
        ResultPartitionID clusterPartitionId = new ResultPartitionID();
        ResultPartitionDeploymentDescriptor jobPartition = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(jobPartitionId, ResultPartitionType.BLOCKING, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, jobPartition);
        ResultPartitionDeploymentDescriptor clusterPartition = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(clusterPartitionId, ResultPartitionType.BLOCKING_PERSISTENT, true);
        partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition);
        partitionTracker.stopTrackingAndReleaseOrPromotePartitionsFor(taskExecutorId1);
        Assert.assertEquals((long)1L, (long)taskExecutorReleaseCalls.size());
        Assert.assertEquals((long)1L, (long)shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)jobPartitionId, (Object)shuffleMaster.externallyReleasedPartitions.remove());
        ReleaseCall taskExecutorReleaseOrPromoteCall = (ReleaseCall)taskExecutorReleaseCalls.remove();
        Assert.assertEquals((Object)jobPartitionId, (Object)Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getReleasedPartitions()));
        Assert.assertEquals((Object)clusterPartitionId, (Object)Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getPromotedPartitions()));
    }

    private static TaskExecutorGateway createTaskExecutorGateway(ResourceID taskExecutorId, Collection<ReleaseCall> releaseCalls) {
        return new TestingTaskExecutorGatewayBuilder().setReleaseOrPromotePartitionsConsumer((TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>>)((TriConsumer)(jobId, partitionToRelease, partitionsToPromote) -> releaseCalls.add(new ReleaseCall(taskExecutorId, (JobID)jobId, (Collection)partitionToRelease, (Collection)partitionsToPromote)))).createTestingTaskExecutorGateway();
    }

    private static class ReleaseCall {
        private final ResourceID taskExecutorId;
        private final JobID jobId;
        private final Collection<ResultPartitionID> releasedPartitions;
        private final Collection<ResultPartitionID> promotedPartitions;

        private ReleaseCall(ResourceID taskExecutorId, JobID jobId, Collection<ResultPartitionID> releasedPartitions, Collection<ResultPartitionID> promotedPartitions) {
            this.taskExecutorId = taskExecutorId;
            this.jobId = jobId;
            this.releasedPartitions = releasedPartitions;
            this.promotedPartitions = promotedPartitions;
        }

        public ResourceID getTaskExecutorId() {
            return this.taskExecutorId;
        }

        public JobID getJobId() {
            return this.jobId;
        }

        public Collection<ResultPartitionID> getReleasedPartitions() {
            return this.releasedPartitions;
        }

        public Collection<ResultPartitionID> getPromotedPartitions() {
            return this.promotedPartitions;
        }
    }

    private static class TestingShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        final Queue<ResultPartitionID> externallyReleasedPartitions = new ArrayBlockingQueue<ResultPartitionID>(4);

        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
        }
    }
}

