package org.apache.pekko.stream.impl;

import org.apache.pekko.dispatch.ExecutionContexts$;
import org.apache.pekko.stream.AbruptStageTerminationException;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.impl.QueueSink;
import org.apache.pekko.stream.scaladsl.SinkQueueWithCancel;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* JADX INFO: Add missing generic type declarations: [T] */
/* compiled from: Sinks.scala */
/* loaded from: input_file:flink-rpc-akka.jar:org/apache/pekko/stream/impl/QueueSink$$anon$4.class */
/**
 * Stage logic and materialized queue handle of {@link QueueSink}.
 *
 * <p>The same object plays two roles:
 * <ul>
 *   <li>as {@link InHandler} it receives elements, completion and failure from the sink's inlet
 *       and records them in {@link #buffer()};</li>
 *   <li>as {@link SinkQueueWithCancel} it exposes {@link #pull()} and {@link #cancel()}, both of
 *       which are forwarded to the stage through an {@link AsyncCallback}.</li>
 * </ul>
 *
 * <p>Buffered entries are {@code Try<Option<T>>} values: {@code Success(Some(x))} is an element,
 * {@code Success(None)} marks upstream completion and {@code Failure(e)} marks upstream failure.
 *
 * @param <T> element type delivered to the queue's consumer
 */
public final class QueueSink$$anon$4<T> extends GraphStageLogic implements InHandler, SinkQueueWithCancel<T> {
    /** Maximum number of elements buffered before the inlet stops being pulled. */
    private final int maxBuffer;
    /** Received signals (elements and the terminal marker) not yet handed to a consumer. */
    private final Buffer<Try<Option<T>>> buffer;
    /** Promises of {@link #pull()} calls that arrived while {@link #buffer} was empty. */
    private final Buffer<Promise<Option<T>>> currentRequests;
    /** Entry point through which {@link #pull()} and {@link #cancel()} reach the stage. */
    private final AsyncCallback<QueueSink.Output<T>> callback;
    /** The enclosing sink; provides the inlet and the concurrent-pull limit. */
    private final /* synthetic */ QueueSink $outer;

    /** @return the maximum number of buffered elements, read from the input-buffer attribute */
    public int maxBuffer() {
        return this.maxBuffer;
    }

    /** @return the buffer of received-but-undelivered signals */
    public Buffer<Try<Option<T>>> buffer() {
        return this.buffer;
    }

    /** @return the queue of pull requests still waiting for a signal */
    public Buffer<Promise<Option<T>>> currentRequests() {
        return this.currentRequests;
    }

    /**
     * Marks the stage as keep-going and requests the first element from upstream.
     *
     * <p>NOTE(review): keep-going presumably prevents the stage from being shut down while
     * buffered entries remain after upstream has terminated; confirm against the
     * GraphStageLogic documentation.
     */
    @Override // org.apache.pekko.stream.stage.GraphStageLogic
    public void preStart() {
        setKeepGoing(true);
        pull(this.$outer.in());
    }

    private AsyncCallback<QueueSink.Output<T>> callback() {
        return this.callback;
    }

    /**
     * Completes {@code promise} with the oldest buffered signal and reacts to terminal signals:
     * an element leaves the stage running, {@code Success(None)} completes the stage and a
     * {@code Failure} fails it.
     *
     * @param promise the pull request to answer; the buffer must not be empty
     * @throws MatchError if the dequeued entry is none of the three expected shapes
     */
    public void sendDownstream(Promise<Option<T>> promise) {
        Try<Option<T>> signal = buffer().dequeue();
        promise.complete(signal);

        if (signal instanceof Success) {
            Option<T> value = ((Success<Option<T>>) signal).value();
            if (value instanceof Some) {
                // Plain element: nothing else to do.
                return;
            }
            if (None$.MODULE$.equals(value)) {
                completeStage();
                return;
            }
            // A Success holding neither Some nor None is not a valid signal.
            throw new MatchError(signal);
        }
        if (signal instanceof Failure) {
            failStage(((Failure<Option<T>>) signal).exception());
            return;
        }
        throw new MatchError(signal);
    }

    /**
     * Buffers the pushed element, answers one waiting pull request if there is any, and pulls
     * again while fewer than {@link #maxBuffer()} entries are buffered.
     */
    @Override // org.apache.pekko.stream.stage.InHandler
    public void onPush() {
        buffer().enqueue(new Success(new Some(grab(this.$outer.in()))));
        if (currentRequests().nonEmpty()) {
            currentRequests().dequeue().complete(buffer().dequeue());
        }
        if (buffer().used() < maxBuffer()) {
            pull(this.$outer.in());
        }
    }

    /**
     * Records upstream completion as {@code Success(None)}, answers waiting requests from the
     * buffer, completes any requests still left with {@code Success(None)}, and completes the
     * stage once the buffer has been fully drained.
     */
    @Override // org.apache.pekko.stream.stage.InHandler
    public void onUpstreamFinish() {
        buffer().enqueue(new Success(None$.MODULE$));
        serveWaitingRequestsFromBuffer();
        while (currentRequests().nonEmpty()) {
            currentRequests().dequeue().complete(new Success(None$.MODULE$));
        }
        if (buffer().isEmpty()) {
            completeStage();
        }
    }

    /**
     * Records the upstream failure, answers waiting requests from the buffer, fails any requests
     * still left with {@code th}, and fails the stage once the buffer has been fully drained.
     *
     * @param th the failure signalled by upstream
     */
    @Override // org.apache.pekko.stream.stage.InHandler
    public void onUpstreamFailure(Throwable th) {
        buffer().enqueue(new Failure(th));
        serveWaitingRequestsFromBuffer();
        while (currentRequests().nonEmpty()) {
            currentRequests().dequeue().complete(new Failure(th));
        }
        if (buffer().isEmpty()) {
            failStage(th);
        }
    }

