/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SourceCoordinatorConcurrentAttemptsTest
extends SourceCoordinatorTestBase {
    // Selects which mock source createMockSource() builds: when true, a TestSource (whose
    // enumerators are TestEnumerator instances implementing
    // SupportsHandleExecutionAttemptSourceEvent); when false, the source returned by
    // TestingSplitEnumerator.factorySource(...). setup() sets it to true before every test;
    // individual tests flip it to false and then build a new coordinator.
    private boolean enumeratorSupportsHandleExecutionAttemptSourceEvent;

    // Package-private no-arg constructor with an empty body.
    SourceCoordinatorConcurrentAttemptsTest() {
    }

    /**
     * Enables concurrent execution attempts and selects the attempt-aware enumerator before
     * delegating to the base-class setup.
     *
     * <p>Both flags are assigned before {@code super.setup()} runs; presumably the base setup
     * reads them while building the coordinator (confirm against SourceCoordinatorTestBase).
     */
    @Override
    @BeforeEach
    void setup() throws Exception {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = true;
        supportsConcurrentExecutionAttempts = true;
        super.setup();
    }

    /**
     * Verifies that creating a coordinator with watermark alignment parameters raises an
     * {@link IllegalArgumentException} in this concurrent-attempts test setup.
     */
    @Test
    void testCoordinatorThrowExceptionIfWatermarkAlignmentIsEnabled() {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;

        Assertions.assertThatThrownBy(
                        () -> {
                            // The parameters are built inside the callable, exactly as before,
                            // so anything thrown while creating them is also captured.
                            getNewSourceCoordinator(
                                    new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE));
                        })
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Verifies that the job is marked as failed when a source event arrives from an operator
     * while the coordinator uses the enumerator that is not built by {@code TestSource}.
     */
    @Test
    void testCoordinatorFailJobOnSourceEventToNonsupportingEnumerator() throws Exception {
        // Rebuild the coordinator so that createMockSource() picks the plain factory source.
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;
        sourceCoordinator = getNewSourceCoordinator();
        context = sourceCoordinator.getContext();
        sourceCoordinator.start();

        final SourceEventWrapper wrappedEvent = new SourceEventWrapper(new TestSourceEvent());
        sourceCoordinator.handleEventFromOperator(0, 0, wrappedEvent);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(operatorCoordinatorContext.isJobFailed()).isTrue();
    }

    /**
     * Verifies that {@code sendEventToSourceReader(subtask, event)} on the coordinator context
     * throws an {@link IllegalStateException} in this concurrent-attempts setup.
     */
    @Test
    void testContextThrowExceptionOnSourceEventToNonsupportingMethod() throws Exception {
        // Rebuild the coordinator so that createMockSource() picks the plain factory source.
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;
        sourceCoordinator = getNewSourceCoordinator();
        context = sourceCoordinator.getContext();
        sourceCoordinator.start();
        sourceReady();

        Assertions.assertThatThrownBy(
                        () -> context.sendEventToSourceReader(0, new TestSourceEvent()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Drives three execution attempts (0, 3 and 5) of subtask 0 through registration and split
     * requests, then checks the exact sequence of eight events sent to that subtask.
     *
     * <p>Expected sequence by index: 0-1 carry the first split, 2-3 carry the second split,
     * 4-5 are {@link NoMoreSplitsEvent}s, 6 carries both splits and 7 is a
     * {@link NoMoreSplitsEvent}.
     */
    @Test
    public void testConcurrentAttemptsRequestSplits() throws Exception {
        final int subtask = 0;
        final int firstAttempt = 0;
        final int secondAttempt = 3;
        final int thirdAttempt = 5;

        sourceCoordinator.start();

        final MockSourceSplit firstSplit = new MockSourceSplit(0);
        final MockSourceSplit secondSplit = new MockSourceSplit(1);
        final List<MockSourceSplit> allSplits = new ArrayList<>();
        allSplits.add(firstSplit);
        allSplits.add(secondSplit);
        getEnumerator().addNewSplits(allSplits);

        // First attempt registers and requests a split: one event expected so far.
        setReaderTaskReady(sourceCoordinator, subtask, firstAttempt);
        registerReader(subtask, firstAttempt);
        sourceCoordinator.handleEventFromOperator(subtask, firstAttempt, new RequestSplitEvent());
        waitForSentEvents(1);

        // Registering the second attempt alone already produces a second event.
        setReaderTaskReady(sourceCoordinator, subtask, secondAttempt);
        registerReader(subtask, secondAttempt);
        waitForSentEvents(2);

        // Each further request adds two events to the total.
        sourceCoordinator.handleEventFromOperator(subtask, secondAttempt, new RequestSplitEvent());
        waitForSentEvents(4);

        sourceCoordinator.handleEventFromOperator(subtask, firstAttempt, new RequestSplitEvent());
        waitForSentEvents(6);

        // Registering the third attempt adds the final two events.
        setReaderTaskReady(sourceCoordinator, subtask, thirdAttempt);
        registerReader(subtask, thirdAttempt);
        waitForSentEvents(8);

        final List<OperatorEvent> sent = receivingTasks.getSentEventsForSubtask(subtask);
        assertAddSplitEvent(sent.get(0), Collections.singletonList(firstSplit));
        assertAddSplitEvent(sent.get(1), Collections.singletonList(firstSplit));
        assertAddSplitEvent(sent.get(2), Collections.singletonList(secondSplit));
        assertAddSplitEvent(sent.get(3), Collections.singletonList(secondSplit));
        Assertions.assertThat(sent.get(4)).isInstanceOf(NoMoreSplitsEvent.class);
        Assertions.assertThat(sent.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
        assertAddSplitEvent(sent.get(6), allSplits);
        Assertions.assertThat(sent.get(7)).isInstanceOf(NoMoreSplitsEvent.class);
    }

    /**
     * Verifies the per-attempt reader bookkeeping exposed by
     * {@code registeredReadersOfAttempts()}: two attempts (3 and 5) of subtask 0 are tracked
     * under one subtask entry, a failed attempt is removed, and a subtask reset clears the
     * subtask's entry altogether.
     */
    @Test
    public void testReaderInfoOfConcurrentAttempts() throws Exception {
        sourceCoordinator.start();

        registerReader(0, 3);
        registerReader(0, 5);
        waitForCoordinatorToProcessActions();

        // Both attempts belong to subtask 0, so there is exactly one outer entry.
        Assertions.assertThat(context.registeredReadersOfAttempts()).hasSize(1);

        // Typed view instead of a raw Map: no unchecked casts are needed below.
        Map<Integer, ReaderInfo> attemptReaders = context.registeredReadersOfAttempts().get(0);
        Assertions.assertThat(attemptReaders).containsOnlyKeys(3, 5);
        Assertions.assertThat(attemptReaders.get(3)).isNotNull();
        Assertions.assertThat(attemptReaders.get(3).getLocation())
                .isEqualTo(createLocationFor(0, 3));
        Assertions.assertThat(attemptReaders.get(5)).isNotNull();
        Assertions.assertThat(attemptReaders.get(5).getLocation())
                .isEqualTo(createLocationFor(0, 5));

        // Failing attempt 5 must leave only attempt 3 registered.
        sourceCoordinator.executionAttemptFailed(0, 5, new Exception());
        waitForCoordinatorToProcessActions();
        attemptReaders = context.registeredReadersOfAttempts().get(0);
        Assertions.assertThat(attemptReaders).containsOnlyKeys(3);

        // Resetting the subtask must drop every remaining attempt.
        sourceCoordinator.subtaskReset(0, -1L);
        waitForCoordinatorToProcessActions();
        Assertions.assertThat(context.registeredReadersOfAttempts()).isEmpty();
    }

    /**
     * Verifies the per-subtask view returned by {@code registeredReaders()} when several
     * attempts of subtask 0 are registered: with attempts 3 and 5 the reported location is that
     * of attempt 3, and after attempt 1 registers it becomes that of attempt 1.
     */
    @Test
    public void testSubtaskReaderInfoOfConcurrentAttempts() throws Exception {
        sourceCoordinator.start();

        registerReader(0, 3);
        registerReader(0, 5);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(context.registeredReaders()).hasSize(1);
        final String locationAfterTwoAttempts = context.registeredReaders().get(0).getLocation();
        Assertions.assertThat(locationAfterTwoAttempts).isEqualTo(createLocationFor(0, 3));

        registerReader(0, 1);
        waitForCoordinatorToProcessActions();

        final String locationAfterThirdAttempt = context.registeredReaders().get(0).getLocation();
        Assertions.assertThat(locationAfterThirdAttempt).isEqualTo(createLocationFor(0, 1));
    }

    /**
     * Verifies that source events sent by two different attempts (3 and 5) of subtask 0 reach
     * the test enumerator and are recorded under the attempt that sent them.
     */
    @Test
    public void testForwardAttemptSourceEvents() throws Exception {
        sourceCoordinator.start();

        final TestSourceEvent eventOfAttempt3 = new TestSourceEvent();
        final TestSourceEvent eventOfAttempt5 = new TestSourceEvent();
        sourceCoordinator.handleEventFromOperator(0, 3, new SourceEventWrapper(eventOfAttempt3));
        sourceCoordinator.handleEventFromOperator(0, 5, new SourceEventWrapper(eventOfAttempt5));
        waitForCoordinatorToProcessActions();

        // Identity, not equality: the enumerator must hold the very same instances.
        Assertions.assertThat(getTestEnumerator().getEvent(0, 3)).isSameAs(eventOfAttempt3);
        Assertions.assertThat(getTestEnumerator().getEvent(0, 5)).isSameAs(eventOfAttempt5);
    }

    /**
     * Builds the mock source for the coordinator under test.
     *
     * @return the plain {@code TestingSplitEnumerator.factorySource(...)} source when
     *     {@code enumeratorSupportsHandleExecutionAttemptSourceEvent} is {@code false},
     *     otherwise a {@link TestSource} whose enumerators are {@link TestEnumerator}s
     */
    @Override
    Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
        final MockSourceSplitSerializer splitSerializer = new MockSourceSplitSerializer();
        final MockSplitEnumeratorCheckpointSerializer checkpointSerializer =
                new MockSplitEnumeratorCheckpointSerializer();

        if (!enumeratorSupportsHandleExecutionAttemptSourceEvent) {
            return TestingSplitEnumerator.factorySource(splitSerializer, checkpointSerializer);
        }
        return new TestSource<>(splitSerializer, checkpointSerializer);
    }

    /**
     * Returns the coordinator's enumerator as a {@link TestEnumerator}.
     *
     * <p>The cast only succeeds when the source was built by {@link TestSource}; with the plain
     * factory source it is expected to fail with a {@link ClassCastException}.
     */
    private TestEnumerator<MockSourceSplit> getTestEnumerator() {
        // Parameterized cast: avoids the raw type and the unchecked conversion it implied.
        return (TestEnumerator<MockSourceSplit>) super.getEnumerator();
    }

    /** Marker source event without any payload; the tests only compare instances by identity. */
    private static class TestSourceEvent implements SourceEvent, Serializable {
        /** Instantiated only from within the enclosing test class. */
        private TestSourceEvent() {
        }
    }

    /**
     * A {@link TestingSplitEnumerator} that additionally records every source event it receives
     * through the attempt-aware {@code handleSourceEvent(subtaskId, attemptNumber, event)}
     * method, keyed by subtask id and then by attempt number.
     *
     * @param <SplitT> the split type handled by the enumerator
     */
    private static class TestEnumerator<SplitT extends SourceSplit>
            extends TestingSplitEnumerator<SplitT>
            implements SupportsHandleExecutionAttemptSourceEvent {

        /** Last event received per attempt: subtask id -> (attempt number -> event). */
        private final Map<Integer, Map<Integer, SourceEvent>> sourceEvents = new HashMap<>();

        private TestEnumerator(SplitEnumeratorContext<SplitT> context) {
            super(context);
        }

        private TestEnumerator(
                SplitEnumeratorContext<SplitT> context, Collection<SplitT> restoredSplits) {
            super(context, restoredSplits);
        }

        /**
         * Records the event under its subtask and attempt (a later event from the same attempt
         * replaces the earlier one) and then forwards it to the attempt-agnostic
         * {@code handleSourceEvent(subtaskId, sourceEvent)}.
         *
         * @param subtaskId the subtask that sent the event
         * @param attemptNumber the execution attempt of that subtask
         * @param sourceEvent the event to record and forward
         */
        public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
            sourceEvents
                    .computeIfAbsent(subtaskId, ignored -> new HashMap<>())
                    .put(attemptNumber, sourceEvent);
            handleSourceEvent(subtaskId, sourceEvent);
        }

        /**
         * Looks up the event recorded for the given subtask and attempt.
         *
         * @return the recorded event, or {@code null} if none was recorded for that pair
         */
        private SourceEvent getEvent(int subtaskId, int attemptNumber) {
            // A direct lookup avoids allocating a throwaway map for unknown subtasks.
            final Map<Integer, SourceEvent> eventsOfSubtask = sourceEvents.get(subtaskId);
            return eventsOfSubtask == null ? null : eventsOfSubtask.get(attemptNumber);
        }
    }

    /**
     * Factory source whose enumerators are {@link TestEnumerator}s, both for freshly created
     * and for restored enumerators.
     *
     * @param <T> the record type produced by the source
     * @param <SplitT> the split type handled by the source
     */
    private static final class TestSource<T, SplitT extends SourceSplit>
            extends TestingSplitEnumerator.FactorySource<T, SplitT> {

        public TestSource(
                SimpleVersionedSerializer<SplitT> splitSerializer,
                SimpleVersionedSerializer<Set<SplitT>> checkpointSerializer) {
            super(splitSerializer, checkpointSerializer);
        }

        /** Creates a fresh {@link TestEnumerator} bound to the given context. */
        @Override
        public TestingSplitEnumerator<SplitT> createEnumerator(
                SplitEnumeratorContext<SplitT> enumContext) {
            // Diamond instead of a raw constructor call keeps the split type checked.
            return new TestEnumerator<>(enumContext);
        }

        /** Creates a {@link TestEnumerator} that starts from the checkpointed splits. */
        @Override
        public SplitEnumerator<SplitT, Set<SplitT>> restoreEnumerator(
                SplitEnumeratorContext<SplitT> enumContext, Set<SplitT> checkpoint) {
            return new TestEnumerator<>(enumContext, checkpoint);
        }
    }
}

