/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.log.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveState;
import io.atomix.primitive.log.LogConsumer;
import io.atomix.primitive.log.LogProducer;
import io.atomix.primitive.log.LogRecord;
import io.atomix.primitive.log.LogSession;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.primitive.partition.PrimaryElectionEventListener;
import io.atomix.primitive.partition.PrimaryTerm;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.LogClientProtocol;
import io.atomix.protocols.log.protocol.LogResponse;
import io.atomix.protocols.log.protocol.RecordsRequest;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.event.EventListener;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.slf4j.Logger;

/**
 * Client-side {@link LogSession} for a single partition.
 *
 * <p>The session tracks the partition's current {@link PrimaryTerm} and exposes a
 * {@link LogProducer} that appends to, and a {@link LogConsumer} that reads from, the
 * member that is primary for that term. It listens for primary election events (to follow
 * a new primary) and cluster membership events (to suspend itself when the primary is
 * removed from the cluster).
 *
 * <p>Threading: {@link #term} is only written from tasks running on the session's
 * {@link ThreadContext}; state transitions and listener bookkeeping are guarded by the
 * instance monitor because they may be triggered from listener callbacks.
 */
public class DistributedLogSession
implements LogSession {
    private final PartitionId partitionId;
    private final SessionId sessionId;
    private final ClusterMembershipService clusterMembershipService;
    private final LogClientProtocol protocol;
    private final PrimaryElection primaryElection;
    private final ThreadContext threadContext;
    // Guarded by the instance monitor (see changeState / add- / removeStateChangeListener).
    private final Set<Consumer<PrimitiveState>> stateChangeListeners = Sets.newIdentityHashSet();
    private final ClusterMembershipEventListener membershipEventListener = this::handleClusterEvent;
    private final PrimaryElectionEventListener primaryElectionListener = event -> this.changeReplicas(event.term());
    private final DistributedLogProducer producer = new DistributedLogProducer();
    private final DistributedLogConsumer consumer = new DistributedLogConsumer();
    private final MemberId memberId;
    private final String subject;
    // Written only on threadContext; volatile because handleClusterEvent reads it from a
    // listener callback. NOTE(review): which thread delivers membership events is not visible here.
    private volatile PrimaryTerm term;
    private volatile PrimitiveState state = PrimitiveState.CONNECTED;
    private final Logger log;

    /**
     * Creates a session and subscribes it to membership and primary election events.
     *
     * @param partitionId the partition this session talks to
     * @param sessionId the identifier of this session
     * @param clusterMembershipService source of the local member id and of membership events
     * @param protocol the client protocol used to reach the primary
     * @param primaryElection source of the current term and of election events
     * @param threadContext the context on which session callbacks are executed
     * @throws NullPointerException if any argument is {@code null}
     */
    public DistributedLogSession(PartitionId partitionId, SessionId sessionId, ClusterMembershipService clusterMembershipService, LogClientProtocol protocol, PrimaryElection primaryElection, ThreadContext threadContext) {
        this.partitionId = Preconditions.checkNotNull(partitionId, "partitionId cannot be null");
        this.sessionId = Preconditions.checkNotNull(sessionId, "sessionId cannot be null");
        this.clusterMembershipService = Preconditions.checkNotNull(clusterMembershipService, "clusterMembershipService cannot be null");
        this.protocol = Preconditions.checkNotNull(protocol, "protocol cannot be null");
        this.primaryElection = Preconditions.checkNotNull(primaryElection, "primaryElection cannot be null");
        this.threadContext = Preconditions.checkNotNull(threadContext, "threadContext cannot be null");
        this.memberId = clusterMembershipService.getLocalMember().id();
        this.subject = String.format("%s-%s-%s", partitionId.group(), partitionId.id(), sessionId);
        this.log = ContextualLoggerFactory.getLogger(this.getClass(), LoggerContext.builder(DistributedLogProducer.class).addValue(partitionId.group() != null ? String.format("%s-%d", partitionId.group(), partitionId.id()) : partitionId.id()).build());
        // Subscribe last so that a callback fired during construction sees every field assigned.
        clusterMembershipService.addListener(this.membershipEventListener);
        primaryElection.addListener(this.primaryElectionListener);
    }

    /** Returns the partition this session is bound to. */
    public PartitionId partitionId() {
        return this.partitionId;
    }

    /** Returns the identifier of this session. */
    public SessionId sessionId() {
        return this.sessionId;
    }

    /** Returns the thread context on which session callbacks are executed. */
    public ThreadContext context() {
        return this.threadContext;
    }

    /** Returns the most recently published session state. */
    public PrimitiveState getState() {
        return this.state;
    }

    /** Returns the producer used to append records through this session. */
    public LogProducer producer() {
        return this.producer;
    }

    /** Returns the consumer used to read records through this session. */
    public LogConsumer consumer() {
        return this.consumer;
    }

    /**
     * Suspends the session when the member removed from the cluster is the primary of the
     * term currently known to this session. All other events are ignored.
     */
    private void handleClusterEvent(ClusterMembershipEvent event) {
        PrimaryTerm current = this.term;
        if (current == null || event.type() != ClusterMembershipEvent.Type.MEMBER_REMOVED) {
            return;
        }
        if (event.subject().id().equals(current.primary().memberId())) {
            this.changeState(PrimitiveState.SUSPENDED);
        }
    }

    /**
     * Adopts {@code term} if it is newer than the known term (or none is known yet) and
     * re-registers the consumer with the new primary. Runs on the session thread context.
     */
    private void changeReplicas(PrimaryTerm term) {
        this.threadContext.execute(() -> {
            if (this.term == null || term.term() > this.term.term()) {
                this.term = term;
                MemberId primary = term.primary().memberId();
                // The registration outcome has no caller to report to; log failures instead
                // of dropping them silently.
                this.consumer.register(primary).whenComplete((result, error) -> {
                    if (error != null) {
                        this.log.warn("Failed to register consumer with primary {}", primary, error);
                    }
                });
            }
        });
    }

    /**
     * Moves the session to {@code state} and, if that is a change, schedules every
     * registered listener on the thread context with the new state.
     */
    private synchronized void changeState(PrimitiveState state) {
        if (this.state != state) {
            this.state = state;
            this.stateChangeListeners.forEach(l -> this.threadContext.execute(() -> l.accept(state)));
        }
    }

    /**
     * Registers a listener to be notified of state changes.
     *
     * @param listener the listener to add
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public synchronized void addStateChangeListener(Consumer<PrimitiveState> listener) {
        // Same monitor as changeState, which iterates this (non-concurrent) set.
        this.stateChangeListeners.add(Preconditions.checkNotNull(listener));
    }

    /**
     * Unregisters a previously added state change listener.
     *
     * @param listener the listener to remove
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public synchronized void removeStateChangeListener(Consumer<PrimitiveState> listener) {
        this.stateChangeListeners.remove(Preconditions.checkNotNull(listener));
    }

    /**
     * Resolves the current term and then marks the session {@link PrimitiveState#CONNECTED}.
     *
     * @return a future completed with this session, or exceptionally if no term is available
     */
    public CompletableFuture<LogSession> connect() {
        return this.term()
            .thenRun(() -> this.changeState(PrimitiveState.CONNECTED))
            .thenApply(v -> this);
    }

    /**
     * Unsubscribes from membership and election events and marks the session
     * {@link PrimitiveState#CLOSED} on the thread context.
     *
     * @return a future completed once the state change has been applied
     */
    public CompletableFuture<Void> close() {
        // Detach first: otherwise the event sources keep this session reachable and it
        // would go on reacting to elections/membership changes after being closed.
        this.clusterMembershipService.removeListener(this.membershipEventListener);
        this.primaryElection.removeListener(this.primaryElectionListener);
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            this.changeState(PrimitiveState.CLOSED);
            future.complete(null);
        });
        return future;
    }

    /**
     * Returns the known term, fetching it from the primary election when none is cached.
     *
     * @return a future completed with the term, or exceptionally with
     *     {@link PrimitiveException.Unavailable} when the election yields no term
     */
    private CompletableFuture<PrimaryTerm> term() {
        CompletableFuture<PrimaryTerm> future = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            PrimaryTerm cached = this.term;
            if (cached != null) {
                future.complete(cached);
                return;
            }
            // Complete on the session context so that `term` is only ever written there,
            // like every other asynchronous callback in this class.
            this.primaryElection.getTerm().whenCompleteAsync((elected, error) -> {
                if (elected != null) {
                    this.term = elected;
                    future.complete(elected);
                } else {
                    future.completeExceptionally(new PrimitiveException.Unavailable());
                }
            }, this.threadContext);
        });
        return future;
    }

    /**
     * Consumer side of the session: registers with the primary and delivers records to the
     * user-supplied callback in index order.
     */
    private class DistributedLogConsumer
    implements LogConsumer {
        // Member the last consume request was sent to; reset requests go to the same member.
        private MemberId leader;
        // Index of the last record handed to the callback (the next expected is index + 1).
        private long index;
        private volatile Consumer<LogRecord> consumer;

        private DistributedLogConsumer() {
        }

        /**
         * Sends a consume request for the next expected index to {@code leader}.
         *
         * @return a future completed when the leader answers OK, or exceptionally with
         *     {@link PrimitiveException.Unavailable} for any other status, or with the
         *     transport error
         */
        private CompletableFuture<Void> register(MemberId leader) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            this.leader = leader;
            DistributedLogSession.this.protocol.consume(leader, ConsumeRequest.request(DistributedLogSession.this.memberId, DistributedLogSession.this.subject, this.index + 1L)).whenCompleteAsync((response, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else if (response.status() == LogResponse.Status.OK) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(new PrimitiveException.Unavailable());
                }
            }, DistributedLogSession.this.threadContext);
            return future;
        }

        /**
         * Handles one pushed record. A request flagged as a reset rewinds the local index to
         * just before the record. A record carrying the next expected index is delivered to
         * the callback (when one is set) and the index advanced; any other index triggers a
         * reset request to the leader for the next expected index.
         */
        private void handleRecords(RecordsRequest request) {
            if (request.reset()) {
                this.index = request.record().index() - 1L;
            }
            if (request.record().index() == this.index + 1L) {
                Consumer<LogRecord> callback = this.consumer;
                if (callback != null) {
                    callback.accept(request.record());
                    this.index = request.record().index();
                }
            } else {
                DistributedLogSession.this.protocol.reset(this.leader, ResetRequest.request(DistributedLogSession.this.memberId, DistributedLogSession.this.subject, this.index + 1L));
            }
        }

        /**
         * Starts consuming records from {@code index}, delivering each one to {@code consumer}.
         *
         * @param index the index of the first record to receive
         * @param consumer the callback invoked for each record
         * @return a future completed once the primary has accepted the registration
         */
        public CompletableFuture<Void> consume(long index, Consumer<LogRecord> consumer) {
            return DistributedLogSession.this.term().thenCompose(term -> {
                DistributedLogSession.this.protocol.registerRecordsConsumer(DistributedLogSession.this.subject, this::handleRecords, DistributedLogSession.this.threadContext);
                this.consumer = consumer;
                this.index = index - 1L;
                return this.register(term.primary().memberId());
            });
        }
    }

    /** Producer side of the session: appends values via the primary of the current term. */
    private class DistributedLogProducer
    implements LogProducer {
        private DistributedLogProducer() {
        }

        /**
         * Appends {@code value} to the log through the current primary.
         *
         * @param value the bytes to append
         * @return a future completed with the index reported by the primary, or exceptionally
         *     with {@link PrimitiveException.Unavailable} for a non-OK status, or with the
         *     underlying error
         */
        public CompletableFuture<Long> append(byte[] value) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            DistributedLogSession.this.term()
                .thenCompose(term -> DistributedLogSession.this.protocol.append(term.primary().memberId(), AppendRequest.request(value)))
                .whenCompleteAsync((response, error) -> {
                    if (error != null) {
                        future.completeExceptionally(error);
                    } else if (response.status() == LogResponse.Status.OK) {
                        future.complete(response.index());
                    } else {
                        future.completeExceptionally(new PrimitiveException.Unavailable());
                    }
                }, DistributedLogSession.this.threadContext);
            return future;
        }
    }
}

