/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LocalChangelogRegistry} implementation that remembers, per physical state handle ID, the
 * registered {@link StreamStateHandle} together with the highest checkpoint ID it was registered
 * for, and discards handles through an executor once they are no longer needed.
 *
 * <p>Bookkeeping is kept in a {@link ConcurrentHashMap}; every mutation of an entry is performed
 * with a single atomic map operation ({@code compute} or the conditional {@code remove(key,
 * value)}), so {@link #register} and {@link #discardUpToCheckpoint} may be called concurrently.
 */
@Internal
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(LocalChangelogRegistry.class);

    /** Physical handle ID -> (handle, highest checkpoint ID the handle was registered with). */
    private final Map<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>>
            handleToLastUsedCheckpointID = new ConcurrentHashMap<>();

    /** Executor on which handle disposal is run; shut down by {@link #close()}. */
    private final ExecutorService asyncDisposalExecutor;

    /**
     * Creates a registry that discards handles on the given executor.
     *
     * @param ioExecutor executor used for asynchronous disposal; it is shut down when this
     *     registry is closed
     */
    public LocalChangelogRegistryImpl(ExecutorService ioExecutor) {
        this.asyncDisposalExecutor = ioExecutor;
    }

    /**
     * Records that {@code handle} is used by checkpoint {@code checkpointID}. If the handle's ID
     * is already known, the stored checkpoint ID becomes the maximum of the old and new one.
     *
     * @param handle handle to track
     * @param checkpointID checkpoint that uses the handle
     * @throws IllegalStateException if a different (non-equal) handle is already registered under
     *     the same physical state handle ID
     */
    @Override
    public void register(StreamStateHandle handle, long checkpointID) {
        handleToLastUsedCheckpointID.compute(
                handle.getStreamStateHandleID(),
                (id, registered) -> {
                    if (registered == null) {
                        return Tuple2.of(handle, checkpointID);
                    }
                    Preconditions.checkState(
                            handle.equals(registered.f0),
                            "Handle %s conflicts with handle %s already registered under ID %s",
                            handle,
                            registered.f0,
                            id);
                    return Tuple2.of(handle, Math.max(registered.f1, checkpointID));
                });
    }

    /**
     * Stops tracking every handle whose last-used checkpoint ID is strictly less than {@code
     * upTo} and schedules its disposal.
     *
     * @param upTo exclusive upper bound; handles last used by this checkpoint or a later one are
     *     retained
     */
    @Override
    public void discardUpToCheckpoint(long upTo) {
        for (Map.Entry<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>> entry :
                handleToLastUsedCheckpointID.entrySet()) {
            Tuple2<StreamStateHandle, Long> registered = entry.getValue();
            // Conditional removal: only drop the mapping if it is still exactly the value that
            // was inspected. A concurrent register() that raised the checkpoint ID replaces the
            // value, makes this remove fail, and thereby keeps the handle alive. The boolean
            // result also guarantees that each handle is scheduled for deletion at most once.
            if (registered.f1 < upTo
                    && handleToLastUsedCheckpointID.remove(entry.getKey(), registered)) {
                scheduleAsyncDelete(registered.f0);
            }
        }
    }

    /**
     * Submits disposal of the given handle to the disposal executor. If the executor rejects the
     * task, the disposal is executed synchronously in the calling thread instead. Failures of
     * {@code discardState()} are logged and not propagated.
     *
     * @param streamStateHandle handle to discard; {@code null} is ignored
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle == null) {
            return;
        }
        LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
        Runnable discardRunner =
                () -> {
                    try {
                        streamStateHandle.discardState();
                    } catch (Exception exception) {
                        // Best effort: keep going, but log the cause so failures can be diagnosed.
                        LOG.warn(
                                "A problem occurred during asynchronous disposal of a stream handle {}.",
                                streamStateHandle,
                                exception);
                    }
                };
        try {
            asyncDisposalExecutor.execute(discardRunner);
        } catch (RejectedExecutionException ex) {
            // The executor does not accept tasks (any more); discard in the calling thread.
            discardRunner.run();
        }
    }

    /**
     * Shuts down the disposal executor and forgets all tracked handles. Handles that are still
     * tracked at this point are not discarded by this method.
     */
    @Override
    public void close() throws IOException {
        asyncDisposalExecutor.shutdown();
        handleToLastUsedCheckpointID.clear();
    }
}

