package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.StreamingAck;
import org.eclipse.ditto.services.gateway.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.events.base.Event;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor representing a single streaming session, identified by its connection correlation ID.
 *
 * <p>The actor
 * <ul>
 * <li>subscribes to / unsubscribes from distributed pub/sub topics when it receives
 * {@link StartStreaming} / {@link StopStreaming},</li>
 * <li>publishes "live" signals which were issued by this very session into the cluster,</li>
 * <li>passes signals, events, responses and exceptions it receives on to the
 * {@code eventAndResponsePublisher}, provided the session is authorized to read them, and</li>
 * <li>unsubscribes from all topics and stops itself once the watched
 * {@code eventAndResponsePublisher} terminates.</li>
 * </ul>
 *
 * <p>All mutable state of this class is only touched from within message handlers; delayed work is
 * modelled as a message to self ({@link DelayedStreamingAck}) instead of a scheduled {@code Runnable}.
 */
public final class StreamingSessionActor extends AbstractActor {

    /**
     * Delay in milliseconds between receiving a (un)subscribe acknowledgement from the pub/sub mediator and
     * reporting the corresponding {@link StreamingAck} to the {@code eventAndResponsePublisher}.
     */
    private static final int MAX_SUBSCRIBE_TIMEOUT_MS = 2500;

    private final DiagnosticLoggingAdapter logger;
    private final String connectionCorrelationId;
    private final String type;
    private final ActorRef pubSubMediator;
    private final ActorRef eventAndResponsePublisher;

    // Keys "<signal type>:<actual correlation id>" of live signals already published by this session.
    // NOTE(review): entries are never removed in this class, so the set grows for the session's lifetime.
    private final Set<String> outstandingLiveSignals;

    // Senders of live commands delivered to this session, keyed by the command's actual correlation id.
    // NOTE(review): an entry is only removed when a matching CommandResponse is published by this session.
    private final Map<String, ActorRef> responseAwaitingLiveSignals;

    // Streaming types for which a subscription was requested but not yet acknowledged.
    private final Set<StreamingType> outstandingSubscriptionAcks;

    // Set on StartStreaming; null until then, in which case nothing is passed on to the publisher.
    private List<String> authorizationSubjects;

