/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.time.Instant;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.namespaces.NamespaceReader;
import org.eclipse.ditto.model.query.criteria.Criteria;
import org.eclipse.ditto.model.query.criteria.CriteriaFactory;
import org.eclipse.ditto.model.query.criteria.CriteriaFactoryImpl;
import org.eclipse.ditto.model.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.model.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.InvalidJwtToken;
import org.eclipse.ditto.services.gateway.streaming.RefreshSession;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.StreamingAck;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.things.ThingEventToThingConverter;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor representing a single streaming session, identified by a connection correlation ID.
 *
 * <p>The actor subscribes and unsubscribes itself at the {@code DittoProtocolSub} when it receives
 * {@code StartStreaming} / {@code StopStreaming} messages, filters incoming {@code Signal}s by origin,
 * authorization subjects, namespaces and an optional filter criteria, and passes everything that is
 * left to the {@code eventAndResponsePublisher}. When the watched publisher terminates, the actor
 * removes its subscriptions and stops itself after a short delay.
 */
final class StreamingSessionActor extends AbstractActor {
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);
    private final String connectionCorrelationId;
    private final String type;
    private final DittoProtocolSub dittoProtocolSub;
    private final ActorRef eventAndResponsePublisher;
    /** Streaming types for which a subscription was requested but not yet acknowledged. */
    private final Set<StreamingType> outstandingSubscriptionAcks;
    /** Pending session expiration timer; {@code null} when the session was created without an expiration time. */
    private Cancellable sessionTerminationCancellable;
    private List<String> authorizationSubjects;
    private final Map<StreamingType, List<String>> namespacesForStreamingTypes;
    private final Map<StreamingType, Criteria> eventFilterCriteriaForStreamingTypes;

    private StreamingSessionActor(String connectionCorrelationId, String type, DittoProtocolSub dittoProtocolSub, ActorRef eventAndResponsePublisher, Instant sessionExpirationTime) {
        this.connectionCorrelationId = connectionCorrelationId;
        this.type = type;
        this.dittoProtocolSub = dittoProtocolSub;
        this.eventAndResponsePublisher = eventAndResponsePublisher;
        this.outstandingSubscriptionAcks = new HashSet<StreamingType>();
        this.authorizationSubjects = Collections.emptyList();
        this.namespacesForStreamingTypes = new EnumMap<StreamingType, List<String>>(StreamingType.class);
        this.eventFilterCriteriaForStreamingTypes = new EnumMap<StreamingType, Criteria>(StreamingType.class);
        // The Terminated message of the publisher triggers the cleanup of this session.
        this.getContext().watch(eventAndResponsePublisher);
        this.sessionTerminationCancellable = sessionExpirationTime != null ? this.startSessionTimeout(sessionExpirationTime) : null;
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param connectionCorrelationId the correlation ID identifying the streaming connection.
     * @param type a label for the kind of session, only used in log messages.
     * @param dittoProtocolSub the pub/sub access used to (un)subscribe for streaming types.
     * @param eventAndResponsePublisher the actor to which matching signals and acks are sent.
     * @param sessionExpirationTime the instant at which the session expires, or {@code null} for no expiration.
     * @return the Props.
     */
    static Props props(String connectionCorrelationId, String type, DittoProtocolSub dittoProtocolSub, ActorRef eventAndResponsePublisher, Instant sessionExpirationTime) {
        return Props.create(StreamingSessionActor.class, connectionCorrelationId, type, dittoProtocolSub, eventAndResponsePublisher, sessionExpirationTime);
    }

    /**
     * Cancels a pending session expiration timer (if any) and logs that the session is closing.
     */
    public void postStop() {
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);
        this.cancelSessionTimeout();
        this.logger.info("Closing '{}' streaming session: {}", this.type, this.connectionCorrelationId);
    }

    /**
     * Builds the message handling of this actor. The order of the {@code match} clauses matters:
     * {@code CommandResponse} is matched before the more general {@code Signal}.
     */
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(CommandResponse.class, response -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, response);
                    this.logger.debug("Got 'CommandResponse' message in <{}> session, telling eventAndResponsePublisher about it: {}", this.type, response);
                    this.eventAndResponsePublisher.forward(response, this.getContext());
                })
                .match(Signal.class, this::handleSignal)
                .match(DittoRuntimeException.class, cre -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, cre);
                    this.logger.info("Got 'DittoRuntimeException' message in <{}> session, telling eventAndResponsePublisher about it: {}", this.type, cre);
                    this.eventAndResponsePublisher.forward(cre, this.getContext());
                })
                .match(StartStreaming.class, this::startStreaming)
                .match(StopStreaming.class, this::stopStreaming)
                .match(RefreshSession.class, refreshSession -> {
                    this.cancelSessionTimeout();
                    this.checkAuthorizationContextAndStartSessionTimer(refreshSession);
                })
                .match(InvalidJwtToken.class, invalidJwtToken -> this.cancelSessionTimeout())
                .match(AcknowledgeSubscription.class, msg -> this.acknowledgeSubscription(msg.getStreamingType(), this.getSelf()))
                .match(AcknowledgeUnsubscription.class, msg -> this.acknowledgeUnsubscription(msg.getStreamingType(), this.getSelf()))
                .match(Terminated.class, this::handleTerminated)
                .matchAny(any -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);
                    this.logger.warning("Got unknown message in '{}' session: '{}'", this.type, any);
                })
                .build();
    }

    /**
     * Handles a {@code StartStreaming} request: remembers the authorization subjects, parses the optional
     * filter and subscribes for all currently active streaming types.
     */
    private void startStreaming(StartStreaming startStreaming) {
        StreamingType streamingType = startStreaming.getStreamingType();
        this.authorizationSubjects = startStreaming.getAuthorizationContext().getAuthorizationSubjectIds();
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);

        Criteria filterCriteria;
        try {
            filterCriteria = startStreaming.getFilter()
                    .map(f -> this.parseCriteria(f, DittoHeaders.newBuilder().correlationId(startStreaming.getConnectionCorrelationId()).build()))
                    .orElse(null);
        }
        catch (DittoRuntimeException e) {
            this.logger.info("Got 'DittoRuntimeException' <{}> session during 'StartStreaming' processing: {}: <{}>", this.type, e.getClass().getSimpleName(), e.getMessage());
            this.eventAndResponsePublisher.tell(e, this.getSelf());
            return;
        }

        // Register the streaming type only after the filter was parsed successfully; otherwise a rejected
        // request would leave its type in the key set that is used for all later (un)subscriptions.
        this.namespacesForStreamingTypes.put(streamingType, startStreaming.getNamespaces());
        this.eventFilterCriteriaForStreamingTypes.put(streamingType, filterCriteria);

        this.logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster..", this.type, streamingType.name());
        this.outstandingSubscriptionAcks.add(streamingType);
        AcknowledgeSubscription subscribeAck = new AcknowledgeSubscription(streamingType);
        Set<StreamingType> currentStreamingTypes = this.namespacesForStreamingTypes.keySet();
        // The ack is sent as a message to self so that it is processed inside the actor's message loop.
        this.dittoProtocolSub.subscribe(currentStreamingTypes, this.authorizationSubjects, this.getSelf())
                .thenAccept(ack -> this.getSelf().tell(subscribeAck, this.getSelf()));
    }

    /**
     * Handles a {@code StopStreaming} request: forgets namespaces and filter of the streaming type and
     * updates the subscriptions accordingly.
     */
    private void stopStreaming(StopStreaming stopStreaming) {
        StreamingType streamingType = stopStreaming.getStreamingType();
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);
        this.logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster..", this.type, streamingType.name());
        this.namespacesForStreamingTypes.remove(streamingType);
        this.eventFilterCriteriaForStreamingTypes.remove(streamingType);
        AcknowledgeUnsubscription unsubscribeAck = new AcknowledgeUnsubscription(streamingType);
        Set<StreamingType> currentStreamingTypes = this.namespacesForStreamingTypes.keySet();
        if (streamingType != StreamingType.EVENTS) {
            // Non-twin types: re-register with the remaining streaming types.
            this.dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes, this.authorizationSubjects, this.getSelf())
                    .thenAccept(ack -> this.getSelf().tell(unsubscribeAck, this.getSelf()));
        } else {
            this.dittoProtocolSub.removeTwinSubscriber(this.getSelf(), this.authorizationSubjects)
                    .thenAccept(ack -> this.getSelf().tell(unsubscribeAck, this.getSelf()));
        }
    }

    /**
     * Handles the termination of the watched {@code eventAndResponsePublisher}: removes this actor as
     * subscriber and schedules a {@code PoisonPill} to itself after one second.
     */
    private void handleTerminated(Terminated terminated) {
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);
        this.logger.debug("eventAndResponsePublisher was terminated");
        this.logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster..", this.type);
        this.dittoProtocolSub.removeSubscriber(this.getSelf());
        this.getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(1L, TimeUnit.SECONDS), this.getSelf(), PoisonPill.getInstance(), this.getContext().dispatcher(), this.getSelf());
    }

    /**
     * Passes a signal to the publisher if it was not issued by this connection itself, if the session's
     * authorization subjects may read it, and if it matches the configured namespaces and filter.
     */
    private void handleSignal(Signal<?> signal) {
        LogUtil.enhanceLogWithCorrelationId(this.logger, signal);
        DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (this.connectionCorrelationId.equals(dittoHeaders.getOrigin().orElse(null))) {
            this.logger.debug("Got Signal <{}> in <{}> session, but this was issued by this connection itself, not telling eventAndResponsePublisher about it", signal.getType(), this.type);
            return;
        }
        if (this.authorizationSubjects == null || Collections.disjoint(dittoHeaders.getReadSubjects(), this.authorizationSubjects)) {
            // None of the session's subjects is allowed to read the signal: drop it silently.
            return;
        }
        if (!this.matchesNamespaces(signal)) {
            this.logger.debug("Signal does not match namespaces");
            return;
        }
        if (!this.matchesFilter(signal)) {
            this.logger.debug("Signal does not match filter");
            return;
        }
        this.logger.debug("Got Signal <{}> in <{}> session, telling eventAndResponsePublisher about it: {}", signal.getType(), this.type, signal);
        this.eventAndResponsePublisher.tell(signal, this.getSelf());
    }

    /**
     * Cancels the session expiration timer if one is pending. The timer is {@code null} for sessions that
     * were created without an expiration time, so every cancellation goes through this null-safe helper.
     */
    private void cancelSessionTimeout() {
        if (this.sessionTerminationCancellable != null) {
            this.sessionTerminationCancellable.cancel();
        }
    }

    /**
     * Schedules {@link #handleSessionTimeout()} to run once at the given expiration time.
     *
     * @param sessionExpirationTime the instant at which the session expires.
     * @return the handle for cancelling the scheduled timeout.
     */
    private Cancellable startSessionTimeout(Instant sessionExpirationTime) {
        // Clamp to zero so that an expiration time in the past never yields a negative delay.
        long timeout = Math.max(0L, sessionExpirationTime.toEpochMilli() - Instant.now().toEpochMilli());
        this.logger.debug("Starting session timeout - session will expire in {} ms", timeout);
        FiniteDuration sessionExpirationTimeMillis = FiniteDuration.apply(timeout, TimeUnit.MILLISECONDS);
        // NOTE(review): the callback is run by the scheduler on the given dispatcher, i.e. presumably
        // outside of this actor's message processing - confirm that touching the logger there is acceptable.
        return this.getContext().getSystem().getScheduler().scheduleOnce(sessionExpirationTimeMillis, this::handleSessionTimeout, this.getContext().getDispatcher());
    }

    /**
     * Tells the publisher that the session expired by sending a {@code GatewayWebsocketSessionExpiredException}.
     */
    private void handleSessionTimeout() {
        this.logger.info("Stopping websocket session for connection with id: {}", this.connectionCorrelationId);
        this.eventAndResponsePublisher.tell(GatewayWebsocketSessionExpiredException.newBuilder().dittoHeaders(DittoHeaders.newBuilder().correlationId(this.connectionCorrelationId).build()).build(), this.getSelf());
    }

    /**
     * Restarts the session timer with the refreshed expiration time if the authorization subjects are
     * unchanged; otherwise tells the publisher to close the session.
     */
    private void checkAuthorizationContextAndStartSessionTimer(RefreshSession refreshSession) {
        List<String> newAuthorizationSubjects = refreshSession.getAuthorizationContext().getAuthorizationSubjectIds();
        if (!this.authorizationSubjects.equals(newAuthorizationSubjects)) {
            this.logger.debug("Authorization Context changed for websocket session: <{}> - terminating the session", this.connectionCorrelationId);
            this.eventAndResponsePublisher.tell(GatewayWebsocketSessionClosedException.newBuilder().dittoHeaders(DittoHeaders.newBuilder().correlationId(this.connectionCorrelationId).build()).build(), this.getSelf());
        } else {
            this.sessionTerminationCancellable = this.startSessionTimeout(refreshSession.getSessionTimeout());
        }
    }

    /**
     * Checks the namespace of the signal against the namespaces configured for its streaming type.
     * No configured namespaces (or an empty list) means that every namespace matches.
     */
    private boolean matchesNamespaces(Signal<?> signal) {
        StreamingType streamingType = StreamingSessionActor.determineStreamingType(signal);
        List<String> namespaces = Optional.ofNullable(this.namespacesForStreamingTypes.get(streamingType)).orElse(Collections.emptyList());
        return namespaces.isEmpty() || namespaces.contains(StreamingSessionActor.namespaceFromId(signal));
    }

    /**
     * Maps a signal to its streaming type: events are twin or live events depending on the channel header
     * (defaulting to twin), message commands are messages, everything else is a live command.
     */
    private static StreamingType determineStreamingType(Signal<?> signal) {
        String twinChannel = TopicPath.Channel.TWIN.getName();
        String channel = signal.getDittoHeaders().getChannel().orElse(twinChannel);
        if (signal instanceof Event) {
            return channel.equals(twinChannel) ? StreamingType.EVENTS : StreamingType.LIVE_EVENTS;
        }
        if (signal instanceof MessageCommand) {
            return StreamingType.MESSAGES;
        }
        return StreamingType.LIVE_COMMANDS;
    }

    /**
     * Returns the namespace read from the entity ID, or {@code null} if the reader yields none.
     */
    private static String namespaceFromId(WithId withId) {
        return NamespaceReader.fromEntityId((EntityId)withId.getEntityId()).orElse(null);
    }

    /**
     * Parses the filter string into a {@code Criteria}.
     * A {@code DittoRuntimeException} from parsing is handled by the caller in {@code startStreaming}.
     */
    private Criteria parseCriteria(String filter, DittoHeaders dittoHeaders) {
        CriteriaFactory criteriaFactory = new CriteriaFactoryImpl();
        ThingsFieldExpressionFactory fieldExpressionFactory = new ModelBasedThingsFieldExpressionFactory();
        QueryFilterCriteriaFactory queryFilterCriteriaFactory = new QueryFilterCriteriaFactory(criteriaFactory, fieldExpressionFactory);
        return queryFilterCriteriaFactory.filterCriteria(filter, dittoHeaders);
    }

    /**
     * Applies the filter criteria of the signal's streaming type. Only {@code ThingEvent}s are filtered;
     * a thing event that cannot be converted to a {@code Thing} does not match, all other signals always match.
     */
    private boolean matchesFilter(Signal<?> signal) {
        if (!(signal instanceof ThingEvent)) {
            return true;
        }
        StreamingType streamingType = StreamingSessionActor.determineStreamingType(signal);
        return ThingEventToThingConverter.thingEventToThing((ThingEvent)signal)
                .filter(thing -> this.doMatchFilter(streamingType, (Thing)thing))
                .isPresent();
    }

    /**
     * Tests the thing against the criteria stored for the streaming type; no criteria means "matches".
     */
    private boolean doMatchFilter(StreamingType streamingType, Thing thing) {
        Criteria criteria = this.eventFilterCriteriaForStreamingTypes.get(streamingType);
        return criteria == null || ThingPredicateVisitor.apply(criteria).test(thing);
    }

    /**
     * Sends a positive {@code StreamingAck} to the publisher, but only once per outstanding subscription.
     */
    private void acknowledgeSubscription(StreamingType streamingType, ActorRef self) {
        // Set.remove reports whether the type was still outstanding, which saves a separate contains check.
        if (this.outstandingSubscriptionAcks.remove(streamingType)) {
            this.eventAndResponsePublisher.tell(new StreamingAck(streamingType, true), self);
            this.logger.debug("Subscribed to Cluster <{}> in <{}> session", streamingType, this.type);
        } else {
            this.logger.debug("Subscription already acked for type <{}> in <{}> session", streamingType, this.type);
        }
    }

    /**
     * Sends a negative {@code StreamingAck} (streaming stopped) to the publisher.
     */
    private void acknowledgeUnsubscription(StreamingType streamingType, ActorRef self) {
        this.eventAndResponsePublisher.tell(new StreamingAck(streamingType, false), self);
        this.logger.debug("Unsubscribed from Cluster <{}> in <{}> session", streamingType, this.type);
    }

    /** Self-message sent once the unsubscription of a streaming type completed. */
    private static final class AcknowledgeUnsubscription
    extends WithStreamingType {
        private AcknowledgeUnsubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /** Self-message sent once the subscription for a streaming type completed. */
    private static final class AcknowledgeSubscription
    extends WithStreamingType {
        private AcknowledgeSubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /** Base class of the internal messages carrying a streaming type. */
    private static abstract class WithStreamingType {
        private final StreamingType streamingType;

        private WithStreamingType(StreamingType streamingType) {
            this.streamingType = streamingType;
        }

        StreamingType getStreamingType() {
            return this.streamingType;
        }
    }
}

