/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

class SourceCoordinatorTest
extends SourceCoordinatorTestBase {
    /** Package-private no-argument constructor with an empty body. */
    SourceCoordinatorTest() {
    }

    /**
     * Passes four coordinator calls, made without a preceding {@code start()}, to
     * {@code CoordinatorTestUtils.verifyException} together with the expected message
     * "The coordinator has not started yet.".
     */
    @Test
    void testThrowExceptionWhenNotStarted() {
        final String expectedCause = "The coordinator has not started yet.";
        final String failureMessage = "Call should fail when source coordinator has not started yet.";

        // Creating the lambdas does not touch the coordinator; they are handed to verifyException below.
        ThrowingRunnable<Throwable> notifyComplete = () -> this.sourceCoordinator.notifyCheckpointComplete(100L);
        ThrowingRunnable<Throwable> handleEvent = () -> this.sourceCoordinator.handleEventFromOperator(0, 0, null);
        ThrowingRunnable<Throwable> attemptFailed = () -> this.sourceCoordinator.executionAttemptFailed(0, 0, null);
        ThrowingRunnable<Throwable> takeCheckpoint = () -> this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture());

        CoordinatorTestUtils.verifyException(notifyComplete, failureMessage, expectedCause);
        CoordinatorTestUtils.verifyException(handleEvent, failureMessage, expectedCause);
        CoordinatorTestUtils.verifyException(attemptFailed, failureMessage, expectedCause);
        CoordinatorTestUtils.verifyException(takeCheckpoint, failureMessage, expectedCause);
    }

    /**
     * Starts the coordinator and then hands a {@code resetToCheckpoint(0L, null)} call to
     * {@code CoordinatorTestUtils.verifyException} with the expected rejection message.
     */
    @Test
    void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        final String failureMessage = "Reset to checkpoint should fail after the coordinator has started";
        final String expectedCause = "The coordinator can only be reset if it was not yet started";

        this.sourceCoordinator.start();

        ThrowingRunnable<Throwable> resetAfterStart = () -> this.sourceCoordinator.resetToCheckpoint(0L, null);
        CoordinatorTestUtils.verifyException(resetAfterStart, failureMessage, expectedCause);
    }

    /**
     * Starts the coordinator, calls {@code waitForCoordinatorToProcessActions()}, and asserts
     * that the enumerator reports itself as started.
     */
    @Test
    void testStart() throws Exception {
        this.sourceCoordinator.start();
        this.waitForCoordinatorToProcessActions();

        boolean enumeratorStarted = this.getEnumerator().isStarted();
        Assertions.assertThat(enumeratorStarted).isTrue();
    }

    /**
     * Starts and then closes the coordinator, asserting first that the enumerator and then
     * that the coordinator's context report themselves as closed.
     */
    @Test
    void testClosed() throws Exception {
        this.sourceCoordinator.start();
        this.sourceCoordinator.close();

        boolean enumeratorClosed = this.getEnumerator().isClosed();
        Assertions.assertThat(enumeratorClosed).isTrue();

        // Read only after the enumerator assertion passed, matching the original evaluation order.
        boolean contextClosed = this.sourceCoordinator.getContext().isClosed();
        Assertions.assertThat(contextClosed).isTrue();
    }

    /**
     * Closes a coordinator that was never started and asserts that its context reports
     * itself as closed.
     */
    @Test
    void testClosedWithoutStart() throws Exception {
        this.sourceCoordinator.close();

        boolean contextClosed = this.sourceCoordinator.getContext().isClosed();
        Assertions.assertThat(contextClosed).isTrue();
    }

    /**
     * Sends one wrapped {@link SourceEvent} from subtask 0 / attempt 0 and asserts that the
     * enumerator recorded exactly that event.
     */
    @Test
    void testHandleSourceEvent() throws Exception {
        this.sourceReady();

        final SourceEvent sentEvent = new SourceEvent(){};
        final OperatorEvent wrappedEvent = new SourceEventWrapper(sentEvent);
        this.sourceCoordinator.handleEventFromOperator(0, 0, wrappedEvent);
        this.waitForCoordinatorToProcessActions();

        Assertions.assertThat(this.getEnumerator().getHandledSourceEvent()).hasSize(1);
        Object firstHandled = this.getEnumerator().getHandledSourceEvent().get(0);
        Assertions.assertThat(firstHandled).isEqualTo((Object)sentEvent);
    }

    /**
     * Assigns two of six splits to reader 0, takes checkpoint 100, resets a new coordinator
     * from the checkpoint bytes, and inspects the restored enumerator and context.
     */
    @Test
    void testCheckpointCoordinatorAndRestore() throws Exception {
        final long checkpointId = 100L;

        this.sourceReady();
        this.addTestingSplitSet(6);
        this.registerReader(0);
        // Two single-split assignments to reader 0 before the snapshot.
        for (int assigned = 0; assigned < 2; ++assigned) {
            this.getEnumerator().executeAssignOneSplit(0);
        }

        CompletableFuture<byte[]> snapshotFuture = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(checkpointId, snapshotFuture);
        byte[] snapshot = snapshotFuture.get();

        SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restored = this.getNewSourceCoordinator();
        restored.resetToCheckpoint(checkpointId, snapshot);
        TestingSplitEnumerator restoredEnumerator = (TestingSplitEnumerator)restored.getEnumerator();
        SourceCoordinatorContext restoredContext = restored.getContext();

        ((AbstractCollectionAssert)Assertions.assertThat(restoredEnumerator.getUnassignedSplits())
                .as("2 splits should have been assigned to reader 0", new Object[0]))
                .hasSize(4);
        Assertions.assertThat((Map)restoredEnumerator.getContext().registeredReaders()).isEmpty();
        ((MapAssert)Assertions.assertThat((Map)restoredContext.registeredReaders())
                .as("Registered readers should not be recovered by restoring", new Object[0]))
                .isEmpty();
    }

    /**
     * Same flow as a regular checkpoint/restore, but snapshots and restores with checkpoint
     * id {@code -1}, and additionally compares the un-checkpointed assignments of the restored
     * tracker with those of the original coordinator.
     */
    @Test
    void testBatchSnapshotCoordinatorAndRestore() throws Exception {
        // NOTE(review): -1 presumably corresponds to OperatorCoordinator.BATCH_CHECKPOINT_ID — confirm.
        final long batchSnapshotId = -1L;

        this.sourceReady();
        this.addTestingSplitSet(6);
        this.registerReader(0);
        // Two single-split assignments to reader 0 before the snapshot.
        for (int assigned = 0; assigned < 2; ++assigned) {
            this.getEnumerator().executeAssignOneSplit(0);
        }

        CompletableFuture<byte[]> snapshotFuture = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(batchSnapshotId, snapshotFuture);
        byte[] snapshot = snapshotFuture.get();

        SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restored = this.getNewSourceCoordinator();
        restored.resetToCheckpoint(batchSnapshotId, snapshot);
        TestingSplitEnumerator restoredEnumerator = (TestingSplitEnumerator)restored.getEnumerator();
        SourceCoordinatorContext restoredContext = restored.getContext();

        ((AbstractCollectionAssert)Assertions.assertThat(restoredEnumerator.getUnassignedSplits())
                .as("2 splits should have been assigned to reader 0", new Object[0]))
                .hasSize(4);
        Assertions.assertThat((Map)restoredEnumerator.getContext().registeredReaders()).isEmpty();
        ((MapAssert)Assertions.assertThat((Map)restoredContext.registeredReaders())
                .as("Registered readers should not be recovered by restoring", new Object[0]))
                .isEmpty();
        Assertions.assertThat((Map)restoredContext.getAssignmentTracker().uncheckpointedAssignments())
                .isEqualTo((Object)this.sourceCoordinator.getContext().getAssignmentTracker().uncheckpointedAssignments());
    }

    /**
     * Takes checkpoints 100 and 101 with assignments to reader 0 in each, then fails the
     * subtask and resets it to checkpoint 99, and asserts that no assignment for reader 0
     * remains tracked and that the enumerator holds seven unassigned splits again.
     */
    @Test
    void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        this.sourceReady();
        this.addTestingSplitSet(6);
        this.registerReader(0);

        // Two assignments, then checkpoint 100.
        this.getEnumerator().executeAssignOneSplit(0);
        this.getEnumerator().executeAssignOneSplit(0);
        this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture());

        // One extra split, one more assignment, then checkpoint 101.
        this.getEnumerator().addNewSplits((SourceSplit[])new MockSourceSplit[]{new MockSourceSplit(6)});
        this.getEnumerator().executeAssignOneSplit(0);
        this.sourceCoordinator.checkpointCoordinator(101L, new CompletableFuture());
        this.waitForCoordinatorToProcessActions();

        // State before the failure: 6 + 1 splits added, 3 assigned.
        Assertions.assertThat(this.getEnumerator().getUnassignedSplits()).hasSize(4);
        Assertions.assertThat((Map)this.splitSplitAssignmentTracker.uncheckpointedAssignments()).isEmpty();
        Collection assignedInCheckpoint100 = (Collection)((Map)this.splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L)).get(0);
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "1"), assignedInCheckpoint100);
        Collection assignedInCheckpoint101 = (Collection)this.splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0);
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("2"), assignedInCheckpoint101);

        // Fail the subtask and reset it to checkpoint 99, which is older than both checkpoints above.
        this.sourceCoordinator.executionAttemptFailed(0, 0, null);
        this.sourceCoordinator.subtaskReset(0, 99L);
        this.waitForCoordinatorToProcessActions();

        ((MapAssert)Assertions.assertThat((Map)this.context.registeredReaders())
                .as("Reader 0 should have been unregistered.", new Object[0]))
                .doesNotContainKey((Object)0);
        for (Map perCheckpoint : this.splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
            ((MapAssert)Assertions.assertThat((Map)perCheckpoint)
                    .as("Assignment in uncompleted checkpoint should have been reverted.", new Object[0]))
                    .doesNotContainKey((Object)0);
        }
        Assertions.assertThat((Map)this.splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey((Object)0);
        Assertions.assertThat(this.getEnumerator().getUnassignedSplits()).hasSize(7);
    }

    /**
     * Assigns two splits to reader 0, takes and completes checkpoint 100, then fails the
     * subtask and asserts that the enumerator still has four unassigned splits and that the
     * assignment tracker holds nothing for reader 0.
     */
    @Test
    void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        final long completedCheckpoint = 100L;

        this.sourceReady();
        this.addTestingSplitSet(6);
        this.registerReader(0);
        for (int assigned = 0; assigned < 2; ++assigned) {
            this.getEnumerator().executeAssignOneSplit(0);
        }

        // Snapshot and confirm the checkpoint before the subtask fails.
        this.sourceCoordinator.checkpointCoordinator(completedCheckpoint, new CompletableFuture());
        this.sourceCoordinator.notifyCheckpointComplete(completedCheckpoint);
        this.sourceCoordinator.executionAttemptFailed(0, 0, null);
        this.waitForCoordinatorToProcessActions();

        Assertions.assertThat((Long)this.getEnumerator().getSuccessfulCheckpoints().get(0)).isEqualTo(100L);
        Assertions.assertThat((Map)this.context.registeredReaders()).doesNotContainKey((Object)0);
        Assertions.assertThat(this.getEnumerator().getUnassignedSplits()).hasSize(4);
        Assertions.assertThat((Map)this.splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey((Object)0);
        Assertions.assertThat((Map)this.splitSplitAssignmentTracker.assignmentsByCheckpointId()).isEmpty();
    }

    /**
     * Builds a coordinator around an enumerator whose {@code start()} throws, starts it, waits
     * up to ten seconds for the operator coordinator context to report a failed job, and
     * asserts that the reported failure reason is the thrown exception.
     */
    @Test
    void testFailJobWhenExceptionThrownFromStart() throws Exception {
        final RuntimeException startFailure = new RuntimeException("Artificial Exception");
        try (MockSplitEnumeratorContext enumContext = new MockSplitEnumeratorContext(1);
             MockSplitEnumerator failingEnumerator = new MockSplitEnumerator(1, (SplitEnumeratorContext)enumContext){

            public void start() {
                throw startFailure;
            }
        };
             SourceCoordinator coordinator = new SourceCoordinator(
                     new JobID(),
                     "TestOperator",
                     new EnumeratorCreatingSource(() -> SourceCoordinatorTest.lambda$testFailJobWhenExceptionThrownFromStart$5((SplitEnumerator)failingEnumerator)),
                     this.context,
                     (CoordinatorStore)new CoordinatorStoreImpl(),
                     WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                     null);){
            coordinator.start();

            CommonTestUtils.waitUtil(
                    () -> this.operatorCoordinatorContext.isJobFailed(),
                    Duration.ofSeconds(10L),
                    "The job should have failed due to the artificial exception.");

            Throwable reportedFailure = this.operatorCoordinatorContext.getJobFailureReason();
            Assertions.assertThat(reportedFailure).isEqualTo((Object)startFailure);
        }
    }

    /**
     * Builds a coordinator whose enumerator factory throws, starts it, and asserts immediately
     * (without waiting) that the job is marked failed with that same exception.
     */
    @Test
    void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception {
        final RuntimeException creationFailure = new RuntimeException("Artificial Exception");
        // Factory that throws instead of producing an enumerator.
        Supplier<Object> throwingFactory = () -> {
            throw creationFailure;
        };
        SourceCoordinator coordinator = new SourceCoordinator(
                new JobID(),
                "TestOperator",
                new EnumeratorCreatingSource(throwingFactory),
                this.context,
                (CoordinatorStore)new CoordinatorStoreImpl(),
                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                null);

        coordinator.start();

        boolean jobFailed = this.operatorCoordinatorContext.isJobFailed();
        Assertions.assertThat(jobFailed).isTrue();
        Throwable reportedFailure = this.operatorCoordinatorContext.getJobFailureReason();
        Assertions.assertThat(reportedFailure).isEqualTo((Object)creationFailure);
    }

    /**
     * Builds a coordinator around an enumerator whose {@code handleSourceEvent} throws an
     * {@link Error}, delivers one source event, waits up to ten seconds for the job to be
     * reported as failed, and asserts that the failure reason is that error.
     */
    @Test
    void testErrorThrownFromSplitEnumerator() throws Exception {
        final Error enumeratorError = new Error("Test Error");
        try (MockSplitEnumeratorContext enumContext = new MockSplitEnumeratorContext(1);
             MockSplitEnumerator failingEnumerator = new MockSplitEnumerator(1, (SplitEnumeratorContext)enumContext){

            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                throw enumeratorError;
            }
        };
             SourceCoordinator coordinator = new SourceCoordinator(
                     new JobID(),
                     "TestOperator",
                     new EnumeratorCreatingSource(() -> SourceCoordinatorTest.lambda$testErrorThrownFromSplitEnumerator$8((SplitEnumerator)failingEnumerator)),
                     this.context,
                     (CoordinatorStore)new CoordinatorStoreImpl(),
                     WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                     null);){
            coordinator.start();
            // Event from subtask 1, attempt 0; reaches the throwing handler above.
            OperatorEvent triggeringEvent = new SourceEventWrapper(new SourceEvent(){});
            coordinator.handleEventFromOperator(1, 0, triggeringEvent);

            CommonTestUtils.waitUtil(
                    () -> this.operatorCoordinatorContext.isJobFailed(),
                    Duration.ofSeconds(10L),
                    "The job should have failed due to the artificial exception.");

            Throwable reportedFailure = this.operatorCoordinatorContext.getJobFailureReason();
            Assertions.assertThat(reportedFailure).isEqualTo((Object)enumeratorError);
        }
    }

    /**
     * Exercises closing a coordinator while a handler scheduled by the enumerator is blocked.
     *
     * <p>The enumerator's {@code handleSourceEvent} schedules an async call on the test's
     * coordinator context whose result handler counts down {@code latch} and then sleeps for
     * {@code Long.MAX_VALUE} milliseconds. Once the latch is released, the coordinator is closed
     * through {@code ComponentClosingUtils.closeAsyncWithTimeout} with a 1 ms timeout; if that
     * future fails, the failure must be a {@link TimeoutException}. Finally the test waits up to
     * five seconds for the enumerator to report {@code closed()}.
     *
     * <p>The coordinator is created with the same seven-argument constructor (job id, watermark
     * alignment disabled, no listening id) that the other tests in this class use, so all tests
     * construct their coordinators identically.
     */
    @Test
    void testBlockOnClose() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        try (MockSplitEnumeratorContext enumeratorContext = new MockSplitEnumeratorContext(1);
             MockSplitEnumerator splitEnumerator = new MockSplitEnumerator(1, (SplitEnumeratorContext)enumeratorContext){

            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                SourceCoordinatorTest.this.context.callAsync(() -> 1L, (ignored, t) -> {
                    // Signal the test thread that the handler is running, then never return.
                    latch.countDown();
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        };
             SourceCoordinator coordinator = new SourceCoordinator(
                     new JobID(),
                     "TestOperator",
                     new EnumeratorCreatingSource(() -> splitEnumerator),
                     this.context,
                     (CoordinatorStore)new CoordinatorStoreImpl(),
                     WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                     null);){
            coordinator.start();
            coordinator.handleEventFromOperator(1, 0, (OperatorEvent)new SourceEventWrapper(new SourceEvent(){}));
            // Do not attempt to close before the blocking handler has actually started.
            latch.await();
            CompletableFuture future = ComponentClosingUtils.closeAsyncWithTimeout((String)"testBlockOnClose", () -> ((SourceCoordinator)coordinator).close(), (Duration)Duration.ofMillis(1L));
            // The assertion only runs when the close future completes exceptionally.
            ((CompletableFuture)future.exceptionally(e -> {
                Assertions.assertThat((Throwable)e).isInstanceOf(TimeoutException.class);
                return null;
            })).get();
            CommonTestUtils.waitUtil(() -> ((MockSplitEnumerator)splitEnumerator).closed(), (Duration)Duration.ofSeconds(5L), (String)"Split enumerator was not closed in 5 seconds.");
        }
    }

    /**
     * Creates a coordinator through {@code SourceCoordinatorProvider} with a dedicated
     * {@link URLClassLoader} in the coordinator context, starts it, and asserts that the newly
     * created enumerator saw that loader as the thread context class loader both in its
     * constructor and in {@code start()}.
     */
    @Test
    void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
        URLClassLoader userClassLoader = new URLClassLoader(new URL[0]);
        MockOperatorCoordinatorContext coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), userClassLoader);
        EnumeratorCreatingSource source = new EnumeratorCreatingSource(ClassLoaderTestEnumerator::new);
        SourceCoordinatorProvider provider = new SourceCoordinatorProvider(
                "testOperator",
                coordinatorContext.getOperatorId(),
                source,
                1,
                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                null);

        OperatorCoordinator coordinator = provider.getCoordinator((OperatorCoordinator.Context)coordinatorContext);
        coordinator.start();

        // Completed by EnumeratorCreatingSource.createEnumerator.
        ClassLoaderTestEnumerator created = (ClassLoaderTestEnumerator)source.createEnumeratorFuture.get();
        Object loaderSeenInConstructor = created.constructorClassLoader;
        Assertions.assertThat(loaderSeenInConstructor).isSameAs((Object)userClassLoader);
        Object loaderSeenInStart = created.threadClassLoader.get();
        Assertions.assertThat(loaderSeenInStart).isSameAs((Object)userClassLoader);

        coordinator.close();
    }

    /**
     * Like the creation variant, but first resets the coordinator to checkpoint 1 built from
     * {@code createEmptyCheckpoint()}, so the enumerator is obtained from the restore future.
     */
    @Test
    void testUserClassLoaderWhenRestoringEnumerator() throws Exception {
        URLClassLoader userClassLoader = new URLClassLoader(new URL[0]);
        MockOperatorCoordinatorContext coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), userClassLoader);
        EnumeratorCreatingSource source = new EnumeratorCreatingSource(ClassLoaderTestEnumerator::new);
        SourceCoordinatorProvider provider = new SourceCoordinatorProvider(
                "testOperator",
                coordinatorContext.getOperatorId(),
                source,
                1,
                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                null);

        OperatorCoordinator coordinator = provider.getCoordinator((OperatorCoordinator.Context)coordinatorContext);
        byte[] emptyCheckpoint = SourceCoordinatorTest.createEmptyCheckpoint();
        coordinator.resetToCheckpoint(1L, emptyCheckpoint);
        coordinator.start();

        // Completed by EnumeratorCreatingSource.restoreEnumerator.
        ClassLoaderTestEnumerator restored = (ClassLoaderTestEnumerator)source.restoreEnumeratorFuture.get();
        Object loaderSeenInConstructor = restored.constructorClassLoader;
        Assertions.assertThat(loaderSeenInConstructor).isSameAs((Object)userClassLoader);
        Object loaderSeenInStart = restored.threadClassLoader.get();
        Assertions.assertThat(loaderSeenInStart).isSameAs((Object)userClassLoader);

        coordinator.close();
    }

    /**
     * Snapshots the enumerator's splits, encodes them with
     * {@code createCheckpointDataWithSerdeV0}, restores a new coordinator from those bytes,
     * and asserts that the restored enumerator holds the same unassigned splits, has handled
     * no source events, and that the restored context has no registered readers.
     */
    @Test
    void testSerdeBackwardCompatibility() throws Exception {
        this.sourceReady();
        this.addTestingSplitSet(6);

        TestingSplitEnumerator<MockSourceSplit> liveEnumerator = this.getEnumerator();
        HashSet<MockSourceSplit> snapshotSplits = new HashSet<MockSourceSplit>();
        // snapshotState is invoked through runInEnumThreadAndSync rather than directly on the test thread.
        liveEnumerator.runInEnumThreadAndSync(() -> snapshotSplits.addAll((Collection<MockSourceSplit>)liveEnumerator.snapshotState(1L)));

        byte[] v0CheckpointBytes = this.createCheckpointDataWithSerdeV0(snapshotSplits);
        SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restored = this.getNewSourceCoordinator();
        restored.resetToCheckpoint(15213L, v0CheckpointBytes);

        TestingSplitEnumerator restoredEnumerator = (TestingSplitEnumerator)restored.getEnumerator();
        SourceCoordinatorContext restoredContext = restored.getContext();
        Assertions.assertThat(restoredEnumerator.getUnassignedSplits()).isEqualTo(snapshotSplits);
        Assertions.assertThat(restoredEnumerator.getHandledSourceEvent()).isEmpty();
        Assertions.assertThat((Map)restoredContext.registeredReaders()).isEmpty();
    }

    /**
     * Has subtask 0 request splits three times (two splits exist), fails and resets it to
     * checkpoint 99, re-registers it under the next attempt number, requests three more times,
     * and asserts that the six events sent to the subtask are: split 0, split 1,
     * {@link NoMoreSplitsEvent}, split 0, split 1, {@link NoMoreSplitsEvent}.
     */
    @Test
    public void testSubtaskRestartAndRequestSplitsAgain() throws Exception {
        this.sourceCoordinator.start();

        ArrayList<MockSourceSplit> splits = new ArrayList<MockSourceSplit>();
        splits.add(new MockSourceSplit(0));
        splits.add(new MockSourceSplit(1));
        this.getEnumerator().addNewSplits((Collection<MockSourceSplit>)splits);

        // First attempt: two requests, a checkpoint, then a third request.
        final int firstAttempt = 0;
        this.setReaderTaskReady(this.sourceCoordinator, 0, firstAttempt);
        this.registerReader(0, firstAttempt);
        for (int expectedEvents = 1; expectedEvents <= 2; ++expectedEvents) {
            this.sourceCoordinator.handleEventFromOperator(0, firstAttempt, (OperatorEvent)new RequestSplitEvent());
            this.waitForSentEvents(expectedEvents);
        }
        this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture());
        this.sourceCoordinator.handleEventFromOperator(0, firstAttempt, (OperatorEvent)new RequestSplitEvent());
        this.waitForSentEvents(3);
        Assertions.assertThat(this.getEnumerator().getUnassignedSplits()).isEmpty();

        // Fail the attempt and reset to checkpoint 99; wait until both splits are unassigned again.
        this.sourceCoordinator.executionAttemptFailed(0, firstAttempt, null);
        this.sourceCoordinator.subtaskReset(0, 99L);
        SourceCoordinatorTest.waitUtilNumberReached(() -> this.getEnumerator().getUnassignedSplits().size(), 2);

        // Second attempt: three more requests, sent-event count grows from 4 to 6.
        final int secondAttempt = firstAttempt + 1;
        this.setReaderTaskReady(this.sourceCoordinator, 0, secondAttempt);
        this.registerReader(0, secondAttempt);
        for (int expectedEvents = 4; expectedEvents <= 6; ++expectedEvents) {
            this.sourceCoordinator.handleEventFromOperator(0, secondAttempt, (OperatorEvent)new RequestSplitEvent());
            this.waitForSentEvents(expectedEvents);
        }
        Assertions.assertThat(this.getEnumerator().getUnassignedSplits()).isEmpty();

        List<OperatorEvent> sentEvents = this.receivingTasks.getSentEventsForSubtask(0);
        SourceCoordinatorTest.assertAddSplitEvent(sentEvents.get(0), Collections.singletonList(splits.get(0)));
        SourceCoordinatorTest.assertAddSplitEvent(sentEvents.get(1), Collections.singletonList(splits.get(1)));
        SourceCoordinatorTest.assertAddSplitEvent(sentEvents.get(3), Collections.singletonList(splits.get(0)));
        SourceCoordinatorTest.assertAddSplitEvent(sentEvents.get(4), Collections.singletonList(splits.get(1)));
        Assertions.assertThat((Object)sentEvents.get(2)).isInstanceOf(NoMoreSplitsEvent.class);
        Assertions.assertThat((Object)sentEvents.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
    }

    /**
     * Starts a coordinator constructed with a listening id and asserts that the coordinator
     * store then maps that id to the very same coordinator instance.
     */
    @Test
    public void testListeningEventsFromOtherCoordinators() throws Exception {
        final String listeningID = "testListeningID";
        CoordinatorStoreImpl store = new CoordinatorStoreImpl();
        SourceCoordinator coordinator = new SourceCoordinator(
                new JobID(),
                "TestOperator",
                this.createMockSource(),
                this.context,
                (CoordinatorStore)store,
                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                listeningID);

        coordinator.start();

        Object registered = store.get((Object)listeningID);
        Assertions.assertThat(registered).isNotNull().isSameAs((Object)coordinator);
    }

    /**
     * Pre-populates the coordinator store under the listening id with a wrapped event that is
     * both a {@link SourceEvent} and a {@link DynamicFilteringInfo}, then asserts that
     * {@code inferSourceParallelismAsync(2, 1L)} on a not-started coordinator completes with 2.
     */
    @Test
    public void testInferSourceParallelismAsync() throws Exception {
        class TestDynamicFilteringEvent
        implements SourceEvent,
        DynamicFilteringInfo {
        }

        final String listeningID = "testListeningID";
        CoordinatorStoreImpl store = new CoordinatorStoreImpl();
        Object storedEvent = new SourceEventWrapper((SourceEvent)new TestDynamicFilteringEvent());
        store.putIfAbsent((Object)listeningID, storedEvent);

        SourceCoordinator coordinator = new SourceCoordinator(
                new JobID(),
                "TestOperator",
                this.createMockSource(),
                this.context,
                (CoordinatorStore)store,
                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                listeningID);

        Integer inferredParallelism = (Integer)coordinator.inferSourceParallelismAsync(2, 1L).get();
        Assertions.assertThat(inferredParallelism).isEqualTo(2);
    }

    /**
     * Hand-builds checkpoint bytes for the given splits.
     *
     * <p>Layout written, in order: an int 0, the version of the enumerator checkpoint
     * serializer, the length and bytes of the serialized splits, and three trailing int zeros.
     *
     * <p>NOTE(review): judging by the method name, the leading 0 is the V0 serde version tag
     * and the trailing zeros stand in for fields only the V0 format carried — confirm against
     * the coordinator's serde utilities.
     *
     * @param splits the enumerator checkpoint to encode
     * @return a copy of the serialized buffer
     * @throws Exception if serializing the splits or writing to the buffer fails
     */
    private byte[] createCheckpointDataWithSerdeV0(Set<MockSourceSplit> splits) throws Exception {
        MockSplitEnumeratorCheckpointSerializer checkpointSerializer = new MockSplitEnumeratorCheckpointSerializer();
        DataOutputSerializer out = new DataOutputSerializer(32);

        // Header: leading 0 followed by the enumerator checkpoint serializer version.
        out.writeInt(0);
        out.writeInt(checkpointSerializer.getVersion());

        // Length-prefixed enumerator checkpoint payload.
        byte[] payload = checkpointSerializer.serialize(splits);
        out.writeInt(payload.length);
        out.write(payload);

        // Three trailing zero ints.
        out.writeInt(0);
        out.writeInt(0);
        out.writeInt(0);

        return out.getCopyOfBuffer();
    }

    /**
     * Serializes an empty split set through {@code SourceCoordinator.writeCheckpointBytes}
     * using a {@link MockSplitEnumeratorCheckpointSerializer}.
     *
     * @return the checkpoint bytes for an empty enumerator state
     * @throws Exception if writing the checkpoint bytes fails
     */
    private static byte[] createEmptyCheckpoint() throws Exception {
        Set<MockSourceSplit> noSplits = Collections.emptySet();
        SimpleVersionedSerializer<Set<MockSourceSplit>> checkpointSerializer = new MockSplitEnumeratorCheckpointSerializer();
        return SourceCoordinator.writeCheckpointBytes(noSplits, checkpointSerializer);
    }

    /**
     * Decompiler-emitted lambda body: returns the enumerator it is given, unchanged. The name
     * must stay as is because another method in this class references it by this identifier.
     */
    private static /* synthetic */ SplitEnumerator lambda$testErrorThrownFromSplitEnumerator$8(SplitEnumerator splitEnumerator) {
        return splitEnumerator;
    }

    /**
     * Decompiler-emitted lambda body: returns the enumerator it is given, unchanged. The name
     * must stay as is because another method in this class references it by this identifier.
     */
    private static /* synthetic */ SplitEnumerator lambda$testFailJobWhenExceptionThrownFromStart$5(SplitEnumerator splitEnumerator) {
        return splitEnumerator;
    }

    /**
     * Test {@link Source} that obtains its enumerator from a supplied factory and exposes the
     * produced instance through a future: one future for {@code createEnumerator}, another for
     * {@code restoreEnumerator}. Creating a reader is not supported.
     */
    private static final class EnumeratorCreatingSource<T, EnumT extends SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>>
    implements Source<T, MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed with the enumerator produced by {@link #createEnumerator}. */
        final CompletableFuture<EnumT> createEnumeratorFuture = new CompletableFuture<>();
        /** Completed with the enumerator produced by {@link #restoreEnumerator}. */
        final CompletableFuture<EnumT> restoreEnumeratorFuture = new CompletableFuture<>();
        private final Supplier<EnumT> enumeratorFactory;

        public EnumeratorCreatingSource(Supplier<EnumT> enumeratorFactory) {
            this.enumeratorFactory = enumeratorFactory;
        }

        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        public SourceReader<T, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            throw new UnsupportedOperationException();
        }

        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext) {
            return this.produceAndPublish(this.createEnumeratorFuture);
        }

        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext, Set<MockSourceSplit> checkpoint) {
            // The checkpoint argument is ignored; the factory alone decides what is returned.
            return this.produceAndPublish(this.restoreEnumeratorFuture);
        }

        public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
            return new MockSourceSplitSerializer();
        }

        public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new MockSplitEnumeratorCheckpointSerializer();
        }

        /** Asks the factory for an enumerator, completes {@code target} with it, and returns it. */
        private EnumT produceAndPublish(CompletableFuture<EnumT> target) {
            EnumT produced = this.enumeratorFactory.get();
            target.complete(produced);
            return produced;
        }
    }

    /**
     * Enumerator that records the thread context class loader at two points: when the instance
     * is constructed and when {@code start()} is called. All split/reader operations and
     * {@code snapshotState} throw {@link UnsupportedOperationException}; {@code close()} does
     * nothing.
     */
    private static final class ClassLoaderTestEnumerator
    implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed in {@link #start()} with the calling thread's context class loader. */
        final CompletableFuture<ClassLoader> threadClassLoader;
        /** Context class loader of the thread that constructed this instance. */
        final ClassLoader constructorClassLoader;

        private ClassLoaderTestEnumerator() {
            this.threadClassLoader = new CompletableFuture<>();
            this.constructorClassLoader = Thread.currentThread().getContextClassLoader();
        }

        public void start() {
            ClassLoader loaderAtStart = Thread.currentThread().getContextClassLoader();
            this.threadClassLoader.complete(loaderAtStart);
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public void addReader(int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public Set<MockSourceSplit> snapshotState(long checkpointId) throws Exception {
            throw new UnsupportedOperationException();
        }

        public void close() {
            // Nothing to release.
        }
    }
}

