/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.util.Preconditions;

/**
 * Collects committable messages and groups them into one
 * {@link CheckpointCommittableManagerImpl} per key, where the key is the value returned by
 * {@code getCheckpointIdOrEOI()} of the incoming message. Managers are kept in a sorted map so
 * that all managers up to a given id can be retrieved in ascending key order.
 *
 * <p>On construction the collector registers a gauge on the supplied metric group that reports
 * the number of pending commit requests across all managers.
 *
 * @param <CommT> type of the committable payload
 */
@Internal
public class CommittableCollector<CommT> {
    /** Managers ordered by the id they were registered under. */
    private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables;
    /** Metric group handed to newly created managers and used for the pending gauge. */
    private final SinkCommitterMetricGroup metricGroup;

    /**
     * Creates an empty collector.
     *
     * @param metricGroup metric group to report to; must not be {@code null}
     */
    public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
        this(new TreeMap<>(), metricGroup);
    }

    /**
     * Creates a collector pre-populated with the given managers. The map is copied into a new
     * sorted map, so later changes to the argument map are not reflected here (the manager
     * instances themselves are shared with the caller).
     *
     * @param checkpointCommittables initial managers by id; must not be {@code null}
     * @param metricGroup metric group to report to; must not be {@code null}
     */
    CommittableCollector(Map<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables, SinkCommitterMetricGroup metricGroup) {
        this.checkpointCommittables = new TreeMap<>(Preconditions.checkNotNull(checkpointCommittables));
        // Fail fast with an explicit check instead of relying on the dereference below.
        this.metricGroup = Preconditions.checkNotNull(metricGroup);
        this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
    }

    /**
     * Sums the number of pending requests over all managers.
     *
     * <p>Each per-manager count is narrowed from {@code long} to {@code int} before summing.
     */
    private int getNumPending() {
        return this.checkpointCommittables.values().stream()
                .mapToInt(manager -> (int) manager.getPendingRequests().count())
                .sum();
    }

    /**
     * Static factory equivalent to {@link #CommittableCollector(SinkCommitterMetricGroup)}.
     *
     * @param metricGroup metric group to report to
     * @param <CommT> type of the committable payload
     * @return a new, empty collector
     */
    public static <CommT> CommittableCollector<CommT> of(SinkCommitterMetricGroup metricGroup) {
        return new CommittableCollector<>(metricGroup);
    }

    /**
     * Builds a collector from a plain list of committables by registering one synthetic summary
     * and then adding every list element as a committable under that summary.
     *
     * <p>NOTE(review): the literal arguments below ({@code 0, 1, 1L, size, size, 0} for the
     * summary and {@code 1L, 0} for each committable) presumably denote subtask id, number of
     * subtasks, checkpoint id, total/pending/failed counts — confirm against the
     * {@code CommittableSummary} and {@code CommittableWithLineage} constructors.
     *
     * @param committables committables to wrap; must not be {@code null}
     * @param metricGroup metric group to report to
     * @param <CommT> type of the committable payload
     * @return a collector containing all given committables
     */
    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> committables, SinkCommitterMetricGroup metricGroup) {
        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(metricGroup);
        // The summary has to be added first: addCommittable() rejects committables whose id has
        // no registered manager.
        CommittableSummary<CommT> summary =
                new CommittableSummary<>(0, 1, 1L, committables.size(), committables.size(), 0);
        committableCollector.addSummary(summary);
        for (CommT committable : committables) {
            committableCollector.addCommittable(new CommittableWithLineage<>(committable, 1L, 0));
        }
        return committableCollector;
    }

    /**
     * Dispatches a message to the matching handler: summaries create or update a manager,
     * committables are added to an already registered manager. Messages of any other type are
     * ignored.
     *
     * @param message message to add
     * @throws NullPointerException if a committable arrives for an id without a registered
     *     manager
     */
    public void addMessage(CommittableMessage<CommT> message) {
        if (message instanceof CommittableSummary) {
            this.addSummary((CommittableSummary<CommT>) message);
        } else if (message instanceof CommittableWithLineage) {
            this.addCommittable((CommittableWithLineage<CommT>) message);
        }
    }

    /**
     * Returns the managers whose id is less than or equal to {@code checkpointId}, in ascending
     * id order.
     *
     * @param checkpointId inclusive upper bound
     * @return a new list; modifying it does not affect this collector
     */
    public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCommittablesUpTo(long checkpointId) {
        return new ArrayList<>(this.checkpointCommittables.headMap(checkpointId, true).values());
    }

    /**
     * Tells whether every manager reports itself as finished.
     *
     * @return {@code true} if all managers are finished, or if there are no managers
     */
    public boolean isFinished() {
        return this.checkpointCommittables.values().stream().allMatch(CheckpointCommittableManagerImpl::isFinished);
    }

    /**
     * Merges all managers of {@code cc} into this collector. Managers for ids unknown to this
     * collector are inserted as-is (the instance is shared with {@code cc}); managers for ids
     * present in both are combined via {@code CheckpointCommittableManagerImpl#merge}.
     *
     * @param cc collector to merge from
     */
    public void merge(CommittableCollector<CommT> cc) {
        for (Map.Entry<Long, CheckpointCommittableManagerImpl<CommT>> checkpointEntry : cc.checkpointCommittables.entrySet()) {
            this.checkpointCommittables.merge(
                    checkpointEntry.getKey(),
                    checkpointEntry.getValue(),
                    CheckpointCommittableManagerImpl::merge);
        }
    }

    /**
     * Creates a new collector holding a {@code copy()} of every manager, using the same metric
     * group.
     *
     * <p>NOTE(review): constructing the copy calls {@code setCurrentPendingCommittablesGauge}
     * again on the shared metric group — confirm that re-registering the gauge is intended.
     *
     * @return the copied collector
     */
    public CommittableCollector<CommT> copy() {
        Map<Long, CheckpointCommittableManagerImpl<CommT>> copies =
                this.checkpointCommittables.entrySet().stream()
                        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().copy()));
        return new CommittableCollector<>(copies, this.metricGroup);
    }

    /**
     * Returns the managers in ascending id order.
     *
     * @return a live view of the internal map's values, not a copy
     */
    Collection<CheckpointCommittableManagerImpl<CommT>> getCheckpointCommittables() {
        return this.checkpointCommittables.values();
    }

    /** Registers the summary with the manager for its id, creating that manager if absent. */
    private void addSummary(CommittableSummary<CommT> summary) {
        this.checkpointCommittables
                .computeIfAbsent(
                        summary.getCheckpointIdOrEOI(),
                        key -> CheckpointCommittableManagerImpl.forSummary(summary, this.metricGroup))
                .addSummary(summary);
    }

    /** Adds the committable to the manager registered for its id. */
    private void addCommittable(CommittableWithLineage<CommT> committable) {
        this.getCheckpointCommittables(committable).addCommittable(committable);
    }

    /**
     * Looks up the manager registered for the message's id.
     *
     * @throws NullPointerException if no manager is registered for that id
     */
    private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(CommittableMessage<CommT> committable) {
        CheckpointCommittableManagerImpl<CommT> committables =
                this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
        return Preconditions.checkNotNull(committables, "Unknown checkpoint for %s", committable);
    }

    /**
     * Removes the manager stored under {@code manager.getCheckpointId()}, if any.
     *
     * <p>NOTE(review): managers are registered under {@code getCheckpointIdOrEOI()} of the
     * summary; verify that {@code getCheckpointId()} yields the same key.
     *
     * @param manager manager whose id should be dropped
     */
    public void remove(CheckpointCommittableManager<CommT> manager) {
        this.checkpointCommittables.remove(manager.getCheckpointId());
    }

    /** Two collectors are equal when their manager maps are equal; the metric group is ignored. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        CommittableCollector<?> that = (CommittableCollector<?>) o;
        return Objects.equals(this.checkpointCommittables, that.checkpointCommittables);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(this.checkpointCommittables);
    }

    @Override
    public String toString() {
        return "CommittableCollector{checkpointCommittables=" + this.checkpointCommittables + "}";
    }
}

