package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import com.typesafe.config.Config;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthenticationFactory;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthenticationResultProvider;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtValidator;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.util.config.streaming.DefaultStreamingConfig;
import org.eclipse.ditto.services.gateway.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.utils.akka.actors.ModifyConfigBehavior;
import org.eclipse.ditto.services.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.services.utils.search.SubscriptionManager;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingActor.class */
/**
 * Parent actor of all streaming session actors.
 *
 * <p>For every {@link Connect} message it spawns a {@code StreamingSessionActor} child and replies with the
 * child's {@link ActorRef}. It additionally publishes the current number of children to a gauge on a
 * periodic timer and exposes its {@link StreamingConfig} through the retrieve/modify config behaviors.
 */
public final class StreamingActor extends AbstractActorWithTimers implements RetrieveConfigBehavior, ModifyConfigBehavior {

    /** Actor name constant; also used as the config key in {@link #getConfig()} and {@link #setConfig(Config)}. */
    public static final String ACTOR_NAME = "streaming";

    private final DittoProtocolSub dittoProtocolSub;
    private final ActorRef commandRouter;
    private final JwtValidator jwtValidator;
    private final JwtAuthenticationResultProvider jwtAuthenticationResultProvider;
    private final Props subscriptionManagerProps;
    private final HeaderTranslator headerTranslator;

    /** Mutable on purpose: replaced as a whole by {@link #setConfig(Config)}. */
    private StreamingConfig streamingConfig;

    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    /** Starts at -1 so that the first child name is built from counter value 0. */
    private int childCounter = -1;

    /**
     * Logs every child failure and escalates it. A single {@code Throwable} case is sufficient because the
     * decider is only ever applied to throwables, so no catch-all case is needed.
     */
    private final SupervisorStrategy strategy = new OneForOneStrategy(true, DeciderBuilder.match(Throwable.class, failure -> {
        this.logger.error(failure, "Escalating above actor!");
        return SupervisorStrategy.escalate();
    }).build());

    private final Gauge streamingSessionsCounter = DittoMetrics.gauge("streaming_sessions_count");

    /** Control messages understood by {@link StreamingActor}. */
    public enum Control {
        /** Triggers an update of the sessions gauge; also used as the key of the periodic timer. */
        SCRAPE_STREAM_COUNTER,
        /** Asks the actor to reply to the sender with its current websocket config. */
        RETRIEVE_WEBSOCKET_CONFIG
    }