    /** Pairs waiting pull requests with buffered signals, oldest first, until either runs out. */
    private void serveWaitingRequestsFromBuffer() {
        while (currentRequests().nonEmpty() && buffer().nonEmpty()) {
            currentRequests().dequeue().complete(buffer().dequeue());
        }
    }

    /** Fails every pull request that is still waiting when the stage stops. */
    @Override // org.apache.pekko.stream.stage.GraphStageLogic
    public void postStop() {
        while (currentRequests().nonEmpty()) {
            currentRequests().dequeue().failure(new AbruptStageTerminationException(this));
        }
    }

    /**
     * Requests the next signal from the stage.
     *
     * <p>The request is handed to the stage through the async callback. If that hand-over itself
     * fails with a non-fatal error, the returned future is failed with the same error.
     *
     * @return a future completed with {@code Some(element)}, with {@code None} when upstream has
     *     finished, or failed when upstream failed or the request could not be served
     */
    @Override // org.apache.pekko.stream.scaladsl.SinkQueue
    public Future<Option<T>> pull() {
        // The promise carries Option<T>, matching both QueueSink.Pull's payload as used by the
        // callback and this method's declared return type.
        Promise<Option<T>> promise = Promise$.MODULE$.apply();
        callback().invokeWithFeedback(new QueueSink.Pull(promise)).failed().foreach(th -> {
            Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            return !nonFatal.isEmpty() ? BoxesRunTime.boxToBoolean(promise.tryFailure(nonFatal.get())) : BoxedUnit.UNIT;
        }, ExecutionContexts$.MODULE$.parasitic());
        return promise.future();
    }

    /** Asks the stage to complete; delivered asynchronously through the callback. */
    @Override // org.apache.pekko.stream.scaladsl.SinkQueueWithCancel
    public void cancel() {
        callback().invoke(QueueSink$Cancel$.MODULE$);
    }

    /**
     * Body of the async callback: handles a {@code Pull} request or a {@code Cancel} message.
     *
     * <p>A pull is rejected when the request queue is full, parked when nothing is buffered, and
     * otherwise answered immediately from the buffer. When the buffer was at {@code maxBuffer}
     * the inlet is pulled again first, because a slot is about to be freed.
     *
     * <p>The method declares its own type parameter because a static method cannot refer to
     * the type parameter of its class; the erased signature is unchanged.
     *
     * @param queueSink$$anon$4 the stage logic the message is addressed to
     * @param output the message sent by {@link #pull()} or {@link #cancel()}
     * @throws MatchError if {@code output} is neither a pull carrying a promise nor the cancel message
     */
    public static final /* synthetic */ <T> void $anonfun$callback$1(QueueSink$$anon$4<T> queueSink$$anon$4, QueueSink.Output<T> output) {
        final QueueSink$$anon$4<T> logic = queueSink$$anon$4;

        if (output instanceof QueueSink.Pull) {
            Promise<Option<T>> promise = ((QueueSink.Pull) output).promise();
            if (promise != null) {
                if (logic.currentRequests().isFull()) {
                    promise.failure(new IllegalStateException(
                            "Too many concurrent pulls. Specified maximum is "
                                    + logic.$outer.org$apache$pekko$stream$impl$QueueSink$$maxConcurrentPulls
                                    + ". "
                                    + "You have to wait for one previous future to be resolved to send another request"));
                } else if (logic.buffer().isEmpty()) {
                    logic.currentRequests().enqueue(promise);
                } else {
                    if (logic.buffer().used() == logic.maxBuffer()) {
                        logic.tryPull(logic.$outer.in());
                    }
                    logic.sendDownstream(promise);
                }
                return;
            }
        }

        if (!QueueSink$Cancel$.MODULE$.equals(output)) {
            throw new MatchError(output);
        }
        logic.completeStage();
    }

    /**
     * Creates the logic for {@code queueSink}.
     *
     * <p>{@code maxBuffer} is the {@code max} of the {@link Attributes.InputBuffer} attribute
     * (defaulting to {@code InputBuffer(16, 16)}). The signal buffer gets one slot more than
     * that, leaving room for the terminal marker enqueued on upstream completion or failure.
     *
     * @param queueSink the enclosing sink; a {@code null} value results in a
     *     {@link NullPointerException} from the {@code shape2()} dereference
     * @param attributes attributes used to size and create the buffers
     * @throws IllegalArgumentException if the configured maximum buffer size is not positive
     */
    public QueueSink$$anon$4(QueueSink queueSink, Attributes attributes) {
        super(queueSink.shape2());
        this.$outer = queueSink;
        InHandler.$init$(this);
        this.maxBuffer = ((Attributes.InputBuffer) attributes.get(new Attributes.InputBuffer(16, 16), ClassTag$.MODULE$.apply(Attributes.InputBuffer.class))).max();
        Predef$.MODULE$.require(maxBuffer() > 0, () -> {
            return "Buffer size must be greater than 0";
        });
        this.buffer = Buffer$.MODULE$.apply(maxBuffer() + 1, attributes);
        this.currentRequests = Buffer$.MODULE$.apply(queueSink.org$apache$pekko$stream$impl$QueueSink$$maxConcurrentPulls, attributes);
        this.callback = getAsyncCallback(output -> {
            $anonfun$callback$1(this, output);
            return BoxedUnit.UNIT;
        });
        setHandler(queueSink.in(), this);
    }
}
