/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.StreamingAck;
import org.eclipse.ditto.services.gateway.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.events.base.Event;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor representing a single streaming session, identified by its connection correlation ID.
 * <p>
 * It subscribes to / unsubscribes from distributed pub/sub topics on {@link StartStreaming} /
 * {@link StopStreaming}, forwards commands, responses, errors and events to the
 * {@code eventAndResponsePublisher}, and publishes "live" signals issued by this session into the cluster.
 * The actor watches the {@code eventAndResponsePublisher}; once that actor terminates, this actor unsubscribes
 * from all streaming topics and stops itself shortly afterwards.
 * <p>
 * All mutable state of this actor is only touched from within message handlers; delayed actions are therefore
 * implemented by scheduling messages to self instead of scheduling closures over {@code this}.
 */
final class StreamingSessionActor extends AbstractActor {

    /**
     * Delay in milliseconds between receiving a (un)subscribe ack from the pub/sub mediator and telling the
     * {@code eventAndResponsePublisher} about it.
     */
    private static final int MAX_SUBSCRIBE_TIMEOUT_MS = 2500;

    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);
    private final String connectionCorrelationId;
    private final String type;
    private final ActorRef pubSubMediator;
    private final ActorRef eventAndResponsePublisher;
    // NOTE(review): entries are added in savePublished() but never removed anywhere in this class, so this set
    // grows for the lifetime of the session — confirm whether that is acceptable.
    private final Set<String> outstandingLiveSignals;
    private final Map<String, ActorRef> responseAwaitingLiveSignals;
    private final Set<StreamingType> outstandingSubscriptionAcks;
    // Stays null until the first StartStreaming message was received.
    private List<String> authorizationSubjects;

    private StreamingSessionActor(String connectionCorrelationId, String type, ActorRef pubSubMediator,
            ActorRef eventAndResponsePublisher) {
        this.connectionCorrelationId = connectionCorrelationId;
        this.type = type;
        this.pubSubMediator = pubSubMediator;
        this.eventAndResponsePublisher = eventAndResponsePublisher;
        this.outstandingLiveSignals = new HashSet<>();
        this.responseAwaitingLiveSignals = new HashMap<>();
        this.outstandingSubscriptionAcks = new HashSet<>();
        getContext().watch(eventAndResponsePublisher);
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor.
     *
     * @param connectionCorrelationId the correlation ID identifying the connection/session.
     * @param type the type of the session; only used for logging in this class.
     * @param pubSubMediator the distributed pub/sub mediator used for subscribing and publishing.
     * @param eventAndResponsePublisher the actor receiving everything which should be streamed to the client.
     * @return the Props.
     */
    static Props props(final String connectionCorrelationId, final String type, final ActorRef pubSubMediator,
            final ActorRef eventAndResponsePublisher) {
        return Props.create(StreamingSessionActor.class, new Creator<StreamingSessionActor>() {
            private static final long serialVersionUID = 1L;

            @Override
            public StreamingSessionActor create() throws Exception {
                return new StreamingSessionActor(connectionCorrelationId, type, pubSubMediator,
                        eventAndResponsePublisher);
            }
        });
    }

    @Override
    public void postStop() throws Exception {
        LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
        logger.info("Closing '{}' streaming session: {}", type, connectionCorrelationId);
    }

    @Override
    public AbstractActor.Receive createReceive() {
        // Order matters: the first three clauses handle live signals issued by this very session (most specific
        // signal kind first), the fourth handles all remaining live signals, the rest handles "twin" traffic.
        return ReceiveBuilder.create()
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal)
                                && (signal instanceof MessageCommand || signal instanceof MessageCommandResponse),
                        publishLiveSignal(StreamingType.MESSAGES.getDistributedPubSubTopic()))
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal)
                                && (signal instanceof Command || signal instanceof CommandResponse),
                        publishLiveSignal(StreamingType.LIVE_COMMANDS.getDistributedPubSubTopic()))
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal) && signal instanceof Event,
                        publishLiveSignal(StreamingType.LIVE_EVENTS.getDistributedPubSubTopic()))
                .match(Signal.class, this::isLiveSignal, this::handleIncomingLiveSignal)
                .match(Command.class, command -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, command);
                    logger.debug("Got 'Command' message in <{}> session, telling eventAndResponsePublisher about it: {}",
                            type, command);
                    eventAndResponsePublisher.forward(command, getContext());
                })
                .match(CommandResponse.class, response -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, response);
                    logger.debug("Got 'CommandResponse' message in <{}> session, telling eventAndResponsePublisher about it: {}",
                            type, response);
                    eventAndResponsePublisher.forward(response, getContext());
                })
                .match(DittoRuntimeException.class, cre -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, cre);
                    logger.info("Got 'DittoRuntimeException' message in <{}> session, telling eventAndResponsePublisher about it: {}",
                            type, cre);
                    eventAndResponsePublisher.forward(cre, getContext());
                })
                .match(Event.class, this::handleEvent)
                .match(StartStreaming.class, startStreaming -> {
                    authorizationSubjects = startStreaming.getAuthorizationContext().getAuthorizationSubjectIds();
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    final StreamingType streamingType = startStreaming.getStreamingType();
                    logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster..",
                            type, streamingType.name());
                    outstandingSubscriptionAcks.add(streamingType);
                    pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(
                            streamingType.getDistributedPubSubTopic(), connectionCorrelationId, getSelf()), getSelf());
                })
                .match(StopStreaming.class, stopStreaming -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    final StreamingType streamingType = stopStreaming.getStreamingType();
                    logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster..",
                            type, streamingType.name());
                    pubSubMediator.tell(new DistributedPubSubMediator.Unsubscribe(
                            streamingType.getDistributedPubSubTopic(), connectionCorrelationId, getSelf()), getSelf());
                })
                .match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    final StreamingType streamingType = StreamingType.fromTopic(subscribeAck.subscribe().topic());
                    // The StreamingAck is sent with a delay; this is done via a message to self so that the
                    // actor's state is only ever accessed from its own message loop.
                    scheduleDelayedToSelf(new AcknowledgeSubscription(streamingType));
                })
                .match(DistributedPubSubMediator.UnsubscribeAck.class, unsubscribeAck -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    final StreamingType streamingType = StreamingType.fromTopic(unsubscribeAck.unsubscribe().topic());
                    scheduleDelayedToSelf(new AcknowledgeUnsubscription(streamingType));
                })
                .match(AcknowledgeSubscription.class, ack -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    // Sent unconditionally (as before), even if an incoming event already triggered an ack.
                    acknowledgeSubscription(ack.streamingType, getSelf());
                })
                .match(AcknowledgeUnsubscription.class, ack -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    acknowledgeUnsubscription(ack.streamingType, getSelf());
                })
                .match(Terminated.class, terminated -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    logger.debug("eventAndResponsePublisher was terminated");
                    logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster..", type);
                    Arrays.stream(StreamingType.values())
                            .map(StreamingType::getDistributedPubSubTopic)
                            .forEach(topic -> pubSubMediator.tell(
                                    new DistributedPubSubMediator.Unsubscribe(topic, connectionCorrelationId,
                                            getSelf()), getSelf()));
                    // Stop this actor one second later, leaving time for the unsubscriptions above.
                    getContext().getSystem().scheduler().scheduleOnce(
                            FiniteDuration.apply(1L, TimeUnit.SECONDS), getSelf(), PoisonPill.getInstance(),
                            getContext().dispatcher(), getSelf());
                })
                .matchAny(any -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    logger.warning("Got unknown message in '{}' session: '{}'", type, any);
                })
                .build();
    }

    /**
     * Handles a live signal which was not issued by this session: tells the {@code eventAndResponsePublisher}
     * about it if the session's authorization subjects are allowed to read it. For live commands the sender is
     * remembered so that a later response can be routed back to it.
     */
    private void handleIncomingLiveSignal(Signal<?> liveSignal) {
        LogUtil.enhanceLogWithCorrelationId(logger, liveSignal);
        acknowledgeSubscriptionForLiveSignal(liveSignal);

        final DittoHeaders dittoHeaders = liveSignal.getDittoHeaders();
        if (wasIssuedByThisConnection(dittoHeaders)) {
            logger.debug("Got 'Live' Signal <{}> in <{}> session, but this was issued by this connection itself, not telling eventAndResponsePublisher about it",
                    liveSignal.getType(), type);
        } else if (isReadableByThisSession(dittoHeaders)) {
            logger.debug("Got 'Live' Signal <{}> in <{}> session, telling eventAndResponsePublisher about it: {}",
                    liveSignal.getType(), type, liveSignal);
            if (liveSignal instanceof Command) {
                extractActualCorrelationId(liveSignal)
                        .ifPresent(cId -> responseAwaitingLiveSignals.put(cId, getSender()));
            }
            eventAndResponsePublisher.tell(liveSignal, getSelf());
        }
    }

    /**
     * Handles a (non-live) event: acknowledges an outstanding {@link StreamingType#EVENTS} subscription and tells
     * the {@code eventAndResponsePublisher} about the event unless it originated from this connection or is not
     * readable by the session's authorization subjects.
     */
    private void handleEvent(Event<?> event) {
        LogUtil.enhanceLogWithCorrelationId(logger, event);
        if (outstandingSubscriptionAcks.contains(StreamingType.EVENTS)) {
            acknowledgeSubscription(StreamingType.EVENTS, getSelf());
        }

        final DittoHeaders dittoHeaders = event.getDittoHeaders();
        if (wasIssuedByThisConnection(dittoHeaders)) {
            logger.debug("Got 'Event' message in <{}> session, but this was issued by this connection itself, not telling eventAndResponsePublisher about it",
                    type);
        } else if (isReadableByThisSession(dittoHeaders)) {
            logger.debug("Got 'Event' message in <{}> session, telling eventAndResponsePublisher about it: {}",
                    type, event);
            eventAndResponsePublisher.tell(event, getSelf());
        }
    }

    /** Schedules {@code message} to be delivered to this actor after {@link #MAX_SUBSCRIBE_TIMEOUT_MS}. */
    private void scheduleDelayedToSelf(Object message) {
        getContext().getSystem().scheduler().scheduleOnce(
                FiniteDuration.apply((long) MAX_SUBSCRIBE_TIMEOUT_MS, TimeUnit.MILLISECONDS),
                getSelf(), message, getContext().getSystem().dispatcher(), getSelf());
    }

    /** Checks whether the correlation ID in the headers starts with this session's connection correlation ID. */
    private boolean wasIssuedByThisConnection(DittoHeaders dittoHeaders) {
        final Optional<String> correlationId = dittoHeaders.getCorrelationId();
        return correlationId.map(cId -> cId.startsWith(connectionCorrelationId)).orElse(false);
    }

    /** Checks whether at least one of the session's authorization subjects is contained in the read subjects. */
    private boolean isReadableByThisSession(DittoHeaders dittoHeaders) {
        final Set<String> readSubjects = dittoHeaders.getReadSubjects();
        return authorizationSubjects != null && !Collections.disjoint(readSubjects, authorizationSubjects);
    }

    /** Acknowledges the outstanding subscription (if any) matching the kind of the passed live signal. */
    private void acknowledgeSubscriptionForLiveSignal(Signal<?> liveSignal) {
        final StreamingType streamingType;
        if (liveSignal instanceof MessageCommand) {
            streamingType = StreamingType.MESSAGES;
        } else if (liveSignal instanceof Command) {
            streamingType = StreamingType.LIVE_COMMANDS;
        } else if (liveSignal instanceof Event) {
            streamingType = StreamingType.LIVE_EVENTS;
        } else {
            return;
        }
        if (outstandingSubscriptionAcks.contains(streamingType)) {
            acknowledgeSubscription(streamingType, getSelf());
        }
    }

    /** Marks the subscription as acknowledged and tells the publisher a positive {@link StreamingAck}. */
    private void acknowledgeSubscription(StreamingType streamingType, ActorRef self) {
        outstandingSubscriptionAcks.remove(streamingType);
        eventAndResponsePublisher.tell(new StreamingAck(streamingType, true), self);
        logger.debug("Subscribed to Cluster <{}> in <{}> session", streamingType, type);
    }

    /** Tells the publisher a negative {@link StreamingAck}, i.e. that the unsubscription took place. */
    private void acknowledgeUnsubscription(StreamingType streamingType, ActorRef self) {
        eventAndResponsePublisher.tell(new StreamingAck(streamingType, false), self);
        logger.debug("Unsubscribed from Cluster <{}> in <{}> session", streamingType, type);
    }

    /**
     * Creates the handler for live signals issued by this session: publishes the signal once on the passed topic
     * and, for command responses, additionally forwards the response to the sender of the awaited live command.
     *
     * @param topic the distributed pub/sub topic to publish on.
     * @return the handler.
     */
    private FI.UnitApply<Signal> publishLiveSignal(String topic) {
        return liveSignal -> {
            acknowledgeSubscriptionForLiveSignal(liveSignal);
            LogUtil.enhanceLogWithCorrelationId(logger, liveSignal);
            if (notYetPublished(liveSignal)) {
                logger.debug("Publishing 'Live' Signal <{}> on topic <{}> into cluster.", liveSignal.getType(),
                        topic);
                pubSubMediator.forward(new DistributedPubSubMediator.Publish(topic, liveSignal, true),
                        getContext());
                savePublished(liveSignal);
            }
            if (liveSignal instanceof CommandResponse) {
                // remove() yields null for unknown correlation IDs, which map() turns into an empty Optional.
                extractActualCorrelationId(liveSignal)
                        .map(responseAwaitingLiveSignals::remove)
                        .ifPresent(sender -> {
                            logger.debug("Answering to a 'Live' Command with CommandResponse <{}> to sender: <{}>",
                                    liveSignal.getType(), sender);
                            sender.forward(liveSignal, getContext());
                        });
            }
        };
    }

    /** Returns {@code true} unless a signal of the same type and actual correlation ID was already published. */
    private boolean notYetPublished(Signal<?> liveSignal) {
        return !extractActualCorrelationId(liveSignal)
                .filter(cId -> outstandingLiveSignals.contains(liveSignal.getType() + ":" + cId))
                .isPresent();
    }

    /** Checks whether the signal's channel header equals the name of the live channel. */
    private boolean isLiveSignal(Signal<?> signal) {
        return signal.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
    }

    /** Checks whether the part of the correlation ID before the first ':' equals the connection correlation ID. */
    private boolean wasIssuedByThisSession(Signal<?> signal) {
        return extractConnectionCorrelationId(signal).filter(connectionCorrelationId::equals).isPresent();
    }

    /** Checks whether the signal is a live signal which was issued by this session. */
    private boolean isOwnLiveSignal(Signal<?> signal) {
        return isLiveSignal(signal) && wasIssuedByThisSession(signal);
    }

    /** Remembers the signal (by type and actual correlation ID) as published. */
    private void savePublished(Signal<?> signal) {
        extractActualCorrelationId(signal)
                .ifPresent(cId -> outstandingLiveSignals.add(signal.getType() + ":" + cId));
    }

    /** Extracts the part of the correlation ID before the first ':' (the whole ID if there is no ':'). */
    private static Optional<String> extractConnectionCorrelationId(WithDittoHeaders<?> withDittoHeaders) {
        return withDittoHeaders.getDittoHeaders().getCorrelationId()
                .map(cId -> cId.split(":", 2))
                .map(cIds -> cIds[0]);
    }

    /** Extracts the part of the correlation ID after the first ':' (the whole ID if there is no ':'). */
    private static Optional<String> extractActualCorrelationId(WithDittoHeaders<?> withDittoHeaders) {
        // split() with a positive limit always yields at least one element, so index 0 is safe.
        return withDittoHeaders.getDittoHeaders().getCorrelationId()
                .map(cId -> cId.split(":", 2))
                .map(cIds -> cIds.length == 2 ? cIds[1] : cIds[0]);
    }

    /** Internal message: the delayed acknowledgement of a subscription is due. */
    private static final class AcknowledgeSubscription {
        private final StreamingType streamingType;

        private AcknowledgeSubscription(StreamingType streamingType) {
            this.streamingType = streamingType;
        }
    }

    /** Internal message: the delayed acknowledgement of an unsubscription is due. */
    private static final class AcknowledgeUnsubscription {
        private final StreamingType streamingType;

        private AcknowledgeUnsubscription(StreamingType streamingType) {
            this.streamingType = streamingType;
        }
    }
}

