/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsBatchSnapshot;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class TestingSplitEnumerator<SplitT extends SourceSplit>
implements SplitEnumerator<SplitT, Set<SplitT>>,
SupportsBatchSnapshot {
    private final SplitEnumeratorContext<SplitT> context;
    private final Queue<SplitT> splits;
    private final List<SourceEvent> handledEvents;
    private final List<Long> successfulCheckpoints;
    private final Set<Integer> registeredReaders;
    private volatile boolean started;
    private volatile boolean closed;

    public TestingSplitEnumerator(SplitEnumeratorContext<SplitT> context) {
        this(context, Collections.emptySet());
    }

    public TestingSplitEnumerator(SplitEnumeratorContext<SplitT> context, Collection<SplitT> restoredSplits) {
        this.context = context;
        this.splits = new ArrayDeque<SplitT>(restoredSplits);
        this.handledEvents = new ArrayList<SourceEvent>();
        this.successfulCheckpoints = new ArrayList<Long>();
        this.registeredReaders = new HashSet<Integer>();
    }

    public void start() {
        this.started = true;
    }

    public void close() throws IOException {
        this.closed = true;
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        this.context.runInCoordinatorThread(() -> {
            if (this.splits.isEmpty()) {
                this.context.signalNoMoreSplits(subtaskId);
            } else {
                SourceSplit split = (SourceSplit)this.splits.poll();
                this.context.assignSplit(split, subtaskId);
            }
        });
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        this.handledEvents.add(sourceEvent);
    }

    public void addSplitsBack(List<SplitT> splitsToAddBack, int subtaskId) {
        this.splits.addAll(splitsToAddBack);
    }

    public void addReader(int subtaskId) {
        this.registeredReaders.add(subtaskId);
    }

    public Set<SplitT> snapshotState(long checkpointId) {
        return new HashSet<SplitT>(this.splits);
    }

    public void notifyCheckpointComplete(long checkpointId) {
        this.successfulCheckpoints.add(checkpointId);
    }

    public SplitEnumeratorContext<SplitT> getContext() {
        return this.context;
    }

    public boolean isStarted() {
        return this.started;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public Set<SplitT> getUnassignedSplits() {
        return new HashSet<SplitT>(this.splits);
    }

    public List<SourceEvent> getHandledSourceEvent() {
        return Collections.unmodifiableList(this.handledEvents);
    }

    public List<Long> getSuccessfulCheckpoints() {
        return Collections.unmodifiableList(this.successfulCheckpoints);
    }

    public Set<Integer> getRegisteredReaders() {
        return Collections.unmodifiableSet(this.registeredReaders);
    }

    @SafeVarargs
    public final void addNewSplits(SplitT ... newSplits) {
        this.addNewSplits((Collection<SplitT>)Arrays.asList(newSplits));
    }

    public void addNewSplits(Collection<SplitT> newSplits) {
        this.runInEnumThreadAndSync(() -> this.splits.addAll(newSplits));
    }

    public void executeAssignOneSplit(int subtask) {
        this.runInEnumThreadAndSync(() -> {
            Preconditions.checkState((!this.splits.isEmpty() ? 1 : 0) != 0, (Object)"no splits available");
            SourceSplit split = (SourceSplit)this.splits.poll();
            this.context.assignSplit(split, subtask);
        });
    }

    public void runInEnumThreadAndSync(Runnable action) {
        CompletableFuture future = new CompletableFuture();
        this.context.runInCoordinatorThread(() -> {
            try {
                action.run();
                future.complete(null);
            }
            catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
        try {
            future.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            ExceptionUtils.rethrow((Throwable)ExceptionUtils.stripExecutionException((Throwable)e));
        }
    }

    public static <T, SplitT extends SourceSplit> Source<T, SplitT, Set<SplitT>> factorySource(SimpleVersionedSerializer<SplitT> splitSerializer, SimpleVersionedSerializer<Set<SplitT>> checkpointSerializer) {
        return new FactorySource(splitSerializer, checkpointSerializer);
    }

    static class FactorySource<T, SplitT extends SourceSplit>
    implements Source<T, SplitT, Set<SplitT>>,
    DynamicParallelismInference {
        private final SimpleVersionedSerializer<SplitT> splitSerializer;
        private final SimpleVersionedSerializer<Set<SplitT>> checkpointSerializer;

        public FactorySource(SimpleVersionedSerializer<SplitT> splitSerializer, SimpleVersionedSerializer<Set<SplitT>> checkpointSerializer) {
            this.splitSerializer = splitSerializer;
            this.checkpointSerializer = checkpointSerializer;
        }

        public Boundedness getBoundedness() {
            throw new UnsupportedOperationException();
        }

        public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
            throw new UnsupportedOperationException();
        }

        public TestingSplitEnumerator<SplitT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) {
            return new TestingSplitEnumerator<SplitT>(enumContext);
        }

        public SplitEnumerator<SplitT, Set<SplitT>> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, Set<SplitT> checkpoint) {
            return new TestingSplitEnumerator<SplitT>(enumContext, checkpoint);
        }

        public SimpleVersionedSerializer<SplitT> getSplitSerializer() {
            return this.splitSerializer;
        }

        public SimpleVersionedSerializer<Set<SplitT>> getEnumeratorCheckpointSerializer() {
            return this.checkpointSerializer;
        }

        public int inferParallelism(DynamicParallelismInference.Context context) {
            return context.getParallelismInferenceUpperBound();
        }
    }
}

