/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.copycat.client.session;

import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.session.ClientSequencer;
import io.atomix.copycat.client.session.ClientSessionState;
import io.atomix.copycat.protocol.PublishRequest;
import io.atomix.copycat.protocol.ResetRequest;
import io.atomix.copycat.session.Event;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

final class ClientSessionListener {
    private final Connection connection;
    private final ClientSessionState state;
    private final ThreadContext context;
    private final Map<String, Set<Consumer>> eventListeners = new ConcurrentHashMap<String, Set<Consumer>>();
    private final ClientSequencer sequencer;

    public ClientSessionListener(Connection connection, ClientSessionState state, ClientSequencer sequencer, ThreadContext context) {
        this.connection = (Connection)Assert.notNull((Object)connection, (String)"connection");
        this.state = (ClientSessionState)Assert.notNull((Object)state, (String)"state");
        this.context = (ThreadContext)Assert.notNull((Object)context, (String)"context");
        this.sequencer = (ClientSequencer)Assert.notNull((Object)sequencer, (String)"sequencer");
        connection.handler(PublishRequest.class, this::handlePublish);
    }

    public Listener<Void> onEvent(String event, Runnable callback) {
        return this.onEvent(event, (T v) -> callback.run());
    }

    public <T> Listener<T> onEvent(String event, final Consumer listener) {
        final Set listeners = this.eventListeners.computeIfAbsent(event, e -> new CopyOnWriteArraySet());
        listeners.add(listener);
        return new Listener<T>(){

            public void accept(T event) {
                listener.accept(event);
            }

            public void close() {
                listeners.remove(listener);
            }
        };
    }

    private void handlePublish(PublishRequest request) {
        this.state.getLogger().trace("{} - Received {}", (Object)this.state.getSessionId(), (Object)request);
        if (request.session() != this.state.getSessionId()) {
            this.state.getLogger().trace("{} - Inconsistent session ID: {}", (Object)this.state.getSessionId(), (Object)request.session());
            return;
        }
        if (request.eventIndex() <= this.state.getEventIndex()) {
            return;
        }
        if (request.previousIndex() != this.state.getEventIndex()) {
            this.state.getLogger().trace("{} - Inconsistent event index: {}", (Object)this.state.getSessionId(), (Object)request.previousIndex());
            this.connection.send((Object)((ResetRequest.Builder)ResetRequest.builder().withSession(this.state.getSessionId())).withIndex(this.state.getEventIndex()).build());
            return;
        }
        this.state.setEventIndex(request.eventIndex());
        this.sequencer.sequenceEvent(request, () -> {
            for (Event event : request.events()) {
                Set<Consumer> listeners = this.eventListeners.get(event.name());
                if (listeners == null) continue;
                for (Consumer listener : listeners) {
                    listener.accept(event.message());
                }
            }
        });
    }

    public CompletableFuture<Void> close() {
        return CompletableFuture.completedFuture(null);
    }
}