    /**
     * Use {@link #props} to create instances.
     *
     * @param dittoProtocolSub handed to every session actor.
     * @param actorRef the command router handed to every session actor.
     * @param jwtAuthenticationFactory source of the JWT validator and the JWT authentication result provider.
     * @param streamingConfig the initial streaming config.
     * @param headerTranslator handed to every session actor.
     * @param actorRef2 passed through to {@code SubscriptionManager.props}; its role is not visible in this file.
     * @param actorRef3 passed through to {@code SubscriptionManager.props}; its role is not visible in this file.
     */
    private StreamingActor(DittoProtocolSub dittoProtocolSub, ActorRef actorRef, JwtAuthenticationFactory jwtAuthenticationFactory, StreamingConfig streamingConfig, HeaderTranslator headerTranslator, ActorRef actorRef2, ActorRef actorRef3) {
        this.dittoProtocolSub = dittoProtocolSub;
        this.commandRouter = actorRef;
        this.streamingConfig = streamingConfig;
        this.headerTranslator = headerTranslator;
        this.jwtValidator = jwtAuthenticationFactory.getJwtValidator();
        this.jwtAuthenticationResultProvider = jwtAuthenticationFactory.newJwtAuthenticationResultProvider();
        this.subscriptionManagerProps = SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), actorRef2, actorRef3, ActorMaterializer.create(getContext()));
        scheduleScrapeStreamSessionsCounter();
    }

    /**
     * Creates the {@link Props} for this actor.
     *
     * <p>The arguments are forwarded positionally to the private constructor, so their order must match it.
     *
     * @param dittoProtocolSub handed to every session actor.
     * @param actorRef the command router handed to every session actor.
     * @param jwtAuthenticationFactory source of the JWT validator and the JWT authentication result provider.
     * @param streamingConfig the initial streaming config.
     * @param headerTranslator handed to every session actor.
     * @param actorRef2 passed through to {@code SubscriptionManager.props}.
     * @param actorRef3 passed through to {@code SubscriptionManager.props}.
     * @return the props.
     */
    public static Props props(DittoProtocolSub dittoProtocolSub, ActorRef actorRef, JwtAuthenticationFactory jwtAuthenticationFactory, StreamingConfig streamingConfig, HeaderTranslator headerTranslator, ActorRef actorRef2, ActorRef actorRef3) {
        return Props.create(StreamingActor.class, dittoProtocolSub, actorRef, jwtAuthenticationFactory, streamingConfig, headerTranslator, actorRef2, actorRef3);
    }

    /**
     * @return the log-and-escalate strategy held by this actor.
     */
    public SupervisorStrategy supervisorStrategy() {
        return this.strategy;
    }

    /**
     * Chains, in this order: retrieve-config behavior, modify-config behavior, the connect/metrics behavior and
     * finally a catch-all that logs any other message as a warning.
     *
     * @return the composed receive.
     */
    public AbstractActor.Receive createReceive() {
        return retrieveConfigBehavior()
                .orElse(modifyConfigBehavior())
                .orElse(createConnectAndMetricsBehavior())
                .orElse(ReceiveBuilder.create()
                        .matchAny(message -> this.logger.warning("Got unknown message: '{}'", message))
                        .build());
    }

    /**
     * @return the behavior handling {@link Connect} and both {@link Control} messages.
     */
    private AbstractActor.Receive createConnectAndMetricsBehavior() {
        return ReceiveBuilder.create()
                .match(Connect.class, this::handleConnect)
                .matchEquals(Control.RETRIEVE_WEBSOCKET_CONFIG, this::replyWebSocketConfig)
                .matchEquals(Control.SCRAPE_STREAM_COUNTER, this::updateStreamingSessionsCounter)
                .build();
    }

    /**
     * Forwards the connect message to its event-and-response publisher, starts a new session actor as a child
     * and tells the sender the reference of that child (without a sender of its own).
     *
     * @param connect the received connect message.
     */
    private void handleConnect(Connect connect) {
        connect.getEventAndResponsePublisher().forward(connect, getContext());
        Props sessionProps = StreamingSessionActor.props(connect, this.dittoProtocolSub, this.commandRouter, this.streamingConfig.getAcknowledgementConfig(), this.headerTranslator, this.subscriptionManagerProps, this.jwtValidator, this.jwtAuthenticationResultProvider);
        String sessionActorName = getUniqueChildActorName(connect.getConnectionCorrelationId());
        ActorRef sessionActor = getContext().actorOf(sessionProps, sessionActorName);
        getSender().tell(sessionActor, ActorRef.noSender());
    }

    /**
     * @return the sub-config found at key {@link #ACTOR_NAME} of the rendered streaming config.
     */
    public Config getConfig() {
        return this.streamingConfig.render().getConfig(ACTOR_NAME);
    }

    /**
     * Replaces the streaming config by the given config (placed at key {@link #ACTOR_NAME}) with the current
     * config as fallback, then restarts the scrape timer so that a changed interval takes effect.
     *
     * @param config the config values to apply.
     * @return the rendered new streaming config.
     */
    public Config setConfig(Config config) {
        this.streamingConfig = DefaultStreamingConfig.of(config.atKey(ACTOR_NAME).withFallback(this.streamingConfig.render()));
        scheduleScrapeStreamSessionsCounter();
        // NOTE(review): this returns the full render, whereas getConfig() returns only the sub-tree at
        // ACTOR_NAME - confirm the asymmetry is intended before relying on either shape.
        return this.streamingConfig.render();
    }

    /**
     * Builds a child actor name of the form {@code <hex counter>-<URL-encoded id>}. The counter is incremented
     * on every call, which keeps names distinct even when the same id is passed again.
     *
     * @param str the id to embed; URL-encoded as UTF-8.
     * @return the child actor name.
     */
    private String getUniqueChildActorName(String str) {
        return String.format("%x-%s", ++this.childCounter, URLEncoder.encode(str, StandardCharsets.UTF_8));
    }

    /**
     * Starts the periodic timer sending {@link Control#SCRAPE_STREAM_COUNTER} to this actor at the configured
     * session counter scrape interval. The same timer key is used on every call.
     */
    private void scheduleScrapeStreamSessionsCounter() {
        getTimers().startPeriodicTimer(Control.SCRAPE_STREAM_COUNTER, Control.SCRAPE_STREAM_COUNTER, this.streamingConfig.getSessionCounterScrapeInterval());
    }

    /**
     * Replies to the sender with the current websocket config.
     *
     * @param control the triggering message; not inspected.
     */
    private void replyWebSocketConfig(Control control) {
        getSender().tell(this.streamingConfig.getWebsocketConfig(), getSelf());
    }

    /**
     * Sets the sessions gauge to the current number of children of this actor.
     *
     * @param control the triggering message; not inspected.
     */
    private void updateStreamingSessionsCounter(Control control) {
        if (getContext() != null) {
            long childCount = StreamSupport.stream(getContext().getChildren().spliterator(), false).count();
            this.streamingSessionsCounter.set(Long.valueOf(childCount));
        }
    }
}
