package com.alibaba.ververica.connectors.common.source;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/source/AbstractDynamicParallelSource.class */
/**
 * Parallel source base class that checkpoints per-split read progress into a union list state
 * and, when restored, hands the complete list of restored entries to the concrete source so each
 * subtask can pick the splits it should continue reading.
 *
 * @param <T> record type, passed through to {@link AbstractParallelSourceBase}
 * @param <CURSOR> serializable read position stored alongside each {@link InputSplit}
 */
public abstract class AbstractDynamicParallelSource<T, CURSOR extends Serializable> extends AbstractParallelSourceBase<T, CURSOR> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDynamicParallelSource.class);
    private static final long serialVersionUID = -7848357196819780804L;

    /** Name under which the union list state is registered. */
    private static final String SOURCE_STATE_NAME = "source_offsets_state_name";

    /**
     * Result of {@link #reserveInputSplitsForCurrentSubTask} on recovery. This class assigns it
     * only in {@link #createParallelReader}, so it is {@code null} when no recovery took place.
     */
    protected transient List<Tuple2<InputSplit, CURSOR>> reservedProgress;

    /** Union list state handle obtained in {@link #initializeState}. */
    private transient ListState<InnerProgress<CURSOR>> unionInitialProgress;

    /** Copies of every entry read from the restored state; empty when the job was not restored. */
    private transient List<InnerProgress<CURSOR>> allSplitsInCP;

    /**
     * Checkpointed pair of an input split and its cursor.
     *
     * <p>The two fields are public on purpose: {@code initializeState} looks them up with
     * {@link Class#getField(String)} to describe this class as a POJO type.
     *
     * @param <CURSOR> serializable read position type
     */
    public static class InnerProgress<CURSOR extends Serializable> implements Serializable {
        private static final long serialVersionUID = -7756210303146639268L;
        public InputSplit inputSplit;
        public CURSOR cursor;

        /** Creates an empty entry; both fields stay {@code null} until set. */
        public InnerProgress() {
        }

        /**
         * @param inputSplit the split this entry describes
         * @param cursor the cursor stored for that split
         */
        public InnerProgress(InputSplit inputSplit, CURSOR cursor) {
            this.inputSplit = inputSplit;
            this.cursor = cursor;
        }

        /** @return the stored input split */
        public InputSplit getInputSplit() {
            return this.inputSplit;
        }

        /**
         * Replaces the stored input split.
         *
         * @param inputSplit the new split
         * @return this instance, for call chaining
         */
        public InnerProgress<CURSOR> setInputSplit(InputSplit inputSplit) {
            this.inputSplit = inputSplit;
            return this;
        }

        /** @return the stored cursor */
        public CURSOR getCursor() {
            return this.cursor;
        }

        /**
         * Replaces the stored cursor.
         *
         * @param cursor the new cursor
         * @return this instance, for call chaining
         */
        public InnerProgress<CURSOR> setCursor(CURSOR cursor) {
            this.cursor = cursor;
            return this;
        }

        @Override
        public String toString() {
            return "InnerProgress{inputSplit=" + this.inputSplit + ", cursor=" + this.cursor + '}';
        }
    }

    /**
     * Chooses, from everything found in the restored state, the splits (with their cursors) that
     * the current subtask should read. The result becomes {@code initialProgress}.
     *
     * @param numberOfParallelSubTasks value of {@code getNumberOfParallelSubtasks()} at restore time
     * @param indexOfThisSubTask value of {@code getIndexOfThisSubtask()} for the current subtask
     * @param allSplitsInState every entry read from the restored union state
     * @return the split/cursor pairs assigned to the current subtask
     * @throws IOException if the assignment cannot be computed
     */
    public abstract List<Tuple2<InputSplit, CURSOR>> reAssignInputSplitsForCurrentSubTask(int numberOfParallelSubTasks, int indexOfThisSubTask, List<InnerProgress<CURSOR>> allSplitsInState) throws IOException;

    /**
     * Chooses restored splits that the current subtask should keep writing into later snapshots
     * in addition to its assigned ones. The default implementation reserves nothing.
     *
     * @param numberOfParallelSubTasks value of {@code getNumberOfParallelSubtasks()} at restore time
     * @param indexOfThisSubTask value of {@code getIndexOfThisSubtask()} for the current subtask
     * @param allSplitsInState every entry read from the restored union state
     * @return the split/cursor pairs to reserve; a new empty list by default
     * @throws IOException if the reservation cannot be computed
     */
    public List<Tuple2<InputSplit, CURSOR>> reserveInputSplitsForCurrentSubTask(int numberOfParallelSubTasks, int indexOfThisSubTask, List<InnerProgress<CURSOR>> allSplitsInState) throws IOException {
        return new ArrayList<>();
    }

    /**
     * On recovery, recomputes {@code initialProgress} and {@link #reservedProgress} from the
     * restored entries, then delegates to the superclass implementation.
     *
     * <p>NOTE: the decompiler reported that this method's access modifier was changed from
     * {@code protected}; it is kept {@code public} to match the current source.
     *
     * @param configuration passed through unchanged to the superclass
     * @throws IOException if split assignment or the superclass call fails
     */
    @Override // com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase
    public void createParallelReader(Configuration configuration) throws IOException {
        if (isRecoveryFromState()) {
            // Message text (including its spelling) is kept as-is so existing log searches still match.
            LOG.info("Reocory State!");
            int numberOfParallelSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
            this.initialProgress = reAssignInputSplitsForCurrentSubTask(numberOfParallelSubTasks, indexOfThisSubTask, this.allSplitsInCP);
            this.reservedProgress = reserveInputSplitsForCurrentSubTask(numberOfParallelSubTasks, indexOfThisSubTask, this.allSplitsInCP);
        }
        super.createParallelReader(configuration);
    }

    /**
     * Registers the union list state that holds {@link InnerProgress} entries and, if the context
     * reports a restore, marks this source as recovering and copies every restored entry into
     * {@code allSplitsInCP}.
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws Exception if type extraction, field lookup or state access fails
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        LOG.info("initializeState");
        TypeInformation<?> splitTypeInfo = TypeExtractor.createTypeInfo(InputSplit.class);
        TypeInformation<?> cursorTypeInfo = TypeExtractor.createTypeInfo(resolveCursorType());

        List<PojoField> pojoFields = new ArrayList<>();
        pojoFields.add(new PojoField(InnerProgress.class.getField("inputSplit"), splitTypeInfo));
        pojoFields.add(new PojoField(InnerProgress.class.getField("cursor"), cursorTypeInfo));

        @SuppressWarnings("unchecked") // InnerProgress.class is the erasure of InnerProgress<CURSOR>
        Class<InnerProgress<CURSOR>> progressClass = (Class<InnerProgress<CURSOR>>) (Class<?>) InnerProgress.class;
        TypeInformation<InnerProgress<CURSOR>> progressTypeInfo = new PojoTypeInfo<>(progressClass, pojoFields);

        this.unionInitialProgress = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SOURCE_STATE_NAME, progressTypeInfo));
        LOG.info("Restoring state: {}", this.unionInitialProgress);
        this.allSplitsInCP = new ArrayList<>();
        if (functionInitializationContext.isRestored()) {
            this.recoveryFromState = true;
            for (InnerProgress<CURSOR> restored : this.unionInitialProgress.get()) {
                // Store a copy rather than the instance handed out by the state backend.
                this.allSplitsInCP.add(new InnerProgress<>(restored.inputSplit, restored.cursor));
            }
        }
    }

    /**
     * Replaces the content of the union list state with the current progress: first every split
     * reported by the parallel reader, then any split from {@code initialProgress} and
     * {@link #reservedProgress} that the reader did not report. Does nothing when
     * {@code disableParallelRead} is set.
     *
     * @param functionSnapshotContext snapshot context (not used)
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.disableParallelRead) {
            return;
        }
        this.unionInitialProgress.clear();
        Set<InputSplit> splitsReportedByReader = new HashSet<>();
        for (Map.Entry<InputSplit, CURSOR> entry : this.parallelReader.getProgress().getProgress().entrySet()) {
            this.unionInitialProgress.add(new InnerProgress<>(entry.getKey(), entry.getValue()));
            splitsReportedByReader.add(entry.getKey());
        }
        addProgressNotReportedByReader(this.initialProgress, splitsReportedByReader);
        addProgressNotReportedByReader(this.reservedProgress, splitsReportedByReader);
    }

    /**
     * Finds the type to use for the {@code cursor} field of {@link InnerProgress}.
     *
     * <p>Starting at the runtime class, walks up the superclass chain (stopping before this
     * class) and returns the second type argument of the nearest parameterized superclass
     * declaration that has one. For a class that directly extends a parameterized superclass this
     * is exactly {@code getGenericSuperclass().getActualTypeArguments()[1]}; the walk additionally
     * covers non-generic subclasses of such a class. Type variables are returned as found and are
     * not resolved here.
     *
     * @return the reflective type found for the cursor
     * @throws IllegalStateException if no parameterized superclass with two type arguments exists
     */
    private Type resolveCursorType() {
        for (Class<?> current = getClass();
                current != null && current != AbstractDynamicParallelSource.class;
                current = current.getSuperclass()) {
            Type genericSuperclass = current.getGenericSuperclass();
            if (genericSuperclass instanceof ParameterizedType) {
                Type[] typeArguments = ((ParameterizedType) genericSuperclass).getActualTypeArguments();
                if (typeArguments.length > 1) {
                    return typeArguments[1];
                }
            }
        }
        throw new IllegalStateException("Cannot determine the CURSOR type of " + getClass().getName()
                + ": no parameterized superclass with at least two type arguments was found");
    }

    /**
     * Appends to the union list state every pair whose split is not in
     * {@code splitsReportedByReader}. A {@code null} collection is treated as empty.
     *
     * @param progress split/cursor pairs to consider; may be {@code null}
     * @param splitsReportedByReader splits already written from the reader's own progress
     * @throws Exception if the state cannot be updated
     */
    private void addProgressNotReportedByReader(Iterable<? extends Tuple2<InputSplit, CURSOR>> progress, Set<InputSplit> splitsReportedByReader) throws Exception {
        if (progress == null) {
            return;
        }
        for (Tuple2<InputSplit, CURSOR> splitAndCursor : progress) {
            if (!splitsReportedByReader.contains(splitAndCursor.f0)) {
                this.unionInitialProgress.add(new InnerProgress<>(splitAndCursor.f0, splitAndCursor.f1));
            }
        }
    }
}
