/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.hybrid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState;
import org.apache.flink.connector.base.source.hybrid.HybridSourceSplit;
import org.apache.flink.connector.base.source.hybrid.SourceReaderFinishedEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchSourceEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchedSources;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HybridSourceSplitEnumerator
implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
    private final SplitEnumeratorContext<HybridSourceSplit> context;
    private final List<HybridSource.SourceListEntry> sources;
    private final SwitchedSources switchedSources = new SwitchedSources();
    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
    private final Set<Integer> finishedReaders;
    private final Map<Integer, Integer> readerSourceIndex;
    private int currentSourceIndex;
    private HybridSourceEnumeratorState restoredEnumeratorState;
    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
    private SimpleVersionedSerializer<Object> currentEnumeratorCheckpointSerializer;

    public HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> context, List<HybridSource.SourceListEntry> sources, int initialSourceIndex, HybridSourceEnumeratorState restoredEnumeratorState) {
        Preconditions.checkArgument((initialSourceIndex < sources.size() ? 1 : 0) != 0);
        this.context = context;
        this.sources = sources;
        this.currentSourceIndex = initialSourceIndex;
        this.pendingSplits = new HashMap<Integer, TreeMap<Integer, List<HybridSourceSplit>>>();
        this.finishedReaders = new HashSet<Integer>();
        this.readerSourceIndex = new HashMap<Integer, Integer>();
        this.restoredEnumeratorState = restoredEnumeratorState;
    }

    public void start() {
        this.switchEnumerator();
    }

    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        LOG.debug("handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", new Object[]{subtaskId, this.currentSourceIndex, this.pendingSplits});
        Preconditions.checkState((this.pendingSplits.isEmpty() || !this.pendingSplits.containsKey(subtaskId) ? 1 : 0) != 0);
        this.currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
    }

    public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) {
        LOG.debug("Adding splits back for subtask={} splits={}", (Object)subtaskId, splits);
        TreeMap<Integer, List> splitsBySourceIndex = new TreeMap<Integer, List>();
        for (HybridSourceSplit split : splits) {
            splitsBySourceIndex.computeIfAbsent(split.sourceIndex(), k -> new ArrayList()).add(split);
        }
        splitsBySourceIndex.forEach((k, splitsPerSource) -> {
            if (k == this.currentSourceIndex) {
                this.currentEnumerator.addSplitsBack(HybridSourceSplit.unwrapSplits(splitsPerSource, this.switchedSources), subtaskId);
            } else {
                this.pendingSplits.computeIfAbsent(subtaskId, sourceIndex -> new TreeMap()).put(k, splitsPerSource);
            }
        });
    }

    public void addReader(int subtaskId) {
        LOG.debug("addReader subtaskId={}", (Object)subtaskId);
        this.readerSourceIndex.remove(subtaskId);
    }

    private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
        this.readerSourceIndex.put(subtaskId, sourceIndex);
        Source source = this.switchedSources.sourceOf(sourceIndex);
        this.context.sendEventToSourceReader(subtaskId, (SourceEvent)new SwitchSourceEvent(sourceIndex, source, sourceIndex >= this.sources.size() - 1));
        TreeMap<Integer, List<HybridSourceSplit>> splitsBySource = this.pendingSplits.get(subtaskId);
        if (splitsBySource != null) {
            List<HybridSourceSplit> splits = splitsBySource.remove(sourceIndex);
            if (splits != null && !splits.isEmpty()) {
                LOG.debug("Restoring splits to subtask={} {}", (Object)subtaskId, splits);
                this.context.assignSplits(new SplitsAssignment(Collections.singletonMap(subtaskId, splits)));
                HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(this.context, subtaskId, sourceIndex, this.sources.size());
            }
            if (splitsBySource.isEmpty()) {
                this.pendingSplits.remove(subtaskId);
            }
        }
        if (sourceIndex == this.currentSourceIndex) {
            LOG.debug("adding reader subtask={} sourceIndex={}", (Object)subtaskId, (Object)this.currentSourceIndex);
            this.currentEnumerator.addReader(subtaskId);
        }
    }

    public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
        Object enumState = this.currentEnumerator.snapshotState(checkpointId);
        byte[] enumStateBytes = this.currentEnumeratorCheckpointSerializer.serialize(enumState);
        return new HybridSourceEnumeratorState(this.currentSourceIndex, enumStateBytes, this.currentEnumeratorCheckpointSerializer.getVersion());
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.currentEnumerator.notifyCheckpointComplete(checkpointId);
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.currentEnumerator.notifyCheckpointAborted(checkpointId);
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.debug("handleSourceEvent {} subtask={} pendingSplits={}", new Object[]{sourceEvent, subtaskId, this.pendingSplits});
        if (sourceEvent instanceof SourceReaderFinishedEvent) {
            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent)sourceEvent;
            int subtaskSourceIndex = this.readerSourceIndex.computeIfAbsent(subtaskId, k -> {
                LOG.debug("New reader subtask={} sourceIndex={}", (Object)subtaskId, (Object)srfe.sourceIndex());
                return srfe.sourceIndex();
            });
            if (srfe.sourceIndex() < subtaskSourceIndex) {
                return;
            }
            if (subtaskSourceIndex < this.currentSourceIndex) {
                subtaskSourceIndex = subtaskSourceIndex == -1 ? this.switchedSources.getFirstSourceIndex() : ++subtaskSourceIndex;
                this.sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
                return;
            }
            this.finishedReaders.add(subtaskId);
            if (this.finishedReaders.size() == this.context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (this.currentSourceIndex + 1 < this.sources.size()) {
                    this.switchEnumerator();
                    for (int i = 0; i < this.context.currentParallelism(); ++i) {
                        this.sendSwitchSourceEvent(i, this.currentSourceIndex);
                    }
                }
            }
        } else {
            this.currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
        }
    }

    public void close() throws IOException {
        if (this.currentEnumerator != null) {
            this.currentEnumerator.close();
        }
    }

    private void switchEnumerator() {
        final SplitEnumerator<SourceSplit, Object> previousEnumerator = this.currentEnumerator;
        if (this.currentEnumerator != null) {
            try {
                this.currentEnumerator.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            this.currentEnumerator = null;
            ++this.currentSourceIndex;
        }
        HybridSource.SourceSwitchContext<Object> switchContext = new HybridSource.SourceSwitchContext<Object>(){

            @Override
            public Object getPreviousEnumerator() {
                return previousEnumerator;
            }
        };
        Object source = this.sources.get((int)this.currentSourceIndex).factory.create(switchContext);
        this.switchedSources.put(this.currentSourceIndex, (Source)source);
        this.currentEnumeratorCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        SplitEnumeratorContextProxy delegatingContext = new SplitEnumeratorContextProxy(this.currentSourceIndex, this.context, this.readerSourceIndex, this.switchedSources, this.sources.size());
        try {
            if (this.restoredEnumeratorState == null) {
                this.currentEnumerator = source.createEnumerator(delegatingContext);
            } else {
                LOG.info("Restoring enumerator for sourceIndex={}", (Object)this.currentSourceIndex);
                Object nestedEnumState = this.currentEnumeratorCheckpointSerializer.deserialize(this.restoredEnumeratorState.getWrappedStateSerializerVersion(), this.restoredEnumeratorState.getWrappedState());
                this.currentEnumerator = source.restoreEnumerator(delegatingContext, nestedEnumState);
                this.restoredEnumeratorState = null;
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create enumerator for sourceIndex=" + this.currentSourceIndex, e);
        }
        LOG.info("Starting enumerator for sourceIndex={}", (Object)this.currentSourceIndex);
        this.currentEnumerator.start();
    }

    private static void checkAndSignalNoMoreSplits(SplitEnumeratorContext<HybridSourceSplit> context, int subtaskId, int sourceIndex, int sourceSize) {
        Preconditions.checkState((boolean)(context instanceof SupportsIntermediateNoMoreSplits), (String)"The split enumerator context %s must implement SupportsIntermediateNoMoreSplits to be used in hybrid source scenario.", (Object[])new Object[]{context.getClass().getCanonicalName()});
        if (sourceIndex >= sourceSize - 1) {
            context.signalNoMoreSplits(subtaskId);
        } else {
            ((SupportsIntermediateNoMoreSplits)context).signalIntermediateNoMoreSplits(subtaskId);
        }
    }

    private static class SplitEnumeratorContextProxy<SplitT extends SourceSplit>
    implements SplitEnumeratorContext<SplitT> {
        private static final Logger LOG = LoggerFactory.getLogger(SplitEnumeratorContextProxy.class);
        private final SplitEnumeratorContext<HybridSourceSplit> realContext;
        private final int sourceIndex;
        private final Map<Integer, Integer> readerSourceIndex;
        private final SwitchedSources switchedSources;
        private final int sourceSize;

        private SplitEnumeratorContextProxy(int sourceIndex, SplitEnumeratorContext<HybridSourceSplit> realContext, Map<Integer, Integer> readerSourceIndex, SwitchedSources switchedSources, int sourceSize) {
            this.realContext = realContext;
            this.sourceIndex = sourceIndex;
            this.readerSourceIndex = readerSourceIndex;
            this.switchedSources = switchedSources;
            this.sourceSize = sourceSize;
        }

        public SplitEnumeratorMetricGroup metricGroup() {
            return this.realContext.metricGroup();
        }

        public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
            this.realContext.sendEventToSourceReader(subtaskId, event);
        }

        public int currentParallelism() {
            return this.realContext.currentParallelism();
        }

        public Map<Integer, ReaderInfo> registeredReaders() {
            Map readers = this.realContext.registeredReaders();
            if (readers.size() != this.readerSourceIndex.size()) {
                return this.filterRegisteredReaders(readers);
            }
            Integer lastIndex = null;
            for (Integer sourceIndex : this.readerSourceIndex.values()) {
                if (lastIndex != null && lastIndex != sourceIndex) {
                    return this.filterRegisteredReaders(readers);
                }
                lastIndex = sourceIndex;
            }
            return readers;
        }

        private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
            HashMap<Integer, ReaderInfo> readersForSource = new HashMap<Integer, ReaderInfo>(readers.size());
            for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
                if (this.readerSourceIndex.get(e.getKey()) != Integer.valueOf(this.sourceIndex)) continue;
                readersForSource.put(e.getKey(), e.getValue());
            }
            return readersForSource;
        }

        public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) {
            HashMap<Integer, List<HybridSourceSplit>> wrappedAssignmentMap = new HashMap<Integer, List<HybridSourceSplit>>();
            for (Map.Entry e : newSplitAssignments.assignment().entrySet()) {
                List<HybridSourceSplit> splits = HybridSourceSplit.wrapSplits((List)e.getValue(), this.sourceIndex, this.switchedSources);
                wrappedAssignmentMap.put((Integer)e.getKey(), splits);
            }
            SplitsAssignment wrappedAssignments = new SplitsAssignment(wrappedAssignmentMap);
            LOG.debug("Assigning splits sourceIndex={} {}", (Object)this.sourceIndex, (Object)wrappedAssignments);
            this.realContext.assignSplits(wrappedAssignments);
        }

        public void assignSplit(SplitT split, int subtask) {
            HybridSourceSplit wrappedSplit = HybridSourceSplit.wrapSplit(split, this.sourceIndex, this.switchedSources);
            this.realContext.assignSplit((SourceSplit)wrappedSplit, subtask);
        }

        public void signalNoMoreSplits(int subtask) {
            HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits((SplitEnumeratorContext<HybridSourceSplit>)this.realContext, subtask, this.sourceIndex, this.sourceSize);
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
            this.realContext.callAsync(callable, handler);
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
            this.realContext.callAsync(callable, handler, initialDelay, period);
        }

        public void runInCoordinatorThread(Runnable runnable) {
            this.realContext.runInCoordinatorThread(runnable);
        }
    }
}

