/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog.inmemory;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
class InMemoryStateChangelogWriter
implements StateChangelogWriter<InMemoryChangelogStateHandle> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap<Integer, NavigableMap<SequenceNumber, byte[]>>();
    private final KeyGroupRange keyGroupRange;
    private SequenceNumber sqn = INITIAL_SQN;
    private boolean closed;

    public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
        this.keyGroupRange = keyGroupRange;
    }

    @Override
    public void appendMeta(byte[] value) throws IOException {
        LOG.trace("append metadata: {} bytes", (Object)value.length);
        if (this.closed) {
            LOG.warn("LogWriter is closed.");
            return;
        }
        this.changesByKeyGroup.computeIfAbsent(-1, unused -> new TreeMap()).put(this.sqn, value);
        this.sqn = this.sqn.next();
    }

    @Override
    public void append(int keyGroup, byte[] value) {
        LOG.trace("append, keyGroup={}, {} bytes", (Object)keyGroup, (Object)value.length);
        if (this.closed) {
            LOG.warn("LogWriter is closed.");
            return;
        }
        this.changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new TreeMap()).put(this.sqn, value);
        this.sqn = this.sqn.next();
    }

    @Override
    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    @Override
    public SequenceNumber nextSequenceNumber() {
        return this.sqn;
    }

    @Override
    public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> persist(SequenceNumber from, long checkpointId) {
        LOG.debug("Persist after {}", (Object)from);
        Preconditions.checkNotNull((Object)from);
        return CompletableFuture.completedFuture(SnapshotResult.withLocalState(new InMemoryChangelogStateHandle(this.collectChanges(from), from, this.sqn, this.keyGroupRange), new InMemoryChangelogStateHandle(this.collectChanges(from), from, this.sqn, this.keyGroupRange)));
    }

    private List<StateChange> collectChanges(SequenceNumber after) {
        return this.changesByKeyGroup.entrySet().stream().flatMap(e -> this.toChangeStream((NavigableMap)e.getValue(), after, (Integer)e.getKey())).sorted(Comparator.comparing(sqnAndChange -> (SequenceNumber)sqnAndChange.f0)).map(t -> (StateChange)t.f1).collect(Collectors.toList());
    }

    private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber after, int keyGroup) {
        if (keyGroup == -1) {
            return changeMap.tailMap(after, true).entrySet().stream().map(e2 -> Tuple2.of((Object)((SequenceNumber)e2.getKey()), (Object)StateChange.ofMetadataChange((byte[])e2.getValue())));
        }
        return changeMap.tailMap(after, true).entrySet().stream().map(e2 -> Tuple2.of((Object)((SequenceNumber)e2.getKey()), (Object)StateChange.ofDataChange(keyGroup, (byte[])e2.getValue())));
    }

    @Override
    public void close() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0);
        this.closed = true;
    }

    @Override
    public void truncate(SequenceNumber to) {
        this.changesByKeyGroup.forEach((kg, changesBySqn) -> changesBySqn.headMap(to, false).clear());
    }

    @Override
    public void truncateAndClose(SequenceNumber from) {
        this.close();
    }

    @Override
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointID) {
    }

    @Override
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointID) {
    }
}

