package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorPublisherWithStash;
import akka.stream.actor.ActorPublisherMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.services.gateway.streaming.CloseStreamExceptionally;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/EventAndResponsePublisher.class */
/**
 * Actor publisher which emits {@link SessionedJsonifiable}s into a stream.
 * <p>
 * Until a {@link Connect} message arrives every other message is stashed. Once connected, incoming
 * {@link SessionedJsonifiable}s are emitted directly if there is downstream demand, otherwise they are kept in an
 * in-memory buffer. When the buffer has reached {@code backpressureBufferSize}, further elements are dropped and a
 * check is scheduled: if the buffer size is unchanged after {@value #MESSAGE_CONSUMPTION_CHECK_SECONDS} seconds,
 * the publisher stops itself.
 * <p>
 * All mutable state is only accessed while processing a message, i.e. from within the actor.
 */
public final class EventAndResponsePublisher extends AbstractActorPublisherWithStash<SessionedJsonifiable> {

    /** Delay in seconds after which a full buffer is re-inspected to decide whether to terminate. */
    private static final int MESSAGE_CONSUMPTION_CHECK_SECONDS = 2;

    private final int backpressureBufferSize;
    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final List<SessionedJsonifiable> buffer = new ArrayList<>();

    /** {@code true} while a {@link MessageConsumptionCheck} is scheduled and has not yet been handled. */
    private boolean messageConsumedCheckPending = false;

    private EventAndResponsePublisher(int i) {
        this.backpressureBufferSize = i;
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor.
     *
     * @param i the number of buffered elements at which newly arriving elements are dropped.
     * @return the Props.
     */
    public static Props props(int i) {
        return Props.create(EventAndResponsePublisher.class, new Object[]{Integer.valueOf(i)});
    }

    /**
     * Initial behavior: waits for {@link Connect} and stashes everything else until then.
     *
     * @return the initial receive.
     */
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Connect.class, connect -> {
                    final String connectionCorrelationId = connect.getConnectionCorrelationId();
                    logger.withCorrelationId(connectionCorrelationId)
                            .debug("Established new connection: {}", connectionCorrelationId);
                    getContext().become(connected(connectionCorrelationId));
                })
                .matchAny(obj -> {
                    logger.info("Got unknown message during init phase '{}' - stashing..", obj);
                    stash();
                })
                .build();
    }

    /**
     * Behavior after {@link Connect} was received. Unstashes everything that arrived during the init phase.
     *
     * @param str the correlation ID of the connection, used for logging.
     * @return the connected receive.
     */
    private AbstractActor.Receive connected(String str) {
        unstashAll();
        return ReceiveBuilder.create()
                // Buffer is full: the element is dropped and a consumption check is scheduled (at most one at a time).
                .match(SessionedJsonifiable.class,
                        sessionedJsonifiable -> buffer.size() >= backpressureBufferSize,
                        this::handleBackpressureFor)
                .match(SessionedJsonifiable.class, sessionedJsonifiable -> {
                    if (buffer.isEmpty() && totalDemand() > 0) {
                        // Fast path: nothing is queued ahead of this element and downstream wants more.
                        onNext(sessionedJsonifiable);
                    } else {
                        buffer.add(sessionedJsonifiable);
                        deliverBuf();
                    }
                })
                .match(CloseStreamExceptionally.class, closeStreamExceptionally -> {
                    final DittoRuntimeException reason = closeStreamExceptionally.getReason();
                    logger.withCorrelationId(closeStreamExceptionally.getConnectionCorrelationId())
                            .info("Closing stream exceptionally because of <{}>.", reason);
                    if (0 < totalDemand()) {
                        // Emit the error as a regular element first, if downstream can take it.
                        onNext(SessionedJsonifiable.error(reason));
                    }
                    onErrorThenStop(reason);
                })
                .match(ActorPublisherMessage.Request.class, request -> {
                    logger.withCorrelationId(str).debug("Got new demand: {}", request);
                    deliverBuf();
                })
                .match(ActorPublisherMessage.Cancel.class, cancel -> getContext().stop(getSelf()))
                .match(MessageConsumptionCheck.class, this::handleMessageConsumptionCheck)
                .matchAny(obj -> logger.withCorrelationId(str)
                        .warning("Got unknown message during connected phase: '{}'", obj))
                .build();
    }

    /**
     * Handles an element arriving while the buffer is full: the element is dropped. If no consumption check is
     * pending, a warning is logged and a {@link MessageConsumptionCheck} carrying the current buffer size is
     * scheduled to be sent to this actor after {@value #MESSAGE_CONSUMPTION_CHECK_SECONDS} seconds.
     *
     * @param sessionedJsonifiable the element which is dropped.
     */
    private void handleBackpressureFor(SessionedJsonifiable sessionedJsonifiable) {
        logger.setCorrelationId(sessionedJsonifiable.getDittoHeaders());
        if (messageConsumedCheckPending) {
            return;
        }
        messageConsumedCheckPending = true;
        logger.warning("Backpressure - buffer of '{}' outstanding Events/CommandResponses is full, dropping '{}'",
                backpressureBufferSize, sessionedJsonifiable);

        // Send a message to self instead of running a closure on the scheduler's thread: the buffer, the logger
        // and the actor context must only be touched while this actor is processing a message.
        getContext().system().scheduler().scheduleOnce(
                FiniteDuration.apply(MESSAGE_CONSUMPTION_CHECK_SECONDS, TimeUnit.SECONDS),
                getSelf(),
                new MessageConsumptionCheck(buffer.size()),
                getContext().system().dispatcher(),
                getSelf());
    }

    /**
     * Evaluates a previously scheduled consumption check: stops this actor if the buffer size is the same as when
     * the check was scheduled, otherwise allows a new check to be scheduled on the next dropped element.
     *
     * @param check the check carrying the buffer size observed at scheduling time.
     */
    private void handleMessageConsumptionCheck(MessageConsumptionCheck check) {
        if (check.bufferSizeWhenScheduled == buffer.size()) {
            logger.warning("Terminating Publisher - did not to consume anything in the last '{}' seconds, buffer is still at '{}' outstanding messages",
                    MESSAGE_CONSUMPTION_CHECK_SECONDS, check.bufferSizeWhenScheduled);
            getContext().stop(getSelf());
        } else {
            messageConsumedCheckPending = false;
            logger.info("Outstanding messages were consumed, Publisher is not terminated");
        }
    }

    /**
     * Emits as many buffered elements as the current demand allows, oldest first, and removes exactly those
     * elements from the buffer.
     */
    private void deliverBuf() {
        final long demand = totalDemand();
        if (demand <= 0 || buffer.isEmpty()) {
            return;
        }
        // buffer.size() is an int, so the minimum always fits into an int even for demand > Integer.MAX_VALUE.
        final int deliverable = (int) Math.min((long) buffer.size(), demand);
        final List<SessionedJsonifiable> head = buffer.subList(0, deliverable);
        head.forEach(this::onNext);
        // Clearing the view removes the delivered range by position; it does not rely on equals() and cannot
        // remove equal elements which are still waiting further back in the buffer.
        head.clear();
    }

    /**
     * Internal message this actor sends to itself to re-inspect the buffer after it ran full.
     */
    private static final class MessageConsumptionCheck {

        private final int bufferSizeWhenScheduled;

        private MessageConsumptionCheck(int bufferSizeWhenScheduled) {
            this.bufferSizeWhenScheduled = bufferSizeWhenScheduled;
        }
    }
}
