package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.EventStream;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorSubscriber;
import akka.stream.actor.ActorSubscriberMessage;
import akka.stream.actor.MaxInFlightRequestStrategy;
import akka.stream.actor.RequestStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.services.gateway.streaming.ResponsePublished;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/CommandSubscriber.class */
public final class CommandSubscriber extends AbstractActorSubscriber {
    private final ActorRef delegateActor;
    private final int backpressureQueueSize;
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);
    private final List<String> outstandingCommandCorrelationIds = new ArrayList();

    private CommandSubscriber(ActorRef actorRef, int i, EventStream eventStream) {
        this.delegateActor = actorRef;
        this.backpressureQueueSize = i;
        eventStream.subscribe(getSelf(), ResponsePublished.class);
    }

    public static Props props(ActorRef actorRef, int i, EventStream eventStream) {
        return Props.create(CommandSubscriber.class, new Object[]{actorRef, Integer.valueOf(i), eventStream});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ActorSubscriberMessage.OnNext.class, onNext -> {
            return onNext.element() instanceof Signal;
        }, onNext2 -> {
            Signal<?> signal = (Signal) onNext2.element();
            Optional correlationId = signal.getDittoHeaders().getCorrelationId();
            if (!correlationId.isPresent()) {
                this.logger.warning("Got a Signal <{}> without correlationId, NOT accepting/forwarding it: {}", signal.getType(), signal);
                return;
            }
            String str = (String) correlationId.get();
            LogUtil.enhanceLogWithCorrelationId(this.logger, str, new LogUtil.MdcField[0]);
            if (isResponseExpected(signal)) {
                this.outstandingCommandCorrelationIds.add(str);
                if (this.outstandingCommandCorrelationIds.size() > this.backpressureQueueSize) {
                    throw new IllegalStateException("queued too many: " + this.outstandingCommandCorrelationIds.size() + " - backpressureQueueSize is: " + this.backpressureQueueSize);
                }
            }
            this.logger.debug("Got new Signal <{}>, currently outstanding are <{}>", signal.getType(), Integer.valueOf(this.outstandingCommandCorrelationIds.size()));
            this.delegateActor.tell(signal, getSelf());
        }).match(ResponsePublished.class, responsePublished -> {
            this.outstandingCommandCorrelationIds.remove(responsePublished.getCorrelationId());
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            handleDittoRuntimeException(this.delegateActor, dittoRuntimeException);
        }).match(RuntimeException.class, runtimeException -> {
            handleDittoRuntimeException(this.delegateActor, new DittoJsonException(runtimeException));
        }).match(ActorSubscriberMessage.OnNext.class, onNext3 -> {
            this.logger.warning("Got unknown element in 'OnNext'");
        }).matchEquals(ActorSubscriberMessage.onCompleteInstance(), actorSubscriberMessage$OnComplete$ -> {
            this.logger.info("Stream completed, stopping myself..");
            getContext().stop(getSelf());
        }).match(ActorSubscriberMessage.OnError.class, onError -> {
            Throwable cause = onError.cause();
            if (cause instanceof DittoRuntimeException) {
                handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException) cause);
                return;
            }
            if (cause instanceof JsonRuntimeException) {
                handleDittoRuntimeException(this.delegateActor, new DittoJsonException((RuntimeException) cause));
            } else if (!(cause instanceof RuntimeException)) {
                this.logger.warning("Got 'OnError': {} {}", cause.getClass().getName(), cause.getMessage());
            } else {
                this.logger.error(cause, "Unexpected RuntimeException <{}>: ", cause.getClass().getSimpleName(), cause.getMessage());
                handleDittoRuntimeException(this.delegateActor, GatewayInternalErrorException.newBuilder().cause(cause).build());
            }
        }).matchAny(obj -> {
            this.logger.warning("Got unknown message '{}'", obj);
        }).build();
    }

    private boolean isResponseExpected(Signal<?> signal) {
        return (signal instanceof Command) && signal.getDittoHeaders().isResponseRequired();
    }

    private void handleDittoRuntimeException(ActorRef actorRef, DittoRuntimeException dittoRuntimeException) {
        LogUtil.enhanceLogWithCorrelationId(this.logger, dittoRuntimeException.getDittoHeaders().getCorrelationId(), new LogUtil.MdcField[0]);
        this.logger.info("Got 'DittoRuntimeException': {} {}", dittoRuntimeException.getClass().getName(), dittoRuntimeException.getMessage());
        Optional correlationId = dittoRuntimeException.getDittoHeaders().getCorrelationId();
        List<String> list = this.outstandingCommandCorrelationIds;
        list.getClass();
        correlationId.ifPresent((v1) -> {
            r1.remove(v1);
        });
        if (dittoRuntimeException.getDittoHeaders().isResponseRequired()) {
            actorRef.forward(dittoRuntimeException, getContext());
        } else {
            this.logger.debug("Requester did not require response (via DittoHeader '{}') - not sending one", DittoHeaderDefinition.RESPONSE_REQUIRED);
        }
    }

    public RequestStrategy requestStrategy() {
        return new MaxInFlightRequestStrategy(this.backpressureQueueSize) { // from class: org.eclipse.ditto.services.gateway.streaming.actors.CommandSubscriber.1
            public int inFlightInternally() {
                return CommandSubscriber.this.outstandingCommandCorrelationIds.size();
            }
        };
    }
}
