/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorSubscriber;
import akka.stream.actor.ActorSubscriberMessage;
import akka.stream.actor.MaxInFlightRequestStrategy;
import akka.stream.actor.RequestStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.CommandResponse;

public final class CommandSubscriber
extends AbstractActorSubscriber {
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain((Actor)this);
    private final ActorRef delegateActor;
    private final int backpressureQueueSize;
    private final List<String> outstandingCommandCorrelationIds = new ArrayList<String>();

    private CommandSubscriber(ActorRef delegateActor, int backpressureQueueSize) {
        this.delegateActor = delegateActor;
        this.backpressureQueueSize = backpressureQueueSize;
    }

    public static Props props(final ActorRef delegateActor, final int backpressureQueueSize) {
        return Props.create(CommandSubscriber.class, (Creator)new Creator<CommandSubscriber>(){
            private static final long serialVersionUID = 1L;

            public CommandSubscriber create() throws Exception {
                return new CommandSubscriber(delegateActor, backpressureQueueSize);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof Signal, onNext -> {
            Signal signal = (Signal)onNext.element();
            Optional correlationIdOpt = signal.getDittoHeaders().getCorrelationId();
            if (correlationIdOpt.isPresent()) {
                String correlationId = (String)correlationIdOpt.get();
                LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (String)correlationId);
                this.outstandingCommandCorrelationIds.add(correlationId);
                if (this.outstandingCommandCorrelationIds.size() > this.backpressureQueueSize) {
                    throw new IllegalStateException("queued too many: " + this.outstandingCommandCorrelationIds.size() + " - backpressureQueueSize is: " + this.backpressureQueueSize);
                }
                this.logger.debug("Got new Signal <{}>, currently outstanding are <{}>", (Object)signal.getType(), (Object)this.outstandingCommandCorrelationIds.size());
                this.delegateActor.tell((Object)signal, this.getSelf());
            } else {
                this.logger.warning("Got a Signal <{}> without correlationId, NOT accepting/forwarding it: {}", (Object)signal.getType(), (Object)signal);
            }
        }).match(CommandResponse.class, response -> {
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (Optional)response.getDittoHeaders().getCorrelationId());
            response.getDittoHeaders().getCorrelationId().ifPresent(this.outstandingCommandCorrelationIds::remove);
            if (response.getDittoHeaders().isResponseRequired()) {
                this.delegateActor.forward(response, (ActorContext)this.getContext());
            } else {
                this.logger.debug("Requester did not require response (via DittoHeader '{}') - not sending one", (Object)DittoHeaderDefinition.RESPONSE_REQUIRED);
            }
        }).match(DittoRuntimeException.class, cre -> this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)cre)).match(RuntimeException.class, jre -> this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)new DittoJsonException(jre))).match(ActorSubscriberMessage.OnNext.class, onComplete -> this.logger.warning("Got unknown element in 'OnNext'")).matchEquals((Object)ActorSubscriberMessage.onCompleteInstance(), onComplete -> {
            this.logger.info("Stream completed, stopping myself..");
            this.getContext().stop(this.getSelf());
        }).match(ActorSubscriberMessage.OnError.class, onError -> {
            Throwable cause = onError.cause();
            if (cause instanceof DittoRuntimeException) {
                this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)cause);
            } else if (cause instanceof RuntimeException) {
                this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)new DittoJsonException((RuntimeException)cause));
            } else {
                this.logger.warning("Got 'OnError': {} {}", (Object)cause.getClass().getName(), (Object)cause.getMessage());
            }
        }).matchAny(any -> this.logger.warning("Got unknown message '{}'", any)).build();
    }

    private void handleDittoRuntimeException(ActorRef streamingSessionActor, DittoRuntimeException cre) {
        LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (Optional)cre.getDittoHeaders().getCorrelationId());
        this.logger.info("Got 'DittoRuntimeException': {} {}", (Object)cre.getClass().getName(), (Object)cre.getMessage());
        cre.getDittoHeaders().getCorrelationId().ifPresent(this.outstandingCommandCorrelationIds::remove);
        if (cre.getDittoHeaders().isResponseRequired()) {
            streamingSessionActor.forward((Object)cre, (ActorContext)this.getContext());
        } else {
            this.logger.debug("Requester did not require response (via DittoHeader '{}') - not sending one", (Object)DittoHeaderDefinition.RESPONSE_REQUIRED);
        }
    }

    public RequestStrategy requestStrategy() {
        return new MaxInFlightRequestStrategy(this.backpressureQueueSize){

            public int inFlightInternally() {
                return CommandSubscriber.this.outstandingCommandCorrelationIds.size();
            }
        };
    }
}

