/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
import org.apache.flink.util.Preconditions;

class SubtaskCommittableManager<CommT> {
    private final Deque<CommitRequestImpl<CommT>> requests;
    private final int numExpectedCommittables;
    private final long checkpointId;
    private final int subtaskId;
    @Deprecated
    private int numDrained;
    private int numFailed;
    private final SinkCommitterMetricGroup metricGroup;

    SubtaskCommittableManager(int numExpectedCommittables, int subtaskId, long checkpointId, SinkCommitterMetricGroup metricGroup) {
        this(Collections.emptyList(), numExpectedCommittables, 0, 0, subtaskId, checkpointId, metricGroup);
    }

    SubtaskCommittableManager(Collection<CommitRequestImpl<CommT>> requests, int numExpectedCommittables, int numFailed, int subtaskId, long checkpointId, SinkCommitterMetricGroup metricGroup) {
        this(requests, numExpectedCommittables, 0, numFailed, subtaskId, checkpointId, metricGroup);
    }

    @Deprecated
    SubtaskCommittableManager(Collection<CommitRequestImpl<CommT>> requests, int numExpectedCommittables, int numDrained, int numFailed, int subtaskId, long checkpointId, SinkCommitterMetricGroup metricGroup) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.numExpectedCommittables = numExpectedCommittables;
        this.requests = new ArrayDeque<CommitRequestImpl<CommT>>(Preconditions.checkNotNull(requests));
        this.numDrained = numDrained;
        this.numFailed = numFailed;
        this.metricGroup = metricGroup;
    }

    void add(CommittableWithLineage<CommT> committable) {
        this.add(committable.getCommittable());
    }

    void add(CommT committable) {
        Preconditions.checkState(this.requests.size() < this.numExpectedCommittables, "Already received all committables.");
        this.requests.add(new CommitRequestImpl<CommT>(committable, this.metricGroup));
        this.metricGroup.getNumCommittablesTotalCounter().inc();
    }

    boolean hasReceivedAll() {
        return this.getNumCommittables() == this.numExpectedCommittables + this.numFailed;
    }

    int getNumCommittables() {
        return this.requests.size();
    }

    int getNumFailed() {
        return this.numFailed;
    }

    boolean isFinished() {
        return this.getPendingRequests().findAny().isEmpty();
    }

    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
        return this.requests.stream().filter(c -> !c.isFinished());
    }

    Stream<CommT> getSuccessfulCommittables() {
        return this.getRequests().stream().filter(c -> c.getState() == CommitRequestState.COMMITTED).map(CommitRequestImpl::getCommittable);
    }

    int getSubtaskId() {
        return this.subtaskId;
    }

    @VisibleForTesting
    long getCheckpointId() {
        return this.checkpointId;
    }

    Collection<CommitRequestImpl<CommT>> getRequests() {
        return this.requests;
    }

    SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
        Preconditions.checkArgument(other.getSubtaskId() == this.getSubtaskId(), "Different subtasks.");
        Preconditions.checkArgument(other.getCheckpointId() == this.getCheckpointId(), "Different checkpoints.");
        return new SubtaskCommittableManager<CommT>(Stream.concat(this.requests.stream(), other.requests.stream()).collect(Collectors.toList()), this.numExpectedCommittables + other.numExpectedCommittables, this.numFailed + other.numFailed, this.subtaskId, this.checkpointId, this.metricGroup);
    }

    SubtaskCommittableManager<CommT> copy() {
        return new SubtaskCommittableManager<CommT>(this.requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList()), this.numExpectedCommittables, this.numFailed, this.subtaskId, this.checkpointId, this.metricGroup);
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        SubtaskCommittableManager that = (SubtaskCommittableManager)o;
        return this.numExpectedCommittables == that.numExpectedCommittables && this.checkpointId == that.checkpointId && this.subtaskId == that.subtaskId && this.numDrained == that.numDrained && this.numFailed == that.numFailed && Iterables.elementsEqual(this.requests, that.requests);
    }

    public int hashCode() {
        return Objects.hash(this.requests, this.numExpectedCommittables, this.checkpointId, this.subtaskId, this.numDrained, this.numFailed);
    }

    public String toString() {
        return "SubtaskCommittableManager{requests=" + String.valueOf(this.requests) + ", numExpectedCommittables=" + this.numExpectedCommittables + ", checkpointId=" + this.checkpointId + ", subtaskId=" + this.subtaskId + ", numFailed=" + this.numFailed + "}";
    }
}

