/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
import org.apache.flink.util.Preconditions;

/**
 * Tracks the commit requests of a single subtask for a single checkpoint.
 *
 * <p>Requests are held in a queue until they are finished. Finished requests are removed by
 * {@link #drainCommitted()}, which also maintains the counters of drained (successfully
 * finished) and failed requests.
 *
 * @param <CommT> type of the committable
 */
class SubtaskCommittableManager<CommT> {
    /** Requests that have been added and not yet removed by {@link #drainCommitted()}. */
    private final Deque<CommitRequestImpl<CommT>> requests;
    private final int numExpectedCommittables;
    private final long checkpointId;
    private final int subtaskId;
    /** Number of requests returned by {@link #drainCommitted()} so far. */
    private int numDrained;
    /** Number of requests dropped by {@link #drainCommitted()} because they were FAILED. */
    private int numFailed;
    private final SinkCommitterMetricGroup metricGroup;

    /**
     * Creates an empty manager with no requests and zeroed drained/failed counters.
     *
     * @param numExpectedCommittables number of committables expected for this subtask
     * @param subtaskId id of the subtask
     * @param checkpointId id of the checkpoint
     * @param metricGroup metric group handed to created requests and used for counting
     */
    SubtaskCommittableManager(
            int numExpectedCommittables,
            int subtaskId,
            long checkpointId,
            SinkCommitterMetricGroup metricGroup) {
        this(
                Collections.emptyList(),
                numExpectedCommittables,
                0,
                0,
                subtaskId,
                checkpointId,
                metricGroup);
    }