    private StreamingSessionActor(final String connectionCorrelationId, final String type,
            final ActorRef pubSubMediator, final ActorRef eventAndResponsePublisher) {
        this.logger = LogUtil.obtain(this);
        this.connectionCorrelationId = connectionCorrelationId;
        this.type = type;
        this.pubSubMediator = pubSubMediator;
        this.eventAndResponsePublisher = eventAndResponsePublisher;
        this.outstandingLiveSignals = new HashSet<>();
        this.responseAwaitingLiveSignals = new HashMap<>();
        this.outstandingSubscriptionAcks = new HashSet<>();
        // A Terminated message for the publisher triggers the shutdown of this session.
        getContext().watch(eventAndResponsePublisher);
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param connectionCorrelationId the correlation ID identifying the connection/session.
     * @param type a label for the kind of session, used in log messages.
     * @param pubSubMediator the distributed pub/sub mediator used to (un)subscribe and publish.
     * @param eventAndResponsePublisher the actor that receives everything this session wants to emit;
     * it is watched and its termination ends the session.
     * @return the Props for creating a {@code StreamingSessionActor}.
     */
    public static Props props(final String connectionCorrelationId, final String type,
            final ActorRef pubSubMediator, final ActorRef eventAndResponsePublisher) {
        return Props.create(StreamingSessionActor.class, new Creator<StreamingSessionActor>() {
            private static final long serialVersionUID = 1L;

            @Override
            public StreamingSessionActor create() {
                return new StreamingSessionActor(connectionCorrelationId, type, pubSubMediator,
                        eventAndResponsePublisher);
            }
        });
    }

    @Override
    public void postStop() throws Exception {
        LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
        logger.info("Closing '{}' streaming session: {}", type, connectionCorrelationId);
    }

    /**
     * Builds the message handling of this actor. The order of the {@code match} clauses matters: live
     * signals issued by this session are handled first (most specific signal kind first), then all other
     * live signals, then the non-live commands, responses, exceptions and events.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                // Live signals issued by this session itself.
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal) && signal instanceof MessageCommand,
                        signal -> logger.debug(
                                "Don't publishing message again - was already published: <{}>", signal))
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal) && signal instanceof MessageCommandResponse,
                        publishLiveSignal(StreamingType.MESSAGES.getDistributedPubSubTopic()))
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal)
                                && (signal instanceof Command || signal instanceof CommandResponse),
                        publishLiveSignal(StreamingType.LIVE_COMMANDS.getDistributedPubSubTopic()))
                .match(Signal.class,
                        signal -> isOwnLiveSignal(signal) && signal instanceof Event,
                        publishLiveSignal(StreamingType.LIVE_EVENTS.getDistributedPubSubTopic()))
                // Any other live signal.
                .match(Signal.class, this::isLiveSignal, this::handleIncomingLiveSignal)
                // Non-live traffic.
                .match(Command.class, command -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, command);
                    logger.debug("Got 'Command' message in <{}> session, telling eventAndResponsePublisher "
                            + "about it: {}", type, command);
                    eventAndResponsePublisher.forward(command, getContext());
                })
                .match(CommandResponse.class, commandResponse -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, commandResponse);
                    logger.debug("Got 'CommandResponse' message in <{}> session, telling "
                            + "eventAndResponsePublisher about it: {}", type, commandResponse);
                    eventAndResponsePublisher.forward(commandResponse, getContext());
                })
                .match(DittoRuntimeException.class, dittoRuntimeException -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, dittoRuntimeException);
                    logger.info("Got 'DittoRuntimeException' message in <{}> session, telling "
                            + "eventAndResponsePublisher about it: {}", type, dittoRuntimeException);
                    eventAndResponsePublisher.forward(dittoRuntimeException, getContext());
                })
                .match(Event.class, this::handleEvent)
                // Session control.
                .match(StartStreaming.class, startStreaming -> {
                    authorizationSubjects =
                            startStreaming.getAuthorizationContext().getAuthorizationSubjectIds();
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in "
                            + "Cluster..", type, startStreaming.getStreamingType().name());
                    outstandingSubscriptionAcks.add(startStreaming.getStreamingType());
                    pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(
                            startStreaming.getStreamingType().getDistributedPubSubTopic(),
                            connectionCorrelationId, getSelf()), getSelf());
                })
                .match(StopStreaming.class, stopStreaming -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in "
                            + "Cluster..", type, stopStreaming.getStreamingType().name());
                    pubSubMediator.tell(new DistributedPubSubMediator.Unsubscribe(
                            stopStreaming.getStreamingType().getDistributedPubSubTopic(),
                            connectionCorrelationId, getSelf()), getSelf());
                })
                .match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
                        scheduleDelayedAck(StreamingType.fromTopic(subscribeAck.subscribe().topic()), true))
                .match(DistributedPubSubMediator.UnsubscribeAck.class, unsubscribeAck ->
                        scheduleDelayedAck(StreamingType.fromTopic(unsubscribeAck.unsubscribe().topic()),
                                false))
                .match(DelayedStreamingAck.class, this::handleDelayedStreamingAck)
                .match(Terminated.class, terminated -> handlePublisherTerminated())
                .matchAny(message -> {
                    LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
                    logger.warning("Got unknown message in '{}' session: '{}'", type, message);
                })
                .build();
    }

    /**
     * Handles a live signal that was not matched as "issued by this session": passes it on to the
     * {@code eventAndResponsePublisher} if the session may read it, remembering the sender of live commands
     * so that a later response can be routed back.
     */
    private void handleIncomingLiveSignal(final Signal<?> signal) {
        LogUtil.enhanceLogWithCorrelationId(logger, signal);
        acknowledgeSubscriptionForLiveSignal(signal);

        final DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (correlationIdStartsWithConnectionId(dittoHeaders)) {
            logger.debug("Got 'Live' Signal <{}> in <{}> session, but this was issued by this connection "
                    + "itself, not telling eventAndResponsePublisher about it", signal.getType(), type);
            return;
        }
        if (!isAuthorizedToRead(dittoHeaders)) {
            return;
        }

        logger.debug("Got 'Live' Signal <{}> in <{}> session, telling eventAndResponsePublisher about it: {}",
                signal.getType(), type, signal);
        if (signal instanceof Command) {
            extractActualCorrelationId(signal)
                    .ifPresent(correlationId -> responseAwaitingLiveSignals.put(correlationId, getSender()));
        }
        eventAndResponsePublisher.tell(signal, getSelf());
    }

    /**
     * Handles a (non-live) event: acknowledges an outstanding EVENTS subscription and passes the event on
     * to the {@code eventAndResponsePublisher} unless it originates from this connection or the session is
     * not authorized to read it.
     */
    private void handleEvent(final Event<?> event) {
        LogUtil.enhanceLogWithCorrelationId(logger, event);
        if (outstandingSubscriptionAcks.contains(StreamingType.EVENTS)) {
            // Receiving an event proves the subscription is active; no need to wait for the delayed ack.
            acknowledgeSubscription(StreamingType.EVENTS, getSelf());
        }

        final DittoHeaders dittoHeaders = event.getDittoHeaders();
        if (correlationIdStartsWithConnectionId(dittoHeaders)) {
            logger.debug("Got 'Event' message in <{}> session, but this was issued by this connection "
                    + "itself, not telling eventAndResponsePublisher about it", type);
            return;
        }
        if (!isAuthorizedToRead(dittoHeaders)) {
            return;
        }

        logger.debug("Got 'Event' message in <{}> session, telling eventAndResponsePublisher about it: {}",
                type, event);
        eventAndResponsePublisher.tell(event, getSelf());
    }

    /**
     * Schedules a {@link DelayedStreamingAck} to self after {@link #MAX_SUBSCRIBE_TIMEOUT_MS}.
     * Sending a message (instead of scheduling a {@code Runnable}) keeps all access to this actor's state
     * and logger inside its own message handling.
     */
    private void scheduleDelayedAck(final StreamingType streamingType, final boolean subscribed) {
        final FiniteDuration delay = FiniteDuration.apply(MAX_SUBSCRIBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        getContext().getSystem().scheduler().scheduleOnce(delay, getSelf(),
                new DelayedStreamingAck(streamingType, subscribed),
                getContext().getSystem().dispatcher(), getSelf());
    }

    /**
     * Reports the delayed (un)subscription acknowledgement to the {@code eventAndResponsePublisher}.
     * NOTE(review): as before, the ack is sent even if the subscription was already acknowledged early by
     * an incoming event/signal, so the publisher may see the same StreamingAck twice.
     */
    private void handleDelayedStreamingAck(final DelayedStreamingAck delayedAck) {
        LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
        if (delayedAck.subscribed) {
            acknowledgeSubscription(delayedAck.streamingType, getSelf());
        } else {
            acknowledgeUnsubscription(delayedAck.streamingType, getSelf());
        }
    }

    /**
     * Reacts to the termination of the watched {@code eventAndResponsePublisher}: unsubscribes from the
     * topics of all streaming types and stops this actor one second later via a PoisonPill.
     */
    private void handlePublisherTerminated() {
        LogUtil.enhanceLogWithCorrelationId(logger, connectionCorrelationId);
        logger.debug("eventAndResponsePublisher was terminated");
        logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster..", type);

        Arrays.stream(StreamingType.values())
                .map(StreamingType::getDistributedPubSubTopic)
                .forEach(topic -> pubSubMediator.tell(
                        new DistributedPubSubMediator.Unsubscribe(topic, connectionCorrelationId, getSelf()),
                        getSelf()));

        getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(1L, TimeUnit.SECONDS),
                getSelf(), PoisonPill.getInstance(), getContext().dispatcher(), getSelf());
    }

    /**
     * Acknowledges the outstanding subscription matching the kind of the given live signal, if any.
     * MessageCommand is checked before Command, so message commands acknowledge MESSAGES rather than
     * LIVE_COMMANDS; responses do not acknowledge anything.
     */
    private void acknowledgeSubscriptionForLiveSignal(final Signal<?> signal) {
        final StreamingType streamingType;
        if (signal instanceof MessageCommand) {
            streamingType = StreamingType.MESSAGES;
        } else if (signal instanceof Command) {
            streamingType = StreamingType.LIVE_COMMANDS;
        } else if (signal instanceof Event) {
            streamingType = StreamingType.LIVE_EVENTS;
        } else {
            return;
        }
        if (outstandingSubscriptionAcks.contains(streamingType)) {
            acknowledgeSubscription(streamingType, getSelf());
        }
    }

    /** Marks the subscription as acknowledged and tells the publisher a positive {@link StreamingAck}. */
    private void acknowledgeSubscription(final StreamingType streamingType, final ActorRef self) {
        outstandingSubscriptionAcks.remove(streamingType);
        eventAndResponsePublisher.tell(new StreamingAck(streamingType, true), self);
        logger.debug("Subscribed to Cluster <{}> in <{}> session", streamingType, type);
    }

    /** Tells the publisher a negative {@link StreamingAck}, i.e. that the unsubscription took place. */
    private void acknowledgeUnsubscription(final StreamingType streamingType, final ActorRef self) {
        eventAndResponsePublisher.tell(new StreamingAck(streamingType, false), self);
        logger.debug("Unsubscribed from Cluster <{}> in <{}> session", streamingType, type);
    }

    /**
     * Creates the handler which publishes a live signal issued by this session on the given pub/sub topic
     * (at most once per signal type and correlation id) and, for command responses, additionally forwards
     * the response to the remembered sender of the corresponding live command.
     *
     * @param topic the distributed pub/sub topic to publish on.
     * @return the handler to be used in {@link #createReceive()}.
     */
    private FI.UnitApply<Signal> publishLiveSignal(final String topic) {
        return signal -> {
            acknowledgeSubscriptionForLiveSignal(signal);
            LogUtil.enhanceLogWithCorrelationId(logger, signal);

            if (notYetPublished(signal)) {
                logger.debug("Publishing 'Live' Signal <{}> on topic <{}> into cluster.", signal.getType(),
                        topic);
                pubSubMediator.forward(new DistributedPubSubMediator.Publish(topic, signal, true),
                        getContext());
                savePublished(signal);
            }

            if (signal instanceof CommandResponse) {
                // Map.remove returns null for unknown ids, which Optional.map turns into "empty".
                extractActualCorrelationId(signal)
                        .map(responseAwaitingLiveSignals::remove)
                        .ifPresent(awaitingSender -> {
                            logger.debug("Answering to a 'Live' Command with CommandResponse <{}> to "
                                    + "sender: <{}>", signal.getType(), awaitingSender);
                            awaitingSender.forward(signal, getContext());
                        });
            }
        };
    }

    /** Returns whether the signal is a live signal whose correlation id prefix equals this session's id. */
    private boolean isOwnLiveSignal(final Signal<?> signal) {
        return isLiveSignal(signal) && wasIssuedByThisSession(signal);
    }

    /** Returns whether the signal's channel header equals the name of the LIVE channel. */
    private boolean isLiveSignal(final Signal<?> signal) {
        return signal.getDittoHeaders()
                .getChannel()
                .filter(TopicPath.Channel.LIVE.getName()::equals)
                .isPresent();
    }

    /** Returns whether the part of the correlation id before the first ':' equals this session's id. */
    private boolean wasIssuedByThisSession(final Signal<?> signal) {
        return extractConnectionCorrelationId(signal)
                .filter(connectionCorrelationId::equals)
                .isPresent();
    }

    /** Returns whether the headers carry a correlation id starting with this session's connection id. */
    private boolean correlationIdStartsWithConnectionId(final DittoHeaders dittoHeaders) {
        return dittoHeaders.getCorrelationId()
                .filter(correlationId -> correlationId.startsWith(connectionCorrelationId))
                .isPresent();
    }

    /**
     * Returns whether streaming was started and at least one of the session's authorization subjects is
     * contained in the read subjects of the given headers.
     */
    private boolean isAuthorizedToRead(final DittoHeaders dittoHeaders) {
        return authorizationSubjects != null
                && !Collections.disjoint(dittoHeaders.getReadSubjects(), authorizationSubjects);
    }

    /** Returns whether no signal with the same type and actual correlation id was published before. */
    private boolean notYetPublished(final Signal<?> signal) {
        return !extractActualCorrelationId(signal)
                .filter(correlationId -> outstandingLiveSignals.contains(publishedKey(signal, correlationId)))
                .isPresent();
    }

    /** Remembers the signal (by type and actual correlation id) as published. */
    private void savePublished(final Signal<?> signal) {
        extractActualCorrelationId(signal)
                .ifPresent(correlationId -> outstandingLiveSignals.add(publishedKey(signal, correlationId)));
    }

    private static String publishedKey(final Signal<?> signal, final String correlationId) {
        return signal.getType() + ":" + correlationId;
    }

    /**
     * Extracts the part of the correlation id before the first ':' (the whole id if there is no ':').
     */
    private static Optional<String> extractConnectionCorrelationId(final WithDittoHeaders withDittoHeaders) {
        return withDittoHeaders.getDittoHeaders()
                .getCorrelationId()
                .map(correlationId -> correlationId.split(":", 2)[0]);
    }

    /**
     * Extracts the part of the correlation id after the first ':' (the whole id if there is no ':').
     */
    private static Optional<String> extractActualCorrelationId(final WithDittoHeaders withDittoHeaders) {
        return withDittoHeaders.getDittoHeaders()
                .getCorrelationId()
                .map(correlationId -> correlationId.split(":", 2))
                // split with limit 2 always yields one or two elements.
                .map(parts -> parts.length == 2 ? parts[1] : parts[0]);
    }

    /**
     * Internal message this actor sends to itself after a delay in order to report a subscription
     * ({@code subscribed == true}) or unsubscription ({@code subscribed == false}) to the publisher.
     */
    private static final class DelayedStreamingAck {

        private final StreamingType streamingType;
        private final boolean subscribed;

        private DelayedStreamingAck(final StreamingType streamingType, final boolean subscribed) {
            this.streamingType = streamingType;
            this.subscribed = subscribed;
        }
    }
}
