/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigMergeable;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.jwt.ImmutableJsonWebToken;
import org.eclipse.ditto.model.jwt.JsonWebToken;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.gateway.security.authentication.AuthenticationResult;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthenticationFactory;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthenticationResultProvider;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtValidator;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.InvalidJwt;
import org.eclipse.ditto.services.gateway.streaming.Jwt;
import org.eclipse.ditto.services.gateway.streaming.RefreshSession;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.actors.StreamingSessionActor;
import org.eclipse.ditto.services.gateway.util.config.streaming.DefaultStreamingConfig;
import org.eclipse.ditto.services.gateway.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.utils.akka.actors.ModifyConfigBehavior;
import org.eclipse.ditto.services.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.services.utils.search.SubscriptionManager;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.things.modify.ThingModifyCommand;
import org.eclipse.ditto.signals.commands.thingsearch.ThingSearchCommand;

/**
 * Parent actor of all streaming session actors.
 * <p>
 * For every {@link Connect} message one {@link StreamingSessionActor} child is created whose actor name is the
 * connection correlation ID of the {@code Connect}. Session-related messages ({@link StartStreaming},
 * {@link StopStreaming}, refreshed {@link Jwt}s, {@link Acknowledgement}s, {@link DittoRuntimeException}s) are
 * routed to the matching child; {@link Signal}s are handed to the command router with the session actor as sender.
 * <p>
 * The actor additionally publishes the number of its children as the gauge {@code streaming_sessions_count} on a
 * periodic timer and supports retrieving/modifying its {@link StreamingConfig} at runtime.
 */
