/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorPublisherWithStash;
import akka.stream.actor.ActorPublisherMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.services.gateway.streaming.CloseStreamExceptionally;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public final class EventAndResponsePublisher
extends AbstractActorPublisherWithStash<SessionedJsonifiable> {
    private static final int MESSAGE_CONSUMPTION_CHECK_SECONDS = 2;
    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter((Actor)this);
    private final int backpressureBufferSize;
    private final List<SessionedJsonifiable> buffer = new ArrayList<SessionedJsonifiable>();
    private final AtomicBoolean currentlyInMessageConsumedCheck = new AtomicBoolean(false);

    private EventAndResponsePublisher(int backpressureBufferSize) {
        this.backpressureBufferSize = backpressureBufferSize;
    }

    public static Props props(int backpressureBufferSize) {
        return Props.create(EventAndResponsePublisher.class, (Object[])new Object[]{backpressureBufferSize});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Connect.class, connect -> {
            String connectionCorrelationId = connect.getConnectionCorrelationId();
            this.logger.withCorrelationId((CharSequence)connectionCorrelationId).debug("Established new connection: {}", (Object)connectionCorrelationId);
            this.getContext().become(this.connected(connectionCorrelationId));
        }).matchAny(any -> {
            this.logger.info("Got unknown message during init phase '{}' - stashing..", any);
            this.stash();
        }).build();
    }

    private AbstractActor.Receive connected(String connectionCorrelationId) {
        this.unstashAll();
        return ReceiveBuilder.create().match(SessionedJsonifiable.class, j -> this.buffer.size() >= this.backpressureBufferSize, this::handleBackpressureFor).match(SessionedJsonifiable.class, jsonifiable -> {
            if (this.buffer.isEmpty() && this.totalDemand() > 0L) {
                this.onNext(jsonifiable);
            } else {
                this.buffer.add((SessionedJsonifiable)jsonifiable);
                this.deliverBuf();
            }
        }).match(CloseStreamExceptionally.class, closeStreamExceptionally -> {
            DittoRuntimeException reason = closeStreamExceptionally.getReason();
            this.logger.withCorrelationId((CharSequence)closeStreamExceptionally.getConnectionCorrelationId()).info("Closing stream exceptionally because of <{}>.", (Object)reason);
            if (0L < this.totalDemand()) {
                this.onNext(SessionedJsonifiable.error(reason));
            }
            this.onErrorThenStop((Throwable)reason);
        }).match(ActorPublisherMessage.Request.class, request -> {
            this.logger.withCorrelationId((CharSequence)connectionCorrelationId).debug("Got new demand: {}", request);
            this.deliverBuf();
        }).match(ActorPublisherMessage.Cancel.class, cancel -> this.getContext().stop(this.getSelf())).matchAny(any -> this.logger.withCorrelationId((CharSequence)connectionCorrelationId).warning("Got unknown message during connected phase: '{}'", any)).build();
    }

    private void handleBackpressureFor(SessionedJsonifiable jsonifiable) {
        this.logger.setCorrelationId(jsonifiable.getDittoHeaders());
        if (this.currentlyInMessageConsumedCheck.compareAndSet(false, true)) {
            this.logger.warning("Backpressure - buffer of '{}' outstanding Events/CommandResponses is full, dropping '{}'", (Object)this.backpressureBufferSize, (Object)jsonifiable);
            long bufSize = this.buffer.size();
            AbstractActor.ActorContext context = this.getContext();
            context.system().scheduler().scheduleOnce(FiniteDuration.apply((long)2L, (TimeUnit)TimeUnit.SECONDS), () -> {
                if (bufSize == (long)this.buffer.size()) {
                    this.logger.warning("Terminating Publisher - did not to consume anything in the last '{}' seconds, buffer is still at '{}' outstanding messages", (Object)2, (Object)bufSize);
                    context.stop(this.getSelf());
                } else {
                    this.currentlyInMessageConsumedCheck.set(false);
                    this.logger.info("Outstanding messages were consumed, Publisher is not terminated");
                }
            }, (ExecutionContext)context.system().dispatcher());
        }
    }

    private void deliverBuf() {
        while (this.totalDemand() > 0L) {
            List<SessionedJsonifiable> took;
            if (this.totalDemand() <= Integer.MAX_VALUE) {
                took = this.buffer.subList(0, Math.min(this.buffer.size(), (int)this.totalDemand()));
                took.forEach(arg_0 -> ((EventAndResponsePublisher)this).onNext(arg_0));
                this.buffer.removeAll(took);
                break;
            }
            took = this.buffer.subList(0, Math.min(this.buffer.size(), Integer.MAX_VALUE));
            took.forEach(arg_0 -> ((EventAndResponsePublisher)this).onNext(arg_0));
            this.buffer.removeAll(took);
        }
    }
}

