package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.japi.pf.ReceiveBuilder;
import java.time.Instant;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.auth.AuthorizationModelFactory;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.namespaces.NamespaceReader;
import org.eclipse.ditto.model.query.criteria.Criteria;
import org.eclipse.ditto.model.query.criteria.CriteriaFactoryImpl;
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.CloseStreamExceptionally;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.InvalidJwt;
import org.eclipse.ditto.services.gateway.streaming.RefreshSession;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActor;
import org.eclipse.ditto.services.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.models.concierge.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.commands.things.modify.ThingModifyCommand;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.CancelSubscription;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.CreateSubscription;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.RequestFromSubscription;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/StreamingSessionActor.class */
final class StreamingSessionActor extends AbstractActor {
    /** Schema version from the {@code Connect} message; set on the headers attached to published signals. */
    private final JsonSchemaVersion jsonSchemaVersion;

    /** Correlation ID of the connection; used for logging and to recognise signals issued by this connection. */
    private final String connectionCorrelationId;

    /** Session type from the {@code Connect} message; only used in log output here. */
    private final String type;

    /** Pub/sub facade used to (un)subscribe this actor for the requested streaming types. */
    private final DittoProtocolSub dittoProtocolSub;

    /** Watched actor to which all outgoing responses, signals, acks and errors are sent. */
    private final ActorRef eventAndResponsePublisher;

    /** Passed on when starting acknowledgement aggregator/forwarder children. */
    private final AcknowledgementConfig acknowledgementConfig;

    /** Passed on when starting the acknowledgement aggregator. */
    private final HeaderTranslator headerTranslator;

    /** Child actor receiving create/request/cancel subscription commands. */
    private final ActorRef subscriptionManager;

    /** Pending session-expiry timer, or {@code null} if none was started. */
    @Nullable
    private Cancellable sessionTerminationCancellable;