public final class StreamingActor
        extends AbstractActorWithTimers
        implements RetrieveConfigBehavior, ModifyConfigBehavior {

    /**
     * The name of this actor. The same value is used as the config path in {@link #getConfig()} and
     * {@link #setConfig(Config)}.
     */
    public static final String ACTOR_NAME = "streaming";

    private final DittoProtocolSub dittoProtocolSub;
    private final ActorRef commandRouter;
    private final Gauge streamingSessionsCounter;
    private final JwtValidator jwtValidator;
    private final JwtAuthenticationResultProvider jwtAuthenticationResultProvider;
    private final Props subscriptionManagerProps;
    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter((Actor) this);
    private final HeaderTranslator headerTranslator;

    // Not final: replaced by setConfig(Config).
    private StreamingConfig streamingConfig;

    // Every failure of a child is logged and escalated to the parent of this actor.
    // The decider is only ever asked about Throwables, so the matchAny branch is a mere fallback.
    private final SupervisorStrategy strategy = new OneForOneStrategy(true,
            DeciderBuilder.match(Throwable.class, e -> {
                this.logger.error(e, "Escalating above actor!");
                return SupervisorStrategy.escalate();
            }).matchAny(e -> {
                this.logger.error("Unknown message:'{}'! Escalating above actor!", e);
                return SupervisorStrategy.escalate();
            }).build());

    /*
     * Instantiated by Akka through the Props returned by props(...); arguments are the same as for props(...).
     */
    private StreamingActor(DittoProtocolSub dittoProtocolSub, ActorRef commandRouter, JwtAuthenticationFactory jwtAuthenticationFactory, StreamingConfig streamingConfig, HeaderTranslator headerTranslator, ActorRef pubSubMediator, ActorRef conciergeForwarder) {
        this.dittoProtocolSub = dittoProtocolSub;
        this.commandRouter = commandRouter;
        this.streamingConfig = streamingConfig;
        this.headerTranslator = headerTranslator;
        this.streamingSessionsCounter = DittoMetrics.gauge("streaming_sessions_count");
        this.jwtValidator = jwtAuthenticationFactory.getJwtValidator();
        this.jwtAuthenticationResultProvider = jwtAuthenticationFactory.newJwtAuthenticationResultProvider();
        this.subscriptionManagerProps = SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(),
                pubSubMediator, conciergeForwarder, ActorMaterializer.create((ActorRefFactory) this.getContext()));
        this.scheduleScrapeStreamSessionsCounter();
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param dittoProtocolSub passed on to every created session actor.
     * @param commandRouter the actor to which incoming signals are sent.
     * @param jwtAuthenticationFactory provides the JWT validator and the authentication result provider used when a
     * session's JWT is refreshed.
     * @param streamingConfig the initial streaming config.
     * @param headerTranslator passed on to every created session actor.
     * @param pubSubMediator used to build the Props of the search subscription manager.
     * @param conciergeForwarder used to build the Props of the search subscription manager.
     * @return the Props.
     */
    public static Props props(DittoProtocolSub dittoProtocolSub, ActorRef commandRouter, JwtAuthenticationFactory jwtAuthenticationFactory, StreamingConfig streamingConfig, HeaderTranslator headerTranslator, ActorRef pubSubMediator, ActorRef conciergeForwarder) {
        return Props.create(StreamingActor.class, dittoProtocolSub, commandRouter, jwtAuthenticationFactory,
                streamingConfig, headerTranslator, pubSubMediator, conciergeForwarder);
    }

    /**
     * @return the strategy which logs and escalates every child failure.
     */
    public SupervisorStrategy supervisorStrategy() {
        return this.strategy;
    }

    /**
     * Builds the message handling of this actor: session lifecycle messages first, then the config
     * retrieve/modify behaviors, then signals, control messages and errors. Anything else is logged as unknown.
     */
    public AbstractActor.Receive createReceive() {
        final AbstractActor.Receive sessionBehavior = ReceiveBuilder.create()
                .match(Connect.class, this::startSessionActor)
                .match(StartStreaming.class, startStreaming ->
                        this.forwardToSessionActor(startStreaming.getConnectionCorrelationId(), startStreaming))
                .match(StopStreaming.class, stopStreaming ->
                        this.forwardToSessionActor(stopStreaming.getConnectionCorrelationId(), stopStreaming))
                .match(Jwt.class, this::refreshWebSocketSession)
                .match(JwtValidationOutcome.class, this::handleJwtValidationOutcome)
                .build();

        return sessionBehavior
                .orElse(this.retrieveConfigBehavior())
                .orElse(this.modifyConfigBehavior())
                .orElse(ReceiveBuilder.create()
                        .match(Acknowledgement.class, acknowledgement ->
                                this.lookupSessionActor((WithDittoHeaders<?>) acknowledgement,
                                        sessionActor -> sessionActor.forward(acknowledgement, this.getContext())))
                        .match(Signal.class, this::handleSignal)
                        .matchEquals(Control.RETRIEVE_WEBSOCKET_CONFIG, this::replyWebSocketConfig)
                        .matchEquals(Control.SCRAPE_STREAM_COUNTER, this::updateStreamingSessionsCounter)
                        .match(DittoRuntimeException.class, this::handleDittoRuntimeException)
                        .matchAny(any -> this.logger.warning("Got unknown message: '{}'", any))
                        .build());
    }

    /**
     * @return the sub-config at path {@link #ACTOR_NAME} of the rendered current streaming config.
     */
    public Config getConfig() {
        return this.streamingConfig.render().getConfig(ACTOR_NAME);
    }

    /**
     * Replaces the streaming config by the given config (placed at key {@link #ACTOR_NAME}) with the current config
     * as fallback, and restarts the periodic session counter timer with the new config's scrape interval.
     *
     * @param config the config values to apply.
     * @return the complete rendered new config (note: not narrowed to {@link #ACTOR_NAME} as in
     * {@link #getConfig()}).
     */
    public Config setConfig(Config config) {
        this.streamingConfig = DefaultStreamingConfig.of(
                config.atKey(ACTOR_NAME).withFallback(this.streamingConfig.render()));
        this.scheduleScrapeStreamSessionsCounter();
        return this.streamingConfig.render();
    }

    /**
     * Passes the {@code Connect} on to its event-and-response publisher and creates the session actor, named by the
     * connection correlation ID, as child of this actor.
     */
    private void startSessionActor(Connect connect) {
        final ActorRef eventAndResponsePublisher = connect.getEventAndResponsePublisher();
        eventAndResponsePublisher.forward(connect, this.getContext());
        final String connectionCorrelationId = connect.getConnectionCorrelationId();
        this.getContext().actorOf(
                StreamingSessionActor.props(connect, this.dittoProtocolSub, eventAndResponsePublisher,
                        this.streamingConfig.getAcknowledgementConfig(), this.headerTranslator,
                        this.subscriptionManagerProps),
                connectionCorrelationId);
    }

    /**
     * Starts the asynchronous validation of a refreshed JWT. The result is sent back to this actor as
     * {@link JwtValidationOutcome} so that context, sender and logger are only ever touched while this actor
     * processes a message, never on the thread completing the validation.
     */
    private void refreshWebSocketSession(Jwt jwt) {
        final String connectionCorrelationId = jwt.getConnectionCorrelationId();
        final JsonWebToken jsonWebToken;
        try {
            jsonWebToken = ImmutableJsonWebToken.fromToken(jwt.toString());
        } catch (RuntimeException exception) {
            // NOTE(review): assumes fromToken may throw for a malformed token - confirm. If it does, the failure is
            // treated like any other failure to handle the refreshed JWT instead of failing this parent actor.
            this.logger.info("Got exception when handling refreshed JWT for WebSocket session <{}>: {}", (Object) connectionCorrelationId, (Object) exception.getMessage());
            this.forwardToSessionActor(connectionCorrelationId, InvalidJwt.getInstance());
            return;
        }

        // Captured here because they must not be read from within the callback below.
        final ActorRef self = this.getSelf();
        final ActorRef sender = this.getSender();
        this.jwtValidator.validate(jsonWebToken).whenComplete((validationResult, error) -> {
            final boolean valid = error == null && validationResult != null && validationResult.isValid();
            self.tell(new JwtValidationOutcome(connectionCorrelationId, jsonWebToken, valid, error), sender);
        });
    }

    /**
     * Completes a JWT refresh: a valid token results in a {@link RefreshSession} for the session actor, an invalid
     * token or a failure to derive the authorization context results in {@link InvalidJwt}. If the validation
     * itself failed exceptionally, nothing is forwarded and the failure is logged.
     */
    private void handleJwtValidationOutcome(JwtValidationOutcome outcome) {
        final String connectionCorrelationId = outcome.connectionCorrelationId;
        if (outcome.failure != null) {
            this.logger.warning("Validation of refreshed JWT for WebSocket session <{}> failed: {}", (Object) connectionCorrelationId, (Object) outcome.failure.getMessage());
            return;
        }
        if (!outcome.valid) {
            this.forwardToSessionActor(connectionCorrelationId, InvalidJwt.getInstance());
            return;
        }
        try {
            final AuthenticationResult authorizationResult = this.jwtAuthenticationResultProvider.getAuthenticationResult(outcome.jsonWebToken, DittoHeaders.empty());
            final AuthorizationContext authorizationContext = authorizationResult.getDittoHeaders().getAuthorizationContext();
            this.forwardToSessionActor(connectionCorrelationId, new RefreshSession(connectionCorrelationId, outcome.jsonWebToken.getExpirationTime(), authorizationContext));
        } catch (Exception exception) {
            this.logger.info("Got exception when handling refreshed JWT for WebSocket session <{}>: {}", (Object) connectionCorrelationId, (Object) exception.getMessage());
            this.forwardToSessionActor(connectionCorrelationId, InvalidJwt.getInstance());
        }
    }

    /**
     * Forwards a {@code DittoRuntimeException} to the session actor named by its origin header; without origin the
     * exception is only logged.
     */
    private void handleDittoRuntimeException(DittoRuntimeException cre) {
        final Optional<String> originOpt = cre.getDittoHeaders().getOrigin();
        if (originOpt.isPresent()) {
            this.forwardToSessionActor(originOpt.get(), cre);
        } else {
            this.logger.withCorrelationId((WithDittoHeaders<?>) cre).warning("Unhandled DittoRuntimeException: <{}: {}>", (Object) cre.getClass().getSimpleName(), (Object) cre.getMessage());
        }
    }

    /**
     * Forwards the message, keeping the current sender, to the child addressed by the given connection correlation
     * ID. The correlation ID of the message, if it has headers, is set on the logger only for the debug statement.
     */
    private void forwardToSessionActor(CharSequence connectionCorrelationId, Object object) {
        if (object instanceof WithDittoHeaders) {
            this.logger.setCorrelationId((WithDittoHeaders<?>) object);
        }
        this.logger.debug("Forwarding to session actor '{}': {}", (Object) connectionCorrelationId, object);
        this.logger.discardCorrelationId();
        this.getContext().actorSelection(connectionCorrelationId.toString()).forward(object, this.getContext());
    }

    /**
     * Sends a signal to the command router. If a response is required, the session actor of the signal's origin
     * becomes the sender; otherwise the signal is sent without sender.
     */
    private void handleSignal(Signal<?> signal) {
        if (signal.getDittoHeaders().isResponseRequired()) {
            this.lookupSessionActor((WithDittoHeaders<?>) signal, sessionActor -> {
                // These two command kinds are handed to the session actor in addition to the command router.
                // NOTE(review): presumably so that the session actor can prepare for the response - confirm in
                // StreamingSessionActor.
                if (signal instanceof ThingModifyCommand || signal instanceof ThingSearchCommand) {
                    sessionActor.tell(signal, this.getSelf());
                }
                this.commandRouter.tell(signal, sessionActor);
            });
        } else {
            this.commandRouter.tell(signal, ActorRef.noSender());
        }
    }

    /**
     * Resolves the child named by the origin header and hands it to the consumer. A missing origin header or a
     * missing child is logged as error and the consumer is not invoked.
     */
    private void lookupSessionActor(WithDittoHeaders<?> withHeaders, Consumer<ActorRef> sessionActorCon) {
        final DittoHeaders dittoHeaders = withHeaders.getDittoHeaders();
        final Optional<String> originOpt = dittoHeaders.getOrigin();
        if (!originOpt.isPresent()) {
            this.logger.withCorrelationId(dittoHeaders).error("No origin header present for WithDittoHeaders <{}>", withHeaders);
            return;
        }
        final String origin = originOpt.get();
        final Optional<ActorRef> sessionActor = this.getContext().findChild(origin);
        if (sessionActor.isPresent()) {
            sessionActorCon.accept(sessionActor.get());
        } else {
            this.logger.withCorrelationId(dittoHeaders).error("No session actor found for origin <{}>", (Object) origin);
        }
    }

    /**
     * (Re-)starts the periodic timer which triggers the update of the session counter gauge. The timer key is the
     * control message itself, so the same key is used on every call.
     */
    private void scheduleScrapeStreamSessionsCounter() {
        this.getTimers().startPeriodicTimer(Control.SCRAPE_STREAM_COUNTER, Control.SCRAPE_STREAM_COUNTER, this.streamingConfig.getSessionCounterScrapeInterval());
    }

    /** Replies the current WebSocket config to the sender. */
    private void replyWebSocketConfig(Control trigger) {
        this.getSender().tell(this.streamingConfig.getWebsocketConfig(), this.getSelf());
    }

    /** Sets the gauge to the current number of children of this actor. */
    private void updateStreamingSessionsCounter(Control trigger) {
        if (this.getContext() != null) {
            final long sessionCount = StreamSupport.stream(this.getContext().getChildren().spliterator(), false).count();
            this.streamingSessionsCounter.set(Long.valueOf(sessionCount));
        }
    }

    /**
     * Control messages understood by this actor.
     */
    public static enum Control {
        /** Triggers the update of the session counter gauge. */
        SCRAPE_STREAM_COUNTER,
        /** Requests the current WebSocket config as reply. */
        RETRIEVE_WEBSOCKET_CONFIG;

    }

    /**
     * Internal message by which the asynchronous JWT validation reports back to this actor.
     */
    private static final class JwtValidationOutcome {
        private final String connectionCorrelationId;
        private final JsonWebToken jsonWebToken;
        private final boolean valid;
        // null unless the validation completed exceptionally
        private final Throwable failure;

        private JwtValidationOutcome(String connectionCorrelationId, JsonWebToken jsonWebToken, boolean valid, Throwable failure) {
            this.connectionCorrelationId = connectionCorrelationId;
            this.jsonWebToken = jsonWebToken;
            this.valid = valid;
            this.failure = failure;
        }
    }
}

