/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SerializedCheckpointData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public abstract class MessageAcknowledgingSourceBase<Type, UId>
extends RichSourceFunction<Type>
implements CheckpointedFunction,
CheckpointListener {
    private static final long serialVersionUID = -8689291992192955579L;
    private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
    private final TypeSerializer<UId> idSerializer;
    private transient Set<UId> idsForCurrentCheckpoint;
    protected transient ArrayDeque<Tuple2<Long, Set<UId>>> pendingCheckpoints;
    private transient Set<UId> idsProcessedButNotAcknowledged;
    private transient ListState<SerializedCheckpointData[]> checkpointedState;

    protected MessageAcknowledgingSourceBase(Class<UId> idClass) {
        this(TypeExtractor.getForClass(idClass));
    }

    protected MessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo) {
        this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState == null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " has already been initialized."));
        this.checkpointedState = context.getOperatorStateStore().getSerializableListState("message-acknowledging-source-state");
        this.idsForCurrentCheckpoint = new HashSet<UId>(64);
        this.pendingCheckpoints = new ArrayDeque();
        this.idsProcessedButNotAcknowledged = new HashSet<UId>();
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", (Object)this.getClass().getSimpleName());
            ArrayList<SerializedCheckpointData[]> retrievedStates = new ArrayList<SerializedCheckpointData[]>();
            for (SerializedCheckpointData[] serializedCheckpointDataArray : (Iterable)this.checkpointedState.get()) {
                retrievedStates.add(serializedCheckpointDataArray);
            }
            Preconditions.checkArgument((retrievedStates.size() == 1 ? 1 : 0) != 0, (Object)(this.getClass().getSimpleName() + " retrieved invalid state."));
            this.pendingCheckpoints = SerializedCheckpointData.toDeque((SerializedCheckpointData[])retrievedStates.get(0), this.idSerializer);
            for (Tuple2 tuple2 : this.pendingCheckpoints) {
                this.idsProcessedButNotAcknowledged.addAll((Collection)tuple2.f1);
            }
        } else {
            LOG.info("No state to restore for the {}.", (Object)this.getClass().getSimpleName());
        }
    }

    public void close() throws Exception {
        this.idsForCurrentCheckpoint.clear();
        this.pendingCheckpoints.clear();
    }

    protected abstract void acknowledgeIDs(long var1, Set<UId> var3);

    protected boolean addId(UId uid) {
        this.idsForCurrentCheckpoint.add(uid);
        return this.idsProcessedButNotAcknowledged.add(uid);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState != null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " has not been properly initialized."));
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}", new Object[]{this.idsForCurrentCheckpoint, context.getCheckpointId(), context.getCheckpointTimestamp()});
        }
        this.pendingCheckpoints.addLast(new Tuple2((Object)context.getCheckpointId(), this.idsForCurrentCheckpoint));
        this.idsForCurrentCheckpoint = new HashSet<UId>(64);
        this.checkpointedState.clear();
        this.checkpointedState.add((Object)SerializedCheckpointData.fromDeque(this.pendingCheckpoints, this.idSerializer));
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing Messages externally for checkpoint {}", (Object)checkpointId);
        Iterator<Tuple2<Long, Set<UId>>> iter = this.pendingCheckpoints.iterator();
        while (iter.hasNext()) {
            Tuple2<Long, Set<UId>> checkpoint = iter.next();
            long id = (Long)checkpoint.f0;
            if (id > checkpointId) break;
            LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
            this.acknowledgeIDs(checkpointId, (Set)checkpoint.f1);
            this.idsProcessedButNotAcknowledged.removeAll((Collection)checkpoint.f1);
            iter.remove();
        }
    }
}

