/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.javadsl.SourceQueueWithComplete;
import java.time.Instant;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.acks.AbstractCommandAckRequestSetter;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.model.base.acks.FatalPubSubException;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.auth.AuthorizationModelFactory;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
import org.eclipse.ditto.model.base.exceptions.DittoHeaderInvalidException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.jwt.ImmutableJsonWebToken;
import org.eclipse.ditto.model.jwt.JsonWebToken;
import org.eclipse.ditto.model.namespaces.NamespaceReader;
import org.eclipse.ditto.model.query.criteria.Criteria;
import org.eclipse.ditto.model.query.criteria.CriteriaFactory;
import org.eclipse.ditto.model.query.criteria.CriteriaFactoryImpl;
import org.eclipse.ditto.model.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.security.authentication.AuthenticationResult;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthenticationResultProvider;
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtValidator;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.IncomingSignal;
import org.eclipse.ditto.services.gateway.streaming.InvalidJwt;
import org.eclipse.ditto.services.gateway.streaming.Jwt;
import org.eclipse.ditto.services.gateway.streaming.RefreshSession;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.services.gateway.streaming.actors.StreamingSession;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.services.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.services.utils.pubsub.StreamingType;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.acks.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.thingsearch.ThingSearchCommand;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

final class StreamingSessionActor
extends AbstractActorWithTimers {
    // Schema version taken from the Connect message; put into the session headers of published signals.
    private final JsonSchemaVersion jsonSchemaVersion;
    // Correlation ID of the connection; used for logging, the same-origin check and error/ack headers.
    private final String connectionCorrelationId;
    // Connection type taken from the Connect message; only used in log messages within this class.
    private final String type;
    // Pub/sub facade used to subscribe, unsubscribe and declare acknowledgement labels.
    private final DittoProtocolSub dittoProtocolSub;
    // Queue to which everything destined for the client is offered; completed in postStop().
    private final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher;
    // Receiver of incoming signals.
    private final ActorRef commandRouter;
    // Passed on when starting acknowledgement aggregators/forwarders.
    private final AcknowledgementConfig acknowledgementConfig;
    // Child actor created in the constructor; receives ThingSearchCommands.
    private final ActorRef subscriptionManager;
    // Streaming types for which StartStreaming was handled but no confirmation was published yet.
    private final Set<StreamingType> outstandingSubscriptionAcks;
    // Session data per streaming type; entries are added on StartStreaming and removed on StopStreaming.
    private final Map<StreamingType, StreamingSession> streamingSessions;
    // Validates JWTs received for refreshing the session.
    private final JwtValidator jwtValidator;
    // Derives the authentication result (and thereby the authorization context) from a validated JWT.
    private final JwtAuthenticationResultProvider jwtAuthenticationResultProvider;
    // Preprocesses incoming signals and starts acknowledgement aggregator actors for them.
    private final AcknowledgementAggregatorActorStarter ackregatorStarter;
    // Acknowledgement labels taken from the Connect message.
    private final Set<AcknowledgementLabel> declaredAcks;
    // Logger bound to the connection correlation ID.
    private final ThreadSafeDittoLoggingAdapter logger;
    // Handle of the scheduled session timeout; stays null if the Connect message carried no expiration time.
    @Nullable
    private Cancellable sessionTerminationCancellable;
    // Starts as the empty context and is replaced whenever a StartStreaming message is handled.
    private AuthorizationContext authorizationContext;

    private StreamingSessionActor(Connect connect, DittoProtocolSub dittoProtocolSub, ActorRef commandRouter, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Props subscriptionManagerProps, JwtValidator jwtValidator, JwtAuthenticationResultProvider jwtAuthenticationResultProvider) {
        this.jsonSchemaVersion = connect.getJsonSchemaVersion();
        this.connectionCorrelationId = connect.getConnectionCorrelationId();
        this.type = connect.getType();
        this.dittoProtocolSub = dittoProtocolSub;
        this.eventAndResponsePublisher = connect.getEventAndResponsePublisher();
        this.commandRouter = commandRouter;
        this.acknowledgementConfig = acknowledgementConfig;
        this.jwtValidator = jwtValidator;
        this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
        this.outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
        this.authorizationContext = AuthorizationModelFactory.emptyAuthContext();
        this.streamingSessions = new EnumMap<StreamingType, StreamingSession>(StreamingType.class);
        this.ackregatorStarter = AcknowledgementAggregatorActorStarter.of((ActorContext)this.getContext(), (AcknowledgementConfig)acknowledgementConfig, (HeaderTranslator)headerTranslator, (AbstractCommandAckRequestSetter[])new AbstractCommandAckRequestSetter[]{ThingModifyCommandAckRequestSetter.getInstance(), ThingLiveCommandAckRequestSetter.getInstance(), MessageCommandAckRequestSetter.getInstance()});
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter((Actor)this).withCorrelationId((CharSequence)this.connectionCorrelationId);
        connect.getSessionExpirationTime().ifPresent(expiration -> {
            this.sessionTerminationCancellable = this.startSessionTimeout((Instant)expiration);
        });
        this.subscriptionManager = this.getContext().actorOf(subscriptionManagerProps, "subscriptionManager");
        this.declaredAcks = connect.getDeclaredAcknowledgementLabels();
    }

    /**
     * Creates the {@code Props} for this actor; the arguments are handed to the private constructor
     * in the same order.
     *
     * @return the Props.
     */
    static Props props(Connect connect, DittoProtocolSub dittoProtocolSub, ActorRef commandRouter, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Props subscriptionManagerProps, JwtValidator jwtValidator, JwtAuthenticationResultProvider jwtAuthenticationResultProvider) {
        Object[] constructorArguments = new Object[]{
                connect,
                dittoProtocolSub,
                commandRouter,
                acknowledgementConfig,
                headerTranslator,
                subscriptionManagerProps,
                jwtValidator,
                jwtAuthenticationResultProvider
        };
        return Props.create(StreamingSessionActor.class, (Object[]) constructorArguments);
    }

    /**
     * Arranges for {@code Control.TERMINATED} to be sent to this actor once the publisher queue completes
     * (successfully or with an error) and declares the acknowledgement labels of this session.
     */
    @Override
    public void preStart() {
        ActorRef self = getSelf();
        eventAndResponsePublisher.watchCompletion()
                .whenComplete((done, error) -> self.tell((Object) Control.TERMINATED, self));
        declareAcknowledgementLabels(declaredAcks);
    }

    /**
     * Logs the end of the session, cancels a pending session timeout and completes the publisher queue.
     */
    @Override
    public void postStop() {
        logger.info("Closing <{}> streaming session.", (Object) type);
        cancelSessionTimeout();
        eventAndResponsePublisher.complete();
    }

    /**
     * Composes the actor behavior. The partial behaviors are tried in this order: incoming signals,
     * pub/sub control messages, self-termination messages, outgoing signals and finally the
     * catch-all that logs unknown messages.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        AbstractActor.Receive incomingSignals = createIncomingSignalBehavior();
        AbstractActor.Receive pubSub = createPubSubBehavior();
        AbstractActor.Receive selfTermination = createSelfTerminationBehavior();
        AbstractActor.Receive outgoingSignals = createOutgoingSignalBehavior();
        AbstractActor.Receive unknownMessages = logUnknownMessage();
        return incomingSignals
                .orElse(pubSub)
                .orElse(selfTermination)
                .orElse(outgoingSignals)
                .orElse(unknownMessages);
    }

    /**
     * Builds the behavior for messages wrapped in an {@code IncomingSignal}.
     * Two preprocessors run first: the envelope is stripped, then signals are passed through
     * {@link #startAckregatorAndForward} (anything else is passed on unchanged). The preprocessed
     * message is then dispatched: acknowledgements with undeclared labels are rejected, command responses
     * go to the acknowledgement forwarder or command router, search commands go to the subscription
     * manager, other signals go to the command router, and {@code Done} is ignored.
     */
    private AbstractActor.Receive createIncomingSignalBehavior() {
        PartialFunction unwrapIncomingSignal = new PFBuilder()
                .match(IncomingSignal.class, IncomingSignal::getSignal)
                .build();
        PartialFunction startAckregatorForSignals = new PFBuilder()
                .match(Signal.class, this::startAckregatorAndForward)
                .matchAny(x -> x)
                .build();
        AbstractActor.Receive dispatchBehavior = ReceiveBuilder.create()
                .match(Acknowledgement.class, this::hasUndeclaredAckLabel, this::ackLabelNotDeclared)
                .match(CommandResponse.class, this::forwardAcknowledgementOrLiveCommandResponse)
                .match(ThingSearchCommand.class, this::forwardSearchCommand)
                .match(Signal.class, signal -> commandRouter.tell(signal, getReturnAddress((Signal<?>) signal)))
                .matchEquals((Object) Done.getInstance(), done -> {})
                .build();
        return addPreprocessors(List.of(unwrapIncomingSignal, startAckregatorForSignals), dispatchBehavior);
    }

    /**
     * Builds the behavior for messages that are to be published to the client via
     * {@code eventAndResponsePublisher}.
     * A preprocessor passes signals through {@link #startAckForwarder} and lets
     * {@code DittoRuntimeException}s through unchanged; it matches no other message types.
     * Afterwards subscription events, command responses and errors are published directly, signals
     * originating from this very connection are only logged, and any other signal is published only if a
     * streaming session exists for its streaming type and the session is allowed to receive it.
     */
    private AbstractActor.Receive createOutgoingSignalBehavior() {
        PartialFunction setCorrelationIdAndStartAckForwarder = new PFBuilder().match(Signal.class, this::startAckForwarder).match(DittoRuntimeException.class, x -> x).build();
        AbstractActor.Receive publishSignal = ReceiveBuilder.create().match(SubscriptionEvent.class, signal -> {
            this.logger.debug("Got SubscriptionEvent in <{}> session, publishing: {}", (Object)this.type, signal);
            this.eventAndResponsePublisher.offer((Object)SessionedJsonifiable.subscription(signal));
        }).match(CommandResponse.class, this::publishResponseOrError).match(DittoRuntimeException.class, this::publishResponseOrError).match(Signal.class, this::isSameOrigin, signal -> this.logger.withCorrelationId((WithDittoHeaders)signal).debug("Got Signal <{}> in <{}> session, but this was issued by  this connection itself, not publishing", (Object)signal.getType(), (Object)this.type)).match(Signal.class, signal -> {
            // Signals without a matching session, or failing the authorization/namespace check, are dropped silently.
            StreamingSession session = this.streamingSessions.get(StreamingSessionActor.determineStreamingType(signal));
            if (null != session && this.isSessionAllowedToReceiveSignal((Signal<?>)signal, session)) {
                this.logger.withCorrelationId((WithDittoHeaders)signal).debug("Got Signal in <{}> session, publishing: {}", (Object)this.type, signal);
                // Session headers carry the current authorization context and the schema version of the connection.
                DittoHeaders sessionHeaders = DittoHeaders.newBuilder().authorizationContext(this.authorizationContext).schemaVersion(this.jsonSchemaVersion).build();
                SessionedJsonifiable sessionedJsonifiable = SessionedJsonifiable.signal(signal, sessionHeaders, session);
                this.eventAndResponsePublisher.offer((Object)sessionedJsonifiable);
            }
        }).matchEquals((Object)Done.getInstance(), done -> {}).build();
        return this.addPreprocessors(List.of(setCorrelationIdAndStartAckForwarder), publishSignal);
    }

    /**
     * Builds the behavior for starting and stopping streaming and for the internal confirmation messages.
     * <ul>
     * <li>{@code StartStreaming}: adopts the authorization context of the message, parses the optional
     * filter (a parse error is published and aborts the handling), stores the session for the streaming
     * type and subscribes for all currently active streaming types. On success a
     * {@code ConfirmSubscription} is sent to self; on failure the error is published and the actor is
     * asked to terminate.</li>
     * <li>{@code StopStreaming}: removes the session, updates or removes the pub/sub subscription and sends
     * a {@code ConfirmUnsubscription} to self once that completed.</li>
     * <li>{@code ConfirmSubscription}/{@code ConfirmUnsubscription}: publish the acknowledgement.</li>
     * </ul>
     */
    private AbstractActor.Receive createPubSubBehavior() {
        return ReceiveBuilder.create().match(StartStreaming.class, startStreaming -> {
            Criteria criteria;
            this.authorizationContext = startStreaming.getAuthorizationContext();
            try {
                criteria = startStreaming.getFilter().map(f -> StreamingSessionActor.parseCriteria(f, DittoHeaders.newBuilder().correlationId(startStreaming.getCorrelationId().orElse(startStreaming.getConnectionCorrelationId())).build())).orElse(null);
            }
            catch (DittoRuntimeException e) {
                this.logger.info("Got 'DittoRuntimeException' <{}> session during 'StartStreaming' processing: {}: <{}>", (Object)this.type, (Object)e.getClass().getSimpleName(), (Object)e.getMessage());
                this.eventAndResponsePublisher.offer((Object)SessionedJsonifiable.error(e));
                return;
            }
            StreamingSession session = StreamingSession.of(startStreaming.getNamespaces(), criteria, startStreaming.getExtraFields().orElse(null), this.self());
            this.streamingSessions.put(startStreaming.getStreamingType(), session);
            this.logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster ...", (Object)this.type, (Object)startStreaming.getStreamingType().name());
            this.outstandingSubscriptionAcks.add(startStreaming.getStreamingType());
            ConfirmSubscription subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType());
            // NOTE(review): this is a live view of the mutable session map; confirm that subscribe() does not retain it.
            Set<StreamingType> currentStreamingTypes = this.streamingSessions.keySet();
            this.dittoProtocolSub.subscribe(currentStreamingTypes, (Collection)this.authorizationContext.getAuthorizationSubjectIds(), this.getSelf()).whenComplete((ack, throwable) -> {
                if (null == throwable) {
                    this.logger.debug("subscription to Ditto pubsub succeeded");
                    this.getSelf().tell((Object)subscribeConfirmation, this.getSelf());
                } else {
                    this.logger.error(throwable, "subscription to Ditto pubsub failed: {}", (Object)throwable.getMessage());
                    DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException((Throwable)throwable, cause -> GatewayInternalErrorException.newBuilder().dittoHeaders(DittoHeaders.newBuilder().correlationId((CharSequence)startStreaming.getConnectionCorrelationId()).build()).cause(cause).build());
                    this.eventAndResponsePublisher.offer((Object)SessionedJsonifiable.error(dittoRuntimeException));
                    // This callback may run outside of the actor's message processing, where the actor
                    // context must not be used. Instead of stopping directly, send the termination
                    // message to self (as done for the publisher completion in preStart) so that the
                    // unsubscribe-and-stop logic runs inside the actor.
                    this.getSelf().tell((Object)Control.TERMINATED, this.getSelf());
                }
            });
        }).match(StopStreaming.class, stopStreaming -> {
            this.logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster ...", (Object)this.type, (Object)stopStreaming.getStreamingType().name());
            this.streamingSessions.remove(stopStreaming.getStreamingType());
            ConfirmUnsubscription unsubscribeConfirmation = new ConfirmUnsubscription(stopStreaming.getStreamingType());
            Set<StreamingType> currentStreamingTypes = this.streamingSessions.keySet();
            // Twin events have a dedicated removal call; all other types are handled by updating the live subscriptions.
            if (stopStreaming.getStreamingType() != StreamingType.EVENTS) {
                this.dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes, (Collection)this.authorizationContext.getAuthorizationSubjectIds(), this.getSelf()).thenAccept(ack -> this.getSelf().tell((Object)unsubscribeConfirmation, this.getSelf()));
            } else {
                this.dittoProtocolSub.removeTwinSubscriber(this.getSelf(), (Collection)this.authorizationContext.getAuthorizationSubjectIds()).thenAccept(ack -> this.getSelf().tell((Object)unsubscribeConfirmation, this.getSelf()));
            }
        }).match(ConfirmSubscription.class, msg -> this.confirmSubscription(msg.getStreamingType())).match(ConfirmUnsubscription.class, msg -> this.confirmUnsubscription(msg.getStreamingType())).build();
    }

    /**
     * Builds the behavior for session refresh and termination: JWT refresh requests, the resulting
     * {@code RefreshSession}/{@code InvalidJwt} messages, fatal pub/sub failures and the termination
     * signals ({@code Terminated} and {@code Control.TERMINATED}).
     */
    private AbstractActor.Receive createSelfTerminationBehavior() {
        return ReceiveBuilder.create()
                .match(Jwt.class, this::refreshWebSocketSession)
                .match(RefreshSession.class, refreshSession -> {
                    // The old timeout is always cancelled; a new one is only started if the context is unchanged.
                    cancelSessionTimeout();
                    checkAuthorizationContextAndStartSessionTimer((RefreshSession) refreshSession);
                })
                .match(InvalidJwt.class, invalidJwt -> cancelSessionTimeout())
                .match(FatalPubSubException.class, this::pubsubFailed)
                .match(Terminated.class, this::handleTerminated)
                .matchEquals((Object) Control.TERMINATED, this::handleTerminated)
                .build();
    }

    /**
     * Logs the termination and shuts the session down.
     *
     * @param terminated the message that triggered the termination; only used for logging.
     */
    private void handleTerminated(Object terminated) {
        logger.debug("EventAndResponsePublisher was terminated: {}", terminated);
        logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster ...", (Object) type);
        terminateWebsocketStream();
    }

    /**
     * Builds the catch-all behavior that logs a warning with the class name and the message itself.
     */
    private AbstractActor.Receive logUnknownMessage() {
        return ReceiveBuilder.create()
                .matchAny(message -> {
                    String messageClassName = message.getClass().getName();
                    logger.warning("Got unknown message in '{}' session: {} '{}'", (Object) type, (Object) messageClassName, message);
                })
                .build();
    }

    /**
     * Chains the given preprocessors in list order in front of the given behavior.
     *
     * @param preprocessors functions applied to a message one after another before {@code receive}.
     * @param receive the behavior that handles the preprocessed message.
     * @return the combined behavior, or {@code receive} itself if there are no preprocessors.
     */
    private AbstractActor.Receive addPreprocessors(List<PartialFunction<Object, Object>> preprocessors, AbstractActor.Receive receive) {
        if (preprocessors.isEmpty()) {
            return receive;
        }
        PartialFunction<Object, Object> pipeline = null;
        for (PartialFunction<Object, Object> step : preprocessors) {
            pipeline = pipeline == null ? step : pipeline.andThen(step);
        }
        return new AbstractActor.Receive(pipeline.andThen(receive.onMessage()));
    }

    /**
     * @return {@code true} if the label of the acknowledgement is not among the declared labels.
     */
    private boolean hasUndeclaredAckLabel(Acknowledgement acknowledgement) {
        boolean declared = declaredAcks.contains(acknowledgement.getLabel());
        return !declared;
    }

    /**
     * Publishes an {@code AcknowledgementLabelNotDeclaredException} for the label of the given
     * acknowledgement, carrying the headers of the acknowledgement.
     */
    private void ackLabelNotDeclared(Acknowledgement ack) {
        DittoRuntimeException notDeclared = AcknowledgementLabelNotDeclaredException.of((CharSequence) ack.getLabel(), (DittoHeaders) ack.getDittoHeaders());
        publishResponseOrError(notDeclared);
    }

    /**
     * Determines the sender to use when forwarding the signal to the command router.
     *
     * @return this actor if the signal is a command that requires a response, otherwise no sender.
     */
    private ActorRef getReturnAddress(Signal<?> signal) {
        if (signal instanceof Command && signal.getDittoHeaders().isResponseRequired()) {
            return getSelf();
        }
        return ActorRef.noSender();
    }

    /**
     * @return {@code true} if the origin header of the signal equals the correlation ID of this connection.
     */
    private boolean isSameOrigin(Signal<?> signal) {
        DittoHeaders headers = signal.getDittoHeaders();
        return headers.getOrigin().stream().filter(connectionCorrelationId::equals).findAny().isPresent();
    }

    /**
     * Handles a fatal pub/sub failure: logs it, publishes it as error to the client and shuts the
     * session down.
     */
    private void pubsubFailed(FatalPubSubException fatalPubSubException) {
        DittoRuntimeException cause = fatalPubSubException.asDittoRuntimeException();
        logger.withCorrelationId((WithDittoHeaders) cause).info("pubsubFailed cause=<{}>", (Object) cause);
        eventAndResponsePublisher.offer((Object) SessionedJsonifiable.error(cause));
        terminateWebsocketStream();
    }

    /**
     * Removes this actor as pub/sub subscriber and stops it.
     */
    private void terminateWebsocketStream() {
        ActorRef self = getSelf();
        dittoProtocolSub.removeSubscriber(self);
        getContext().stop(self);
    }

    /**
     * Preprocesses an incoming signal with the acknowledgement aggregator starter.
     * If no aggregator is to be started, the preprocessed signal is returned for further dispatching.
     * Otherwise the signal is first checked for requesting acknowledgements without requiring a response
     * (published as error); if the check passes, an aggregator is started and the signal is forwarded to
     * the command router with the aggregator as sender. Errors during preprocessing are published.
     *
     * @return the preprocessed signal, or {@code Done} if the signal was already dealt with here.
     */
    private Object startAckregatorAndForward(Signal<?> signal) {
        return ackregatorStarter.preprocess(signal, (preprocessed, shouldStart) -> {
            if (!shouldStart.booleanValue()) {
                return doNothing(preprocessed);
            }
            Optional<DittoHeaderInvalidException> invalidHeader = StreamingSessionActor.checkForAcksWithoutResponse(preprocessed);
            if (invalidHeader.isPresent()) {
                return publishResponseOrError(invalidHeader.get());
            }
            return ackregatorStarter.doStart(preprocessed, this::publishResponseOrError, ackregator -> forwardToCommandRouterAndReturnDone((Signal<?>) preprocessed, (ActorRef) ackregator));
        }, this::publishResponseOrError);
    }

    /**
     * Sends the signal to the command router with the given aggregator as sender.
     *
     * @return {@code Done}, marking the signal as handled for the subsequent behavior.
     */
    private Object forwardToCommandRouterAndReturnDone(Signal<?> signalToForward, ActorRef ackregator) {
        Object handled = Done.getInstance();
        commandRouter.tell(signalToForward, ackregator);
        return handled;
    }

    /**
     * Identity function typed to return {@code Object}.
     *
     * @return the given argument, unchanged.
     */
    private <T> Object doNothing(T result) {
        Object unchanged = result;
        return unchanged;
    }

    /**
     * Starts an acknowledgement forwarder for signals that carry a thing ID; only declared
     * acknowledgement labels are accepted by the label predicate handed to the forwarder.
     *
     * @return the signal returned by the forwarder start, or the given signal if it has no thing ID.
     */
    private Signal<?> startAckForwarder(Signal<?> signal) {
        if (!(signal instanceof WithThingId)) {
            return signal;
        }
        ThingId thingId = ((WithThingId) signal).getThingEntityId();
        return AcknowledgementForwarderActor.startAcknowledgementForwarder((ActorContext) getContext(), (EntityIdWithType) thingId, signal, (AcknowledgementConfig) acknowledgementConfig, declaredAcks::contains);
    }

    /**
     * Publishes a command response or a {@code DittoRuntimeException} to the client; anything else is
     * logged as error and not published.
     *
     * @return always {@code Done}.
     */
    private Object publishResponseOrError(Object responseOrError) {
        Object done = Done.getInstance();
        if (responseOrError instanceof CommandResponse) {
            CommandResponse commandResponse = (CommandResponse) responseOrError;
            logger.withCorrelationId((WithDittoHeaders) commandResponse).debug("Got 'CommandResponse' message in <{}> session, telling EventAndResponsePublisher about it: {}", (Object) type, (Object) commandResponse);
            eventAndResponsePublisher.offer((Object) SessionedJsonifiable.response(commandResponse));
            return done;
        }
        if (responseOrError instanceof DittoRuntimeException) {
            DittoRuntimeException exception = (DittoRuntimeException) responseOrError;
            logger.withCorrelationId((WithDittoHeaders) exception).debug("Got 'DittoRuntimeException' message in <{}> session, telling EventAndResponsePublisher about it: {}", (Object) type, (Object) exception);
            eventAndResponsePublisher.offer((Object) SessionedJsonifiable.error(exception));
            return done;
        }
        logger.error("Unexpected result from AcknowledgementAggregatorActor: <{}>", responseOrError);
        return done;
    }

    /**
     * Hands a command response to the acknowledgement forwarder child whose name is derived from the
     * response headers. If no such child exists, the response is sent to the command router without
     * sender. A {@code DittoRuntimeException} raised on the way is published as error.
     */
    private void forwardAcknowledgementOrLiveCommandResponse(CommandResponse<?> response) {
        final ActorRef sender = getSender();
        try {
            String forwarderName = AcknowledgementForwarderActor.determineActorName((DittoHeaders) response.getDittoHeaders());
            Optional<ActorRef> forwarder = getContext().findChild(forwarderName);
            if (forwarder.isPresent()) {
                forwarder.get().tell((Object) response, sender);
                return;
            }
            String template = "No AcknowledgementForwarderActor found, forwarding to concierge: <{}>";
            // Full response on debug level, only its type on info level.
            if (logger.isDebugEnabled()) {
                logger.withCorrelationId((WithDittoHeaders) response).debug(template, (Object) response);
            } else {
                logger.withCorrelationId((WithDittoHeaders) response).info(template, (Object) response.getType());
            }
            commandRouter.tell((Object) response, ActorRef.noSender());
        }
        catch (DittoRuntimeException e) {
            eventAndResponsePublisher.offer((Object) SessionedJsonifiable.error(e));
        }
    }

    /**
     * Sends the search command to the subscription manager with this actor as sender.
     */
    private void forwardSearchCommand(ThingSearchCommand<?> searchCommand) {
        ActorRef replyTo = getSelf();
        subscriptionManager.tell(searchCommand, replyTo);
    }

    /**
     * Checks whether the signal may be published in the given session.
     *
     * @return {@code true} if the current authorization context is authorized by the read-granted and
     * read-revoked subjects of the signal headers and the signal matches the session namespaces.
     */
    private boolean isSessionAllowedToReceiveSignal(Signal<?> signal, StreamingSession session) {
        DittoHeaders signalHeaders = signal.getDittoHeaders();
        boolean mayRead = authorizationContext.isAuthorized((Collection) signalHeaders.getReadGrantedSubjects(), (Collection) signalHeaders.getReadRevokedSubjects());
        // Evaluated unconditionally (no short-circuit) because the namespace check logs mismatches.
        boolean namespaceMatches = matchesNamespaces(signal, session);
        if (!mayRead) {
            return false;
        }
        return namespaceMatches;
    }

    /**
     * Schedules {@link #handleSessionTimeout} to run once at the given expiration time.
     *
     * @param sessionExpirationTime the instant at which the session expires.
     * @return the handle for cancelling the scheduled timeout.
     */
    private Cancellable startSessionTimeout(Instant sessionExpirationTime) {
        long nowMillis = Instant.now().toEpochMilli();
        long remainingMillis = sessionExpirationTime.minusMillis(nowMillis).toEpochMilli();
        logger.debug("Starting session timeout - session will expire in {} ms", (Object) remainingMillis);
        FiniteDuration delay = FiniteDuration.apply((long) remainingMillis, (TimeUnit) TimeUnit.MILLISECONDS);
        ExecutionContext dispatcher = (ExecutionContext) getContext().getDispatcher();
        return getContext().getSystem().getScheduler().scheduleOnce(delay, this::handleSessionTimeout, dispatcher);
    }

    /**
     * Fails the publisher queue with a {@code GatewayWebsocketSessionExpiredException} that carries the
     * connection correlation ID.
     */
    private void handleSessionTimeout() {
        logger.info("Stopping WebSocket session for connection with ID <{}>.", (Object) connectionCorrelationId);
        DittoHeaders headers = DittoHeaders.newBuilder().correlationId((CharSequence) connectionCorrelationId).build();
        GatewayWebsocketSessionExpiredException sessionExpired = (GatewayWebsocketSessionExpiredException) GatewayWebsocketSessionExpiredException.newBuilder().dittoHeaders(headers).build();
        eventAndResponsePublisher.fail((Throwable) sessionExpired);
    }

    /**
     * Cancels the scheduled session timeout, if one was scheduled.
     */
    private void cancelSessionTimeout() {
        Cancellable scheduledTimeout = sessionTerminationCancellable;
        if (scheduledTimeout == null) {
            return;
        }
        scheduledTimeout.cancel();
    }

    /**
     * Starts a new session timeout if the refreshed authorization context equals the current one;
     * otherwise fails the publisher queue with a {@code GatewayWebsocketSessionClosedException}.
     */
    private void checkAuthorizationContextAndStartSessionTimer(RefreshSession refreshSession) {
        AuthorizationContext refreshedContext = refreshSession.getAuthorizationContext();
        if (authorizationContext.equals(refreshedContext)) {
            sessionTerminationCancellable = startSessionTimeout(refreshSession.getSessionTimeout());
            return;
        }
        logger.debug("Authorization Context changed for WebSocket session <{}>. Terminating the session.", (Object) connectionCorrelationId);
        DittoHeaders headers = DittoHeaders.newBuilder().correlationId((CharSequence) connectionCorrelationId).build();
        GatewayWebsocketSessionClosedException sessionClosed = (GatewayWebsocketSessionClosedException) GatewayWebsocketSessionClosedException.newBuilder().dittoHeaders(headers).build();
        eventAndResponsePublisher.fail((Throwable) sessionClosed);
    }

    /**
     * Checks the namespace of the signal's entity ID against the namespaces of the session.
     * A mismatch is logged on debug level.
     *
     * @return {@code true} if the session has no namespaces or contains the namespace of the signal.
     */
    private boolean matchesNamespaces(Signal<?> signal, StreamingSession session) {
        List<String> sessionNamespaces = session.getNamespaces();
        if (sessionNamespaces.isEmpty() || sessionNamespaces.contains(StreamingSessionActor.namespaceFromId(signal))) {
            return true;
        }
        logger.withCorrelationId(signal).debug("Signal does not match namespaces.");
        return false;
    }

    /**
     * Validates the JWT of the given message asynchronously.
     * For a valid token a {@code RefreshSession} with the token's expiration time and authorization context
     * is sent to self; if deriving the authorization context fails, {@code InvalidJwt} is sent instead.
     * For an invalid token the publisher queue is failed with a
     * {@code GatewayWebsocketSessionClosedException} and {@code InvalidJwt} is sent to self.
     */
    private void refreshWebSocketSession(Jwt jwt) {
        String jwtConnectionCorrelationId = jwt.getConnectionCorrelationId();
        JsonWebToken jsonWebToken = ImmutableJsonWebToken.fromToken((String) jwt.toString());
        jwtValidator.validate(jsonWebToken).thenAccept(validationResult -> {
            if (!validationResult.isValid()) {
                logger.debug("Received invalid JWT for WebSocket session <{}>. Terminating the session.", (Object) connectionCorrelationId);
                DittoHeaders headers = DittoHeaders.newBuilder().correlationId((CharSequence) connectionCorrelationId).build();
                GatewayWebsocketSessionClosedException sessionClosed = (GatewayWebsocketSessionClosedException) GatewayWebsocketSessionClosedException.newBuilder().dittoHeaders(headers).build();
                eventAndResponsePublisher.fail((Throwable) sessionClosed);
                getSelf().tell((Object) InvalidJwt.getInstance(), ActorRef.noSender());
                return;
            }
            try {
                AuthenticationResult authenticationResult = jwtAuthenticationResultProvider.getAuthenticationResult(jsonWebToken, DittoHeaders.empty());
                AuthorizationContext refreshedContext = authenticationResult.getDittoHeaders().getAuthorizationContext();
                RefreshSession refreshSession = new RefreshSession(jwtConnectionCorrelationId, jsonWebToken.getExpirationTime(), refreshedContext);
                getSelf().tell((Object) refreshSession, ActorRef.noSender());
            }
            catch (Exception exception) {
                logger.info("Got exception when handling refreshed JWT for WebSocket session <{}>: {}", (Object) jwtConnectionCorrelationId, (Object) exception.getMessage());
                getSelf().tell((Object) InvalidJwt.getInstance(), ActorRef.noSender());
            }
        });
    }

    /**
     * Declares the given acknowledgement labels for this actor via pub/sub.
     * On failure the error is converted to a {@code DittoRuntimeException} (errors that are not already one
     * are wrapped as cause of an {@code AcknowledgementLabelNotUniqueException}) and sent to self.
     *
     * @param acknowledgementLabels the labels to declare.
     */
    private void declareAcknowledgementLabels(Collection<AcknowledgementLabel> acknowledgementLabels) {
        final ActorRef self = getSelf();
        logger.info("Declaring acknowledgement labels <{}>", acknowledgementLabels);
        dittoProtocolSub.declareAcknowledgementLabels(acknowledgementLabels, self, null)
                .thenAccept(unused -> logger.info("Acknowledgement label declaration successful for labels: <{}>", (Object) acknowledgementLabels))
                .exceptionally(error -> {
                    DittoRuntimeException template = AcknowledgementLabelNotUniqueException.getInstance();
                    DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException((Throwable) error, cause -> DittoRuntimeException.newBuilder((DittoRuntimeException) template).cause(cause).build());
                    logger.info("Acknowledgement label declaration failed for labels: <{}> - cause: {} {}", (Object) acknowledgementLabels, (Object) error.getClass().getSimpleName(), (Object) error.getMessage());
                    self.tell((Object) dittoRuntimeException, ActorRef.noSender());
                    return null;
                });
    }

    /**
     * Maps a signal to its streaming type: events are twin or live events depending on the channel header
     * (twin if absent), message commands are messages and everything else counts as live command.
     */
    private static StreamingType determineStreamingType(Signal<?> signal) {
        String twinChannel = TopicPath.Channel.TWIN.getName();
        String channel = signal.getDittoHeaders().getChannel().orElse(twinChannel);
        if (signal instanceof Event) {
            if (channel.equals(TopicPath.Channel.TWIN.getName())) {
                return StreamingType.EVENTS;
            }
            return StreamingType.LIVE_EVENTS;
        }
        if (signal instanceof MessageCommand) {
            return StreamingType.MESSAGES;
        }
        return StreamingType.LIVE_COMMANDS;
    }

    /**
     * @return the namespace read from the entity ID, or {@code null} if the reader yields none.
     */
    @Nullable
    private static String namespaceFromId(WithId withId) {
        Optional<String> namespace = NamespaceReader.fromEntityId((EntityId) withId.getEntityId());
        return namespace.orElse(null);
    }

    /**
     * Parses a filter string into query criteria using the things field expression factory.
     *
     * @param filter the filter string.
     * @param dittoHeaders the headers handed to the criteria factory.
     * @return the parsed criteria.
     */
    private static Criteria parseCriteria(String filter, DittoHeaders dittoHeaders) {
        CriteriaFactory criteriaFactory = new CriteriaFactoryImpl();
        ThingsFieldExpressionFactory expressionFactory = new ModelBasedThingsFieldExpressionFactory();
        return new QueryFilterCriteriaFactory(criteriaFactory, expressionFactory).filterCriteria(filter, dittoHeaders);
    }

    /**
     * Publishes the subscription acknowledgement for the streaming type unless it was already
     * acknowledged, i.e. the type is no longer marked as outstanding.
     */
    private void confirmSubscription(StreamingType streamingType) {
        boolean wasOutstanding = outstandingSubscriptionAcks.remove(streamingType);
        if (!wasOutstanding) {
            logger.debug("Subscription already acked for type <{}> in <{}> session.", (Object) streamingType, (Object) type);
            return;
        }
        eventAndResponsePublisher.offer((Object) SessionedJsonifiable.ack(streamingType, true, connectionCorrelationId));
        logger.debug("Subscribed to Cluster <{}> in <{}> session.", (Object) streamingType, (Object) type);
    }

    /**
     * Publishes the unsubscription acknowledgement for the streaming type.
     */
    private void confirmUnsubscription(StreamingType streamingType) {
        SessionedJsonifiable unsubscribedAck = SessionedJsonifiable.ack(streamingType, false, connectionCorrelationId);
        eventAndResponsePublisher.offer((Object) unsubscribedAck);
        logger.debug("Unsubscribed from Cluster <{}> in <{}> session.", (Object) streamingType, (Object) type);
    }

    /**
     * Rejects signals that request acknowledgements although no response is required.
     *
     * @return the exception describing the invalid header combination, or an empty Optional if the
     * signal headers are acceptable.
     */
    private static Optional<DittoHeaderInvalidException> checkForAcksWithoutResponse(Signal<?> signal) {
        DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (dittoHeaders.isResponseRequired() || dittoHeaders.getAcknowledgementRequests().isEmpty()) {
            return Optional.empty();
        }
        String responseRequiredKey = DittoHeaderDefinition.RESPONSE_REQUIRED.getKey();
        String message = String.format("For websocket, it is forbidden to request acknowledgements while '%s' is set to false.", responseRequiredKey);
        String description = String.format("Please set '%s' to [] or '%s' to true.", DittoHeaderDefinition.REQUESTED_ACKS.getKey(), DittoHeaderDefinition.RESPONSE_REQUIRED.getKey());
        DittoHeaderInvalidException headerInvalid = (DittoHeaderInvalidException) DittoHeaderInvalidException.newCustomMessageBuilder((String) message).description(description).dittoHeaders(signal.getDittoHeaders()).build();
        return Optional.of(headerInvalid);
    }

    /**
     * Decompiler-generated lambda body referenced from {@code declareAcknowledgementLabels}: builds a copy
     * of the template exception with the given cause attached.
     *
     * @param template the exception to copy.
     * @param cause the cause to set on the copy.
     * @return the newly built exception.
     */
    private static /* synthetic */ DittoRuntimeException lambda$declareAcknowledgementLabels$30(DittoRuntimeException template, Throwable cause) {
        return DittoRuntimeException.newBuilder((DittoRuntimeException)template).cause(cause).build();
    }

    private static enum Control {
        TERMINATED;

    }

    /**
     * Internal message sent to self after the pub/sub unsubscription for a streaming type completed;
     * handling it publishes the unsubscription acknowledgement.
     */
    private static final class ConfirmUnsubscription
    extends WithStreamingType {
        private ConfirmUnsubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /**
     * Internal message sent to self after the pub/sub subscription for a streaming type succeeded;
     * handling it publishes the subscription acknowledgement.
     */
    private static final class ConfirmSubscription
    extends WithStreamingType {
        private ConfirmSubscription(StreamingType streamingType) {
            super(streamingType);
        }
    }

    /**
     * Base class of internal messages that refer to a streaming type.
     */
    private abstract static class WithStreamingType {
        private final StreamingType streamingType;

        private WithStreamingType(StreamingType streamingType) {
            this.streamingType = streamingType;
        }

        /**
         * @return the streaming type this message refers to.
         */
        StreamingType getStreamingType() {
            return streamingType;
        }
    }
}

