/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write-ahead sink operator: incoming elements are serialized into a checkpoint state stream
 * instead of being emitted directly. When a snapshot is taken, the stream written so far is closed
 * and remembered as a {@link PendingCheckpoint}. Once a checkpoint is reported as completed, the
 * buffered elements of every pending checkpoint up to that id are replayed through
 * {@link #sendValues(Iterable, long, long)} and, on success, marked as committed via the
 * {@link CheckpointCommitter}.
 *
 * @param <IN> type of the elements handled by this sink
 */
public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<IN>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class);

    /** Random id generated once per sink instance; handed to the committer in {@link #open()}. */
    private final String id;

    private final CheckpointCommitter committer;

    protected final TypeSerializer<IN> serializer;

    /** Stream the current batch of elements is written to; {@code null} while nothing is buffered. */
    private transient CheckpointStreamFactory.CheckpointStateOutputStream out;

    private transient CheckpointStreamFactory checkpointStreamFactory;

    /** Operator state that mirrors {@link #pendingCheckpoints} at snapshot time. */
    private transient ListState<PendingCheckpoint> checkpointedState;

    /** Checkpoints whose data has been buffered but not yet committed, ordered by checkpoint id. */
    private final Set<PendingCheckpoint> pendingCheckpoints = new TreeSet<PendingCheckpoint>();

    /**
     * Creates the sink and prepares the committer (job id is set and its resource is created).
     *
     * @param committer committer used to track which checkpoints were already committed
     * @param serializer serializer used to buffer and replay the elements
     * @param jobID id of the job, forwarded to the committer
     * @throws Exception if the committer fails to create its resource
     */
    public GenericWriteAheadSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception {
        this.committer = Preconditions.checkNotNull(committer);
        this.serializer = Preconditions.checkNotNull(serializer);
        this.id = UUID.randomUUID().toString();
        this.committer.setJobId(jobID);
        this.committer.createResource();
    }

    /**
     * Registers the operator list state and, when restoring, reloads all pending checkpoints
     * from it.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        Preconditions.checkState(this.checkpointedState == null, "The reader state has already been initialized.");

        this.checkpointedState = context.getOperatorStateStore().getSerializableListState("pending-checkpoints");

        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        if (context.isRestored()) {
            LOG.info("Restoring state for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);

            for (PendingCheckpoint pendingCheckpoint : this.checkpointedState.get()) {
                this.pendingCheckpoints.add(pendingCheckpoint);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("GenericWriteAheadSink idx {} restored {}.", subtaskIdx, this.pendingCheckpoints);
            }
        } else {
            LOG.info("No state to restore for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);
        }
    }

    /**
     * Opens the committer, obtains the checkpoint stream factory from the containing task and
     * drops restored handles that the committer already reports as committed.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.committer.setOperatorId(this.id);
        this.committer.open();

        this.checkpointStreamFactory = getContainingTask().createCheckpointStreamFactory(this);

        cleanRestoredHandles();
    }

    /** Closes the committer. */
    @Override
    public void close() throws Exception {
        this.committer.close();
    }

    /**
     * Closes the current output stream (if any) and records its handle as a pending checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param timestamp timestamp of the checkpoint being taken
     * @throws Exception if closing the stream or discarding a duplicate handle fails
     */
    private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
        // Nothing was written since the last snapshot, so there is nothing to remember.
        if (this.out == null) {
            return;
        }

        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        StreamStateHandle handle = this.out.closeAndGetHandle();
        PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);

        if (this.pendingCheckpoints.contains(pendingCheckpoint)) {
            // An entry for the same (checkpointId, subtaskId) is already stored; keep that one
            // and throw away the handle that was just produced.
            handle.discardState();
        } else {
            this.pendingCheckpoints.add(pendingCheckpoint);
        }
        this.out = null;
    }

    /**
     * Turns the currently buffered elements into a pending checkpoint and writes the full set of
     * pending checkpoints into the operator state.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        Preconditions.checkState(this.checkpointedState != null, "The operator state has not been properly initialized.");

        saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

        this.checkpointedState.clear();

        try {
            for (PendingCheckpoint pendingCheckpoint : this.pendingCheckpoints) {
                this.checkpointedState.add(pendingCheckpoint);
            }
        } catch (Exception e) {
            // Do not leave a partially filled list state behind.
            this.checkpointedState.clear();

            throw new Exception("Could not add panding checkpoints to operator state backend of operator " + getOperatorName() + '.', e);
        }

        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
        }
    }

    /**
     * Removes every pending checkpoint that the committer already reports as committed and
     * discards its state handle.
     *
     * @throws Exception if querying the committer or discarding a handle fails
     */
    private void cleanRestoredHandles() throws Exception {
        synchronized (this.pendingCheckpoints) {
            Iterator<PendingCheckpoint> pendingCheckpointIt = this.pendingCheckpoints.iterator();
            while (pendingCheckpointIt.hasNext()) {
                PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();

                if (this.committer.isCheckpointCommitted(pendingCheckpoint.subtaskId, pendingCheckpoint.checkpointId)) {
                    pendingCheckpoint.stateHandle.discardState();
                    pendingCheckpointIt.remove();
                }
            }
        }
    }

    /**
     * Replays and commits all pending checkpoints with an id not greater than the completed one.
     *
     * <p>For each such checkpoint that is not yet committed, its buffered elements are read back
     * and passed to {@link #sendValues(Iterable, long, long)}. If that call returns {@code true}
     * the checkpoint is committed, its handle is discarded and the entry is removed; if it returns
     * {@code false} the entry stays pending. Checkpoints that the committer already reports as
     * committed are only cleaned up.
     *
     * <p>If an {@link Exception} is thrown while handling an entry, it is logged and processing
     * stops, so that no later checkpoint is committed ahead of the failed one.
     *
     * @param checkpointId id of the checkpoint that has completed
     */
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        super.notifyOfCompletedCheckpoint(checkpointId);

        synchronized (this.pendingCheckpoints) {
            Iterator<PendingCheckpoint> pendingCheckpointIt = this.pendingCheckpoints.iterator();
            while (pendingCheckpointIt.hasNext()) {
                PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();

                long pastCheckpointId = pendingCheckpoint.checkpointId;
                int subtaskId = pendingCheckpoint.subtaskId;
                long timestamp = pendingCheckpoint.timestamp;
                StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;

                // Only checkpoints up to (and including) the completed one may be committed.
                if (pastCheckpointId > checkpointId) {
                    continue;
                }

                try {
                    if (!this.committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {
                        // try-with-resources closes the stream on every path while still letting
                        // failures reach the catch below (a hand-written finally that leaves via
                        // 'continue' would silently drop them).
                        try (FSDataInputStream in = streamHandle.openInputStream()) {
                            Iterable<IN> values = new ReusingMutableToRegularIteratorWrapper<IN>(
                                    new InputViewIterator<IN>(new DataInputViewStreamWrapper(in), this.serializer),
                                    this.serializer);

                            // The values and the timestamp belong to the pending checkpoint, so
                            // its id is passed along - not the id of the checkpoint that
                            // triggered this notification.
                            boolean success = sendValues(values, pastCheckpointId, timestamp);
                            if (success) {
                                this.committer.commitCheckpoint(subtaskId, pastCheckpointId);
                                streamHandle.discardState();
                                pendingCheckpointIt.remove();
                            }
                        }
                    } else {
                        streamHandle.discardState();
                        pendingCheckpointIt.remove();
                    }
                } catch (Exception e) {
                    // Stop here so that a later checkpoint is not committed before this one.
                    LOG.error("Could not commit checkpoint.", e);
                    break;
                }
            }
        }
    }

    /**
     * Writes the given buffered values to the external system.
     *
     * @param values the elements that were buffered for the checkpoint
     * @param checkpointId id of the checkpoint the values belong to
     * @param timestamp timestamp of the checkpoint the values belong to
     * @return {@code true} if the values were written and the checkpoint may be committed,
     *     {@code false} if the checkpoint should stay pending
     * @throws Exception if writing fails
     */
    protected abstract boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;

    /**
     * Serializes the element into the current checkpoint state stream, creating the stream lazily
     * on the first element after a snapshot.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        if (this.out == null) {
            this.out = this.checkpointStreamFactory.createCheckpointStateOutputStream(0L, 0L);
        }
        this.serializer.serialize(value, new DataOutputViewStreamWrapper(this.out));
    }

    /**
     * A checkpoint whose buffered data has not been committed yet, together with the handle to
     * that data.
     *
     * <p>Note: the natural ordering only considers {@code checkpointId} and {@code subtaskId},
     * whereas {@link #equals(Object)} additionally compares the timestamp. Membership in the
     * enclosing sink's {@link TreeSet} is therefore decided by checkpoint id and subtask id alone.
     */
    private static final class PendingCheckpoint implements Comparable<PendingCheckpoint>, Serializable {

        private static final long serialVersionUID = -3571036395734603443L;

        private final long checkpointId;
        private final int subtaskId;
        private final long timestamp;
        private final StreamStateHandle stateHandle;

        PendingCheckpoint(long checkpointId, int subtaskId, long timestamp, StreamStateHandle handle) {
            this.checkpointId = checkpointId;
            this.subtaskId = subtaskId;
            this.timestamp = timestamp;
            this.stateHandle = handle;
        }

        /** Orders by checkpoint id first, then by subtask id. */
        @Override
        public int compareTo(PendingCheckpoint o) {
            int res = Long.compare(this.checkpointId, o.checkpointId);
            // Integer.compare instead of subtraction: cannot overflow for any pair of ints.
            return res != 0 ? res : Integer.compare(this.subtaskId, o.subtaskId);
        }

        @Override
        public boolean equals(Object o) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(o instanceof PendingCheckpoint)) {
                return false;
            }
            PendingCheckpoint other = (PendingCheckpoint) o;
            return this.checkpointId == other.checkpointId
                    && this.subtaskId == other.subtaskId
                    && this.timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            int hash = 17;
            hash = 31 * hash + (int) (this.checkpointId ^ (this.checkpointId >>> 32));
            hash = 31 * hash + this.subtaskId;
            hash = 31 * hash + (int) (this.timestamp ^ (this.timestamp >>> 32));
            return hash;
        }

        @Override
        public String toString() {
            return "Pending Checkpoint: id=" + this.checkpointId + "/" + this.subtaskId + "@" + this.timestamp;
        }
    }
}