    /** Streaming types whose subscription has been requested but not yet acknowledged to the publisher. */
    private final Set<StreamingType> outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);

    /** Starts out empty; replaced by the context carried in each {@code StartStreaming} message. */
    private AuthorizationContext authorizationContext = AuthorizationModelFactory.emptyAuthContext();

    /** Active streaming sessions keyed by streaming type (diamond instead of the raw {@code EnumMap}). */
    private final Map<StreamingType, StreamingSession> streamingSessions = new EnumMap<>(StreamingType.class);

    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    /**
     * Message this actor sends to itself once the pub/sub subscription for a streaming type has completed.
     */
    private static final class AcknowledgeSubscription extends WithStreamingType {

        private AcknowledgeSubscription(final StreamingType subscribedType) {
            super(subscribedType);
        }

    }

    /**
     * Message this actor sends to itself once the pub/sub unsubscription for a streaming type has completed.
     */
    private static final class AcknowledgeUnsubscription extends WithStreamingType {

        private AcknowledgeUnsubscription(final StreamingType unsubscribedType) {
            super(unsubscribedType);
        }

    }

    /**
     * Base class for internal messages that carry the {@link StreamingType} they refer to.
     */
    private abstract static class WithStreamingType {

        private final StreamingType streamingType;

        private WithStreamingType(final StreamingType type) {
            streamingType = type;
        }

        /**
         * @return the streaming type this message was created for.
         */
        StreamingType getStreamingType() {
            return streamingType;
        }

    }

    /**
     * Wires up the session: stores the collaborators, starts the session-expiry timer if the
     * {@code Connect} message carries an expiration time, creates the {@code subscriptionManager}
     * child and starts watching the publisher actor.
     *
     * @param connect the connect message describing this session.
     * @param dittoProtocolSub pub/sub facade used for (un)subscribing.
     * @param actorRef the publisher actor that receives everything this session emits.
     * @param acknowledgementConfig config handed to acknowledgement helper actors.
     * @param headerTranslator translator handed to the acknowledgement aggregator.
     * @param props props of the subscription manager child actor.
     */
    private StreamingSessionActor(Connect connect, DittoProtocolSub dittoProtocolSub, ActorRef actorRef, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Props props) {
        // Collaborators handed in by the creator.
        this.dittoProtocolSub = dittoProtocolSub;
        this.eventAndResponsePublisher = actorRef;
        this.acknowledgementConfig = acknowledgementConfig;
        this.headerTranslator = headerTranslator;

        // Session properties taken from the Connect message.
        this.jsonSchemaVersion = connect.getJsonSchemaVersion();
        this.connectionCorrelationId = connect.getConnectionCorrelationId();
        this.type = connect.getType();

        // The correlation ID must be set before the timer is started because starting it logs.
        this.logger.setCorrelationId(this.connectionCorrelationId);
        this.sessionTerminationCancellable = connect.getSessionExpirationTime()
                .map(this::startSessionTimeout)
                .orElse(null);

        this.subscriptionManager = getContext().actorOf(props, "subscriptionManager");
        getContext().watch(actorRef);
    }

    /**
     * Creates the Akka {@code Props} for this actor; the arguments are passed on to the constructor unchanged.
     *
     * @param connect the connect message describing the session.
     * @param dittoProtocolSub pub/sub facade used for (un)subscribing.
     * @param actorRef the publisher actor that receives everything the session emits.
     * @param acknowledgementConfig config handed to acknowledgement helper actors.
     * @param headerTranslator translator handed to the acknowledgement aggregator.
     * @param props props of the subscription manager child actor.
     * @return the props to create a {@code StreamingSessionActor} with.
     */
    public static Props props(Connect connect, DittoProtocolSub dittoProtocolSub, ActorRef actorRef, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Props props) {
        return Props.create(StreamingSessionActor.class,
                connect,
                dittoProtocolSub,
                actorRef,
                acknowledgementConfig,
                headerTranslator,
                props);
    }

    /**
     * Lifecycle hook: cancels a pending session-expiry timer (if any) and logs that the session is closing.
     */
    @Override
    public void postStop() {
        cancelSessionTimeout();
        this.logger.info("Closing <{}> streaming session.", this.type);
    }

    /**
     * Builds the message handling of this actor. Matching order matters: the more specific types
     * ({@code Acknowledgement}, {@code ThingCommandResponse}, {@code ThingEvent},
     * {@code ThingModifyCommand}) are registered before their general counterparts
     * ({@code CommandResponse}, {@code Signal}).
     *
     * @return the receive behaviour.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Acknowledgement.class, acknowledgement ->
                        potentiallyForwardToAckregator(acknowledgement, () -> forwardAcknowledgement(acknowledgement)))
                .match(ThingCommandResponse.class, thingCommandResponse ->
                        potentiallyForwardToAckregator(thingCommandResponse, () -> handleResponse(thingCommandResponse)))
                .match(CommandResponse.class, this::handleResponse)
                .match(ThingEvent.class, thingEvent ->
                        handleSignalsToStartAckForwarderFor(thingEvent, thingEvent.getEntityId()))
                .match(ThingModifyCommand.class, this::onThingModifyCommand)
                .match(Signal.class, this::handleSignal)
                .match(DittoRuntimeException.class, this::onDittoRuntimeException)
                .match(StartStreaming.class, this::onStartStreaming)
                .match(StopStreaming.class, this::onStopStreaming)
                .match(RefreshSession.class, refreshSession -> {
                    cancelSessionTimeout();
                    checkAuthorizationContextAndStartSessionTimer(refreshSession);
                })
                .match(InvalidJwt.class, invalidJwt -> cancelSessionTimeout())
                .match(AcknowledgeSubscription.class, acknowledgeSubscription ->
                        acknowledgeSubscription(acknowledgeSubscription.getStreamingType(), getSelf()))
                .match(AcknowledgeUnsubscription.class, acknowledgeUnsubscription ->
                        acknowledgeUnsubscription(acknowledgeUnsubscription.getStreamingType(), getSelf()))
                .match(Terminated.class, this::onPublisherTerminated)
                .matchAny(obj -> this.logger.withCorrelationId(this.connectionCorrelationId)
                        .warning("Got unknown message in '{}' session: '{}'", this.type, obj))
                .build();
    }

    /**
     * Starts an acknowledgement aggregator for the command (its results are sent back to this actor)
     * and then handles the command like any other signal. A {@code DittoRuntimeException} thrown on
     * the way is reported to the publisher instead.
     */
    private void onThingModifyCommand(ThingModifyCommand<?> thingModifyCommand) {
        // Captured up front because the callback may run outside of this actor's message processing.
        final ActorRef self = getSelf();
        try {
            AcknowledgementAggregatorActor.startAcknowledgementAggregator(getContext(), thingModifyCommand, this.acknowledgementConfig, this.headerTranslator, signal -> self.tell(signal, self));
            handleSignal(thingModifyCommand);
        } catch (DittoRuntimeException e) {
            this.logger.withCorrelationId(thingModifyCommand).info("Got 'DittoRuntimeException' <{}> session during 'startAcknowledgementAggregator': {}: <{}>", this.type, e.getClass().getSimpleName(), e.getMessage());
            this.eventAndResponsePublisher.tell(SessionedJsonifiable.error(e), getSelf());
        }
    }

    /** Forwards a received {@code DittoRuntimeException} to the publisher as an error. */
    private void onDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.logger.withCorrelationId(dittoRuntimeException).info("Got 'DittoRuntimeException' message in <{}> session, telling EventAndResponsePublisher about it: {}", this.type, dittoRuntimeException);
        this.eventAndResponsePublisher.forward(SessionedJsonifiable.error(dittoRuntimeException), getContext());
    }

    /**
     * Registers a streaming session for the requested type and subscribes in the cluster; once the
     * subscription completes an {@code AcknowledgeSubscription} is sent to this actor. A
     * {@code DittoRuntimeException} (e.g. from parsing the filter) is reported to the publisher.
     */
    private void onStartStreaming(StartStreaming startStreaming) {
        this.authorizationContext = startStreaming.getAuthorizationContext();
        this.logger.setCorrelationId(this.connectionCorrelationId);
        try {
            final StreamingType streamingType = startStreaming.getStreamingType();
            final StreamingSession session = StreamingSession.of(
                    startStreaming.getNamespaces(),
                    (Criteria) startStreaming.getFilter()
                            .map(filter -> parseCriteria(filter, DittoHeaders.newBuilder().correlationId(startStreaming.getConnectionCorrelationId()).build()))
                            .orElse(null),
                    startStreaming.getExtraFields().orElse(null));
            this.streamingSessions.put(streamingType, session);
            this.logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster ...", this.type, streamingType.name());
            this.outstandingSubscriptionAcks.add(streamingType);
            final AcknowledgeSubscription subscribeAck = new AcknowledgeSubscription(streamingType);
            this.dittoProtocolSub.subscribe(this.streamingSessions.keySet(), this.authorizationContext.getAuthorizationSubjectIds(), getSelf())
                    .thenAccept(unused -> getSelf().tell(subscribeAck, getSelf()));
        } catch (DittoRuntimeException e) {
            this.logger.info("Got 'DittoRuntimeException' <{}> session during 'StartStreaming' processing: {}: <{}>", this.type, e.getClass().getSimpleName(), e.getMessage());
            this.eventAndResponsePublisher.tell(SessionedJsonifiable.error(e), getSelf());
        }
    }

    /**
     * Drops the streaming session of the given type and unsubscribes in the cluster; once that
     * completes an {@code AcknowledgeUnsubscription} is sent to this actor.
     */
    private void onStopStreaming(StopStreaming stopStreaming) {
        final StreamingType streamingType = stopStreaming.getStreamingType();
        this.logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster ...", this.type, streamingType.name());
        this.streamingSessions.remove(streamingType);
        final AcknowledgeUnsubscription unsubscribeAck = new AcknowledgeUnsubscription(streamingType);
        if (streamingType == StreamingType.EVENTS) {
            // EVENTS is the only type that goes through the twin subscriber removal.
            this.dittoProtocolSub.removeTwinSubscriber(getSelf(), this.authorizationContext.getAuthorizationSubjectIds())
                    .thenAccept(unused -> getSelf().tell(unsubscribeAck, getSelf()));
        } else {
            // All other types: update the live subscriptions to the remaining session types.
            this.dittoProtocolSub.updateLiveSubscriptions(this.streamingSessions.keySet(), this.authorizationContext.getAuthorizationSubjectIds(), getSelf())
                    .thenAccept(unused -> getSelf().tell(unsubscribeAck, getSelf()));
        }
    }

    /**
     * Reacts to the termination of a watched actor (only the publisher is watched by this class):
     * removes this actor as subscriber and schedules a {@code PoisonPill} to itself after one second.
     */
    private void onPublisherTerminated(Terminated terminated) {
        this.logger.setCorrelationId(this.connectionCorrelationId);
        this.logger.debug("EventAndResponsePublisher was terminated.");
        this.logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster ...", this.type);
        this.dittoProtocolSub.removeSubscriber(getSelf());
        getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(1L, TimeUnit.SECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(), getSelf());
    }

    /**
     * Forwards the response to the acknowledgement aggregator child whose name is derived from the
     * response's headers, if such a child exists; otherwise runs the fallback.
     *
     * @param commandResponse the response to route.
     * @param runnable fallback executed when no matching aggregator child is found.
     */
    private void potentiallyForwardToAckregator(CommandResponse<?> commandResponse, Runnable runnable) {
        final AbstractActor.ActorContext actorContext = getContext();
        final String aggregatorName =
                AcknowledgementAggregatorActor.determineActorName(commandResponse.getDittoHeaders());
        actorContext.findChild(aggregatorName)
                .ifPresentOrElse(aggregator -> aggregator.forward(commandResponse, actorContext), runnable);
    }

    /**
     * Logs the response and forwards it, wrapped as a sessioned response, to the publisher.
     *
     * @param commandResponse the response to publish.
     */
    private void handleResponse(CommandResponse<?> commandResponse) {
        this.logger.withCorrelationId(commandResponse)
                .debug("Got 'CommandResponse' message in <{}> session, telling EventAndResponsePublisher about it: {}",
                        this.type, commandResponse);
        this.eventAndResponsePublisher.forward(
                SessionedJsonifiable.response(commandResponse),
                getContext());
    }

    /**
     * Forwards the acknowledgement to the acknowledgement forwarder child whose name is derived from
     * the acknowledgement's headers; only logs if no such child exists.
     *
     * @param acknowledgement the acknowledgement to forward.
     */
    private void forwardAcknowledgement(Acknowledgement acknowledgement) {
        final AbstractActor.ActorContext actorContext = getContext();
        final String forwarderName =
                AcknowledgementForwarderActor.determineActorName(acknowledgement.getDittoHeaders());
        actorContext.findChild(forwarderName).ifPresentOrElse(
                forwarder -> forwarder.forward(acknowledgement, actorContext),
                () -> this.logger.withCorrelationId(acknowledgement)
                        .info("Received Acknowledgement but no AcknowledgementForwarderActor was present: <{}>", acknowledgement));
    }

    /**
     * Starts an acknowledgement forwarder for the signal's entity and headers, then handles the
     * signal like any other.
     *
     * @param signal the signal to handle.
     * @param entityIdWithType ID of the entity the signal belongs to.
     */
    private void handleSignalsToStartAckForwarderFor(Signal<?> signal, EntityIdWithType entityIdWithType) {
        AcknowledgementForwarderActor.startAcknowledgementForwarder(
                getContext(),
                entityIdWithType,
                signal.getDittoHeaders(),
                this.acknowledgementConfig);
        handleSignal(signal);
    }

    /**
     * Routes a signal while the logger carries the signal's correlation ID; afterwards the logger is
     * switched back to the connection's correlation ID.
     *
     * @param signal the signal to handle.
     */
    private void handleSignal(Signal<?> signal) {
        this.logger.setCorrelationId(signal);
        routeSignal(signal);
        this.logger.setCorrelationId(this.connectionCorrelationId);
    }

    /**
     * Decides where a signal goes: subscription commands to the subscription manager; subscription
     * events and command responses to the publisher; signals that originate from this very connection
     * are dropped; everything else is published only if a streaming session for its type exists and
     * is allowed to receive it.
     */
    private void routeSignal(Signal<?> signal) {
        final DittoHeaders dittoHeaders = signal.getDittoHeaders();

        final boolean subscriptionCommand = signal instanceof CreateSubscription
                || signal instanceof RequestFromSubscription
                || signal instanceof CancelSubscription;
        if (subscriptionCommand) {
            this.subscriptionManager.tell(signal, getSelf());
            return;
        }

        if (signal instanceof SubscriptionEvent) {
            this.logger.debug("Got SubscriptionEvent <{}> in <{}> session, telling EventAndResponsePublisher about it: {}", signal.getType(), this.type, signal);
            this.eventAndResponsePublisher.tell(SessionedJsonifiable.subscription((SubscriptionEvent) signal), getSelf());
            return;
        }

        if (signal instanceof CommandResponse) {
            this.logger.debug("Got CommandResponse <{}> in <{}> session, telling EventAndResponsePublisher about it: {}", signal.getType(), this.type, signal);
            this.eventAndResponsePublisher.forward(SessionedJsonifiable.response((CommandResponse<?>) signal), getContext());
            return;
        }

        // A signal whose origin header equals this connection's correlation ID is not echoed back.
        if (this.connectionCorrelationId.equals(dittoHeaders.getOrigin().orElse(null))) {
            this.logger.debug("Got Signal <{}> in <{}> session, but this was issued by this connection itself, not telling EventAndResponsePublisher about it", signal.getType(), this.type);
            return;
        }

        final StreamingSession streamingSession = this.streamingSessions.get(determineStreamingType(signal));
        if (streamingSession == null || !isSessionAllowedToReceiveSignal(signal, streamingSession)) {
            return;
        }
        this.logger.debug("Got Signal <{}> in <{}> session, telling EventAndResponsePublisher about it: {}", signal.getType(), this.type, signal);
        final DittoHeaders sessionHeaders = DittoHeaders.newBuilder()
                .authorizationContext(this.authorizationContext)
                .schemaVersion(this.jsonSchemaVersion)
                .build();
        this.eventAndResponsePublisher.tell(SessionedJsonifiable.signal(signal, sessionHeaders, streamingSession), getSelf());
    }

    /**
     * Checks whether the signal may be published in the given session: the session's authorization
     * context must be authorized against the signal's read-granted/read-revoked subjects, and the
     * signal must match the session's namespaces.
     *
     * @param signal the signal to check.
     * @param streamingSession the session the signal would be published in.
     * @return {@code true} if both checks pass.
     */
    private boolean isSessionAllowedToReceiveSignal(Signal<?> signal, StreamingSession streamingSession) {
        final DittoHeaders signalHeaders = signal.getDittoHeaders();
        final boolean authorized = this.authorizationContext.isAuthorized(
                signalHeaders.getReadGrantedSubjects(),
                signalHeaders.getReadRevokedSubjects());
        // The namespace check is only evaluated for authorized signals.
        return authorized && matchesNamespaces(signal, streamingSession);
    }

    /**
     * Schedules {@link #handleSessionTimeout()} to run once the given instant is reached.
     *
     * NOTE(review): the callback is scheduled as a plain runnable on the dispatcher rather than as a
     * message to this actor — confirm that what it touches is safe to use from there.
     *
     * @param instant the point in time at which the session expires.
     * @return handle that allows cancelling the scheduled timeout.
     */
    private Cancellable startSessionTimeout(Instant instant) {
        final long nowMillis = Instant.now().toEpochMilli();
        // Remaining time in milliseconds: expiration instant minus "now".
        final long remainingMillis = instant.minusMillis(nowMillis).toEpochMilli();
        this.logger.debug("Starting session timeout - session will expire in {} ms", remainingMillis);
        final FiniteDuration delay = FiniteDuration.apply(remainingMillis, TimeUnit.MILLISECONDS);
        return getContext().getSystem()
                .getScheduler()
                .scheduleOnce(delay, this::handleSessionTimeout, getContext().getDispatcher());
    }

    /**
     * Invoked when the session timer fires: tells the publisher to close the stream exceptionally
     * with a session-expired exception carrying the connection's correlation ID.
     */
    private void handleSessionTimeout() {
        this.logger.info("Stopping WebSocket session for connection with ID <{}>.", this.connectionCorrelationId);
        final DittoHeaders correlationHeaders = DittoHeaders.newBuilder()
                .correlationId(this.connectionCorrelationId)
                .build();
        this.eventAndResponsePublisher.tell(
                CloseStreamExceptionally.getInstance(
                        GatewayWebsocketSessionExpiredException.newBuilder().dittoHeaders(correlationHeaders).build(),
                        this.connectionCorrelationId),
                getSelf());
    }

    /**
     * Cancels the session-expiry timer if one was started; does nothing otherwise.
     */
    private void cancelSessionTimeout() {
        final Cancellable pendingTimeout = this.sessionTerminationCancellable;
        if (pendingTimeout == null) {
            return;
        }
        pendingTimeout.cancel();
    }

    /**
     * Handles a session refresh: if the refreshed authorization context differs from the current
     * one, the publisher is told to close the stream exceptionally; otherwise a new session timer is
     * started with the refreshed timeout.
     *
     * @param refreshSession the refresh request.
     */
    private void checkAuthorizationContextAndStartSessionTimer(RefreshSession refreshSession) {
        final boolean contextUnchanged = this.authorizationContext.equals(refreshSession.getAuthorizationContext());
        if (!contextUnchanged) {
            this.logger.debug("Authorization Context changed for WebSocket session <{}>. Terminating the session.", this.connectionCorrelationId);
            final DittoHeaders correlationHeaders = DittoHeaders.newBuilder()
                    .correlationId(this.connectionCorrelationId)
                    .build();
            this.eventAndResponsePublisher.tell(
                    CloseStreamExceptionally.getInstance(
                            GatewayWebsocketSessionClosedException.newBuilder().dittoHeaders(correlationHeaders).build(),
                            this.connectionCorrelationId),
                    getSelf());
            return;
        }
        this.sessionTerminationCancellable = startSessionTimeout(refreshSession.getSessionTimeout());
    }

    /**
     * Checks whether the signal's namespace is accepted by the session. A session without namespaces
     * accepts every signal.
     *
     * @param signal the signal whose entity ID provides the namespace.
     * @param streamingSession the session whose namespace list is consulted.
     * @return {@code true} if the session has no namespaces or lists the signal's namespace.
     */
    private boolean matchesNamespaces(Signal<?> signal, StreamingSession streamingSession) {
        final List<String> namespaces = streamingSession.getNamespaces();
        if (namespaces.isEmpty()) {
            return true;
        }
        // namespaceFromId may return null; List.contains(null) may throw a NullPointerException on
        // lists that do not permit nulls, so a signal without a namespace never matches here.
        final String signalNamespace = namespaceFromId(signal);
        final boolean matches = signalNamespace != null && namespaces.contains(signalNamespace);
        if (!matches) {
            this.logger.debug("Signal does not match namespaces.");
        }
        return matches;
    }

    /**
     * Maps a signal to its streaming type. Events are {@code EVENTS} on the twin channel (also the
     * default when no channel header is present) and {@code LIVE_EVENTS} otherwise. Non-events are
     * {@code MESSAGES} if they are message commands and {@code LIVE_COMMANDS} otherwise.
     *
     * @param signal the signal to classify.
     * @return the streaming type the signal belongs to.
     */
    private static StreamingType determineStreamingType(Signal<?> signal) {
        final String channel = (String) signal.getDittoHeaders().getChannel().orElse(TopicPath.Channel.TWIN.getName());
        if (!(signal instanceof Event)) {
            return signal instanceof MessageCommand ? StreamingType.MESSAGES : StreamingType.LIVE_COMMANDS;
        }
        final boolean twinChannel = channel.equals(TopicPath.Channel.TWIN.getName());
        return twinChannel ? StreamingType.EVENTS : StreamingType.LIVE_EVENTS;
    }

    /**
     * Reads the namespace from the entity ID of the given object.
     *
     * @param withId the object providing the entity ID.
     * @return the namespace, or {@code null} if the reader yields none.
     */
    @Nullable
    private static String namespaceFromId(WithId withId) {
        return (String) NamespaceReader
                .fromEntityId(withId.getEntityId())
                .orElse(null);
    }

    /**
     * Parses a filter string into {@link Criteria} using a freshly created criteria factory.
     *
     * @param str the filter string to parse.
     * @param dittoHeaders headers passed to the factory along with the filter.
     * @return the parsed criteria.
     */
    public static Criteria parseCriteria(String str, DittoHeaders dittoHeaders) {
        final QueryFilterCriteriaFactory criteriaFactory = new QueryFilterCriteriaFactory(
                new CriteriaFactoryImpl(),
                new ModelBasedThingsFieldExpressionFactory());
        return criteriaFactory.filterCriteria(str, dittoHeaders);
    }

    /**
     * Sends a positive subscription ack for the streaming type to the publisher, but only if an ack
     * for that type is still outstanding; a repeated ack is just logged.
     *
     * @param streamingType the streaming type that was subscribed.
     * @param actorRef the sender to use for the ack message.
     */
    private void acknowledgeSubscription(StreamingType streamingType, ActorRef actorRef) {
        // remove() reports whether the type was still outstanding, so no separate contains() is needed.
        final boolean wasOutstanding = this.outstandingSubscriptionAcks.remove(streamingType);
        if (!wasOutstanding) {
            this.logger.debug("Subscription already acked for type <{}> in <{}> session.", streamingType, this.type);
            return;
        }
        this.eventAndResponsePublisher.tell(
                SessionedJsonifiable.ack(streamingType, true, this.connectionCorrelationId),
                actorRef);
        this.logger.debug("Subscribed to Cluster <{}> in <{}> session.", streamingType, this.type);
    }

    /**
     * Sends an unsubscription ack (subscribed flag {@code false}) for the streaming type to the publisher.
     *
     * @param streamingType the streaming type that was unsubscribed.
     * @param actorRef the sender to use for the ack message.
     */
    private void acknowledgeUnsubscription(StreamingType streamingType, ActorRef actorRef) {
        this.eventAndResponsePublisher.tell(
                SessionedJsonifiable.ack(streamingType, false, this.connectionCorrelationId),
                actorRef);
        this.logger.debug("Unsubscribed from Cluster <{}> in <{}> session.", streamingType, this.type);
    }
}
