package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.query.criteria.Criteria;
import org.eclipse.ditto.model.query.criteria.CriteriaFactoryImpl;
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.model.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.StreamingAck;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.things.ThingEventToThingConverter;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingSessionActor.class */
final class StreamingSessionActor extends AbstractActor {
    private static final int MAX_SUBSCRIBE_TIMEOUT_MS = 5000;
    private final String connectionCorrelationId;
    private final String type;
    private final ActorRef pubSubMediator;
    private final ActorRef eventAndResponsePublisher;
    private List<String> authorizationSubjects;
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);
    private final Set<StreamingType> outstandingSubscriptionAcks = new HashSet();
    private Map<StreamingType, List<String>> namespacesForStreamingTypes = new EnumMap(StreamingType.class);
    private Map<StreamingType, Criteria> eventFilterCriteriaForStreamingTypes = new EnumMap(StreamingType.class);

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingSessionActor$AcknowledgeSubscription.class */
    private static final class AcknowledgeSubscription extends WithStreamingType {
        private AcknowledgeSubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingSessionActor$AcknowledgeUnsubscription.class */
    private static final class AcknowledgeUnsubscription extends WithStreamingType {
        private AcknowledgeUnsubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingSessionActor$WithStreamingType.class */
    private static abstract class WithStreamingType {
        private final StreamingType streamingType;

        private WithStreamingType(StreamingType streamingType) {
            this.streamingType = streamingType;
        }

        StreamingType getStreamingType() {
            return this.streamingType;
        }
    }

    private StreamingSessionActor(String str, String str2, ActorRef actorRef, ActorRef actorRef2) {
        this.connectionCorrelationId = str;
        this.type = str2;
        this.pubSubMediator = actorRef;
        this.eventAndResponsePublisher = actorRef2;
        getContext().watch(actorRef2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(String str, String str2, ActorRef actorRef, ActorRef actorRef2) {
        return Props.create(StreamingSessionActor.class, new Object[]{str, str2, actorRef, actorRef2});
    }

    public void postStop() throws Exception {
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
        this.logger.info("Closing '{}' streaming session: {}", this.type, this.connectionCorrelationId);
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(CommandResponse.class, commandResponse -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, commandResponse, new LogUtil.MdcField[0]);
            this.logger.debug("Got 'CommandResponse' message in <{}> session, telling eventAndResponsePublisher about it: {}", this.type, commandResponse);
            this.eventAndResponsePublisher.forward(commandResponse, getContext());
        }).match(Signal.class, this::handleSignal).match(DittoRuntimeException.class, dittoRuntimeException -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, dittoRuntimeException, new LogUtil.MdcField[0]);
            this.logger.info("Got 'DittoRuntimeException' message in <{}> session, telling eventAndResponsePublisher about it: {}", this.type, dittoRuntimeException);
            this.eventAndResponsePublisher.forward(dittoRuntimeException, getContext());
        }).match(StartStreaming.class, startStreaming -> {
            this.authorizationSubjects = startStreaming.getAuthorizationContext().getAuthorizationSubjectIds();
            this.namespacesForStreamingTypes.put(startStreaming.getStreamingType(), startStreaming.getNamespaces());
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            try {
                this.eventFilterCriteriaForStreamingTypes.put(startStreaming.getStreamingType(), startStreaming.getFilter().map(str -> {
                    return parseCriteria(str, DittoHeaders.newBuilder().correlationId(startStreaming.getConnectionCorrelationId()).build());
                }).orElse(null));
                this.logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster..", this.type, startStreaming.getStreamingType().name());
                this.outstandingSubscriptionAcks.add(startStreaming.getStreamingType());
                this.pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(startStreaming.getStreamingType().getDistributedPubSubTopic(), this.connectionCorrelationId, getSelf()), getSelf());
            } catch (DittoRuntimeException e) {
                this.logger.info("Got 'DittoRuntimeException' <{}> session during 'StartStreaming' processing: {}: <{}>", this.type, e.getClass().getSimpleName(), e.getMessage());
                this.eventAndResponsePublisher.tell(e, getSelf());
            }
        }).match(StopStreaming.class, stopStreaming -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            this.logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster..", this.type, stopStreaming.getStreamingType().name());
            this.namespacesForStreamingTypes.remove(stopStreaming.getStreamingType());
            this.eventFilterCriteriaForStreamingTypes.remove(stopStreaming.getStreamingType());
            this.pubSubMediator.tell(new DistributedPubSubMediator.Unsubscribe(stopStreaming.getStreamingType().getDistributedPubSubTopic(), this.connectionCorrelationId, getSelf()), getSelf());
        }).match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            StreamingType fromTopic = StreamingType.fromTopic(subscribeAck.subscribe().topic());
            ActorRef self = getSelf();
            getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(5000L, TimeUnit.MILLISECONDS), self, new AcknowledgeSubscription(fromTopic), getContext().getSystem().dispatcher(), self);
        }).match(DistributedPubSubMediator.UnsubscribeAck.class, unsubscribeAck -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            StreamingType fromTopic = StreamingType.fromTopic(unsubscribeAck.unsubscribe().topic());
            ActorRef self = getSelf();
            getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(5000L, TimeUnit.MILLISECONDS), self, new AcknowledgeUnsubscription(fromTopic), getContext().getSystem().dispatcher(), self);
        }).match(AcknowledgeSubscription.class, acknowledgeSubscription -> {
            acknowledgeSubscription(acknowledgeSubscription.getStreamingType(), getSelf());
        }).match(AcknowledgeUnsubscription.class, acknowledgeUnsubscription -> {
            acknowledgeUnsubscription(acknowledgeUnsubscription.getStreamingType(), getSelf());
        }).match(Terminated.class, terminated -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            this.logger.debug("eventAndResponsePublisher was terminated");
            this.logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster..", this.type);
            Arrays.stream(StreamingType.values()).map((v0) -> {
                return v0.getDistributedPubSubTopic();
            }).forEach(str -> {
                this.pubSubMediator.tell(new DistributedPubSubMediator.Unsubscribe(str, this.connectionCorrelationId, getSelf()), getSelf());
            });
            getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(1L, TimeUnit.SECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(), getSelf());
        }).matchAny(obj -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId, new LogUtil.MdcField[0]);
            this.logger.warning("Got unknown message in '{}' session: '{}'", this.type, obj);
        }).build();
    }

    private void handleSignal(Signal<?> signal) {
        LogUtil.enhanceLogWithCorrelationId(this.logger, signal, new LogUtil.MdcField[0]);
        DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (this.connectionCorrelationId.equals(dittoHeaders.getOrigin().orElse(null))) {
            this.logger.debug("Got Signal <{}> in <{}> session, but this was issued by this connection itself, not telling eventAndResponsePublisher about it", signal.getType(), this.type);
            return;
        }
        if (this.authorizationSubjects == null || Collections.disjoint(dittoHeaders.getReadSubjects(), this.authorizationSubjects)) {
            return;
        }
        if (!matchesNamespaces(signal)) {
            this.logger.debug("Signal does not match namespaces");
        } else if (!matchesFilter(signal)) {
            this.logger.debug("Signal does not match filter");
        } else {
            this.logger.debug("Got Signal <{}> in <{}> session, telling eventAndResponsePublisher about it: {}", signal.getType(), this.type, signal);
            this.eventAndResponsePublisher.tell(signal, getSelf());
        }
    }

    private boolean matchesNamespaces(Signal<?> signal) {
        List list = (List) Optional.ofNullable(this.namespacesForStreamingTypes.get(determineStreamingType(signal))).orElse(Collections.emptyList());
        return list.isEmpty() || list.contains(namespaceFromId(signal));
    }

    private static StreamingType determineStreamingType(Signal<?> signal) {
        StreamingType streamingType;
        String str = (String) signal.getDittoHeaders().getChannel().orElse(TopicPath.Channel.TWIN.getName());
        if (signal instanceof Event) {
            streamingType = str.equals(TopicPath.Channel.TWIN.getName()) ? StreamingType.EVENTS : StreamingType.LIVE_EVENTS;
        } else {
            streamingType = signal instanceof MessageCommand ? StreamingType.MESSAGES : StreamingType.LIVE_COMMANDS;
        }
        return streamingType;
    }

    private static String namespaceFromId(WithId withId) {
        return withId.getId().split(":", 2)[0];
    }

    private Criteria parseCriteria(String str, DittoHeaders dittoHeaders) {
        return new QueryFilterCriteriaFactory(new CriteriaFactoryImpl(), new ModelBasedThingsFieldExpressionFactory()).filterCriteria(str, dittoHeaders);
    }

    private boolean matchesFilter(Signal<?> signal) {
        if (!(signal instanceof ThingEvent)) {
            return true;
        }
        StreamingType determineStreamingType = determineStreamingType(signal);
        return ThingEventToThingConverter.thingEventToThing((ThingEvent) signal).filter(thing -> {
            return doMatchFilter(determineStreamingType, thing);
        }).isPresent();
    }

    private boolean doMatchFilter(StreamingType streamingType, Thing thing) {
        return ((Boolean) Optional.ofNullable(this.eventFilterCriteriaForStreamingTypes.get(streamingType)).map(criteria -> {
            return Boolean.valueOf(ThingPredicateVisitor.apply(criteria).test(thing));
        }).orElse(true)).booleanValue();
    }

    private void acknowledgeSubscription(StreamingType streamingType, ActorRef actorRef) {
        if (!this.outstandingSubscriptionAcks.contains(streamingType)) {
            this.logger.debug("Subscription already acked for type <{}> in <{}> session", streamingType, this.type);
            return;
        }
        this.outstandingSubscriptionAcks.remove(streamingType);
        this.eventAndResponsePublisher.tell(new StreamingAck(streamingType, true), actorRef);
        this.logger.debug("Subscribed to Cluster <{}> in <{}> session", streamingType, this.type);
    }

    private void acknowledgeUnsubscription(StreamingType streamingType, ActorRef actorRef) {
        this.eventAndResponsePublisher.tell(new StreamingAck(streamingType, false), actorRef);
        this.logger.debug("Unsubscribed from Cluster <{}> in <{}> session", streamingType, this.type);
    }
}