    /**
     * Creates a manager from explicit state.
     *
     * @param requests initial requests; copied into an internal queue, must not be null
     * @param numExpectedCommittables number of committables expected for this subtask
     * @param numDrained initial value of the drained counter
     * @param numFailed initial value of the failed counter
     * @param subtaskId id of the subtask
     * @param checkpointId id of the checkpoint
     * @param metricGroup metric group handed to created requests and used for counting
     */
    SubtaskCommittableManager(
            Collection<CommitRequestImpl<CommT>> requests,
            int numExpectedCommittables,
            int numDrained,
            int numFailed,
            int subtaskId,
            long checkpointId,
            SinkCommitterMetricGroup metricGroup) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.numExpectedCommittables = numExpectedCommittables;
        this.requests = new ArrayDeque<>(Preconditions.checkNotNull(requests));
        this.numDrained = numDrained;
        this.numFailed = numFailed;
        this.metricGroup = metricGroup;
    }

    /** Adds the committable carried by the given message; see {@link #add(Object)}. */
    void add(CommittableWithLineage<CommT> committable) {
        add(committable.getCommittable());
    }

    /**
     * Wraps the committable in a new request, enqueues it and increments the total-committables
     * counter of the metric group.
     *
     * @throws IllegalStateException if the queue already holds as many requests as expected
     */
    void add(CommT committable) {
        // NOTE(review): this check only counts requests still in the queue, whereas
        // hasReceivedAll() also counts drained and failed ones. Kept as-is to preserve
        // behaviour; confirm whether the difference is intentional.
        Preconditions.checkState(
                requests.size() < numExpectedCommittables, "Already received all committables.");
        requests.add(new CommitRequestImpl<>(committable, metricGroup));
        metricGroup.getNumCommittablesTotalCounter().inc();
    }

    /** Returns whether queued + drained + failed requests equal the expected count. */
    boolean hasReceivedAll() {
        return getNumCommittables() == numExpectedCommittables;
    }

    /** Returns the number of queued requests plus the drained and failed counters. */
    int getNumCommittables() {
        return requests.size() + numDrained + numFailed;
    }

    /** Returns the expected count minus the drained and failed counters. */
    int getNumPending() {
        return numExpectedCommittables - (numDrained + numFailed);
    }

    int getNumFailed() {
        return numFailed;
    }

    /** Returns whether {@link #getNumPending()} is zero. */
    boolean isFinished() {
        return getNumPending() == 0;
    }

    /** Returns a stream over the queued requests that are not finished. */
    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
        return requests.stream().filter(request -> !request.isFinished());
    }

    /**
     * Removes every finished request from the queue.
     *
     * <p>Finished requests in state {@link CommitRequestState#FAILED} only increment the failed
     * counter. All other finished requests are returned, tagged with this manager's checkpoint
     * and subtask id, and counted as drained.
     *
     * @return the committables of the finished, non-failed requests in queue order
     */
    List<CommittableWithLineage<CommT>> drainCommitted() {
        List<CommittableWithLineage<CommT>> committed = new ArrayList<>(requests.size());
        Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
        while (iterator.hasNext()) {
            CommitRequestImpl<CommT> request = iterator.next();
            if (!request.isFinished()) {
                continue;
            }
            if (request.getState() == CommitRequestState.FAILED) {
                numFailed++;
            } else {
                committed.add(
                        new CommittableWithLineage<>(
                                request.getCommittable(), checkpointId, subtaskId));
            }
            iterator.remove();
        }
        numDrained += committed.size();
        return committed;
    }

    int getNumDrained() {
        return numDrained;
    }

    int getSubtaskId() {
        return subtaskId;
    }

    @VisibleForTesting
    long getCheckpointId() {
        return checkpointId;
    }

    /** Returns the internal request queue itself (not a copy). */
    Deque<CommitRequestImpl<CommT>> getRequests() {
        return requests;
    }

    /**
     * Creates a new manager combining this manager with {@code other}: requests are concatenated
     * (this manager's first) and the expected, drained and failed counts are summed. Neither
     * input is modified; the request objects themselves are shared, not copied.
     *
     * @throws IllegalArgumentException if {@code other} has a different subtask or checkpoint id
     */
    SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
        Preconditions.checkArgument(
                other.getSubtaskId() == getSubtaskId(), "Different subtasks.");
        Preconditions.checkArgument(
                other.getCheckpointId() == getCheckpointId(), "Different checkpoints.");
        List<CommitRequestImpl<CommT>> mergedRequests =
                Stream.concat(requests.stream(), other.requests.stream())
                        .collect(Collectors.toList());
        return new SubtaskCommittableManager<>(
                mergedRequests,
                numExpectedCommittables + other.numExpectedCommittables,
                numDrained + other.numDrained,
                numFailed + other.numFailed,
                subtaskId,
                checkpointId,
                metricGroup);
    }

    /**
     * Creates a new manager with the same counters and ids whose queue holds the result of
     * {@code CommitRequestImpl.copy()} for each queued request. The metric group is shared.
     */
    SubtaskCommittableManager<CommT> copy() {
        List<CommitRequestImpl<CommT>> copiedRequests =
                requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList());
        return new SubtaskCommittableManager<>(
                copiedRequests,
                numExpectedCommittables,
                numDrained,
                numFailed,
                subtaskId,
                checkpointId,
                metricGroup);
    }

    /**
     * Compares counters, ids and the queued requests element by element in queue order. The
     * metric group is not part of the comparison.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SubtaskCommittableManager<?> that = (SubtaskCommittableManager<?>) o;
        return numExpectedCommittables == that.numExpectedCommittables
                && checkpointId == that.checkpointId
                && subtaskId == that.subtaskId
                && numDrained == that.numDrained
                && numFailed == that.numFailed
                && Iterables.elementsEqual(requests, that.requests);
    }

    @Override
    public int hashCode() {
        // ArrayDeque inherits the identity-based Object.hashCode(), which would make two
        // managers that are equal (element-wise, see equals) hash differently. Hash an ordered
        // list snapshot instead so that hashCode is derived from the elements, like equals.
        return Objects.hash(
                new ArrayList<>(requests),
                numExpectedCommittables,
                checkpointId,
                subtaskId,
                numDrained,
                numFailed);
    }

    @Override
    public String toString() {
        return "SubtaskCommittableManager{requests=" + requests
                + ", numExpectedCommittables=" + numExpectedCommittables
                + ", checkpointId=" + checkpointId
                + ", subtaskId=" + subtaskId
                + ", numDrained=" + numDrained
                + ", numFailed=" + numFailed
                + "}";
    }
}

