/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.log.roles;

import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.log.LogRecord;
import io.atomix.protocols.log.DistributedLogServer;
import io.atomix.protocols.log.impl.DistributedLogServerContext;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.AppendResponse;
import io.atomix.protocols.log.protocol.BackupOperation;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.ConsumeResponse;
import io.atomix.protocols.log.protocol.LogEntry;
import io.atomix.protocols.log.protocol.RecordsRequest;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.protocols.log.roles.AsynchronousReplicator;
import io.atomix.protocols.log.roles.LogServerRole;
import io.atomix.protocols.log.roles.Replicator;
import io.atomix.protocols.log.roles.SynchronousReplicator;
import io.atomix.storage.StorageException;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalReader;
import io.atomix.storage.journal.SegmentedJournalReader;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class LeaderRole
extends LogServerRole {
    private final Replicator replicator;
    private final Map<ConsumerKey, ConsumerSender> consumers = Maps.newHashMap();

    public LeaderRole(DistributedLogServerContext context) {
        super(DistributedLogServer.Role.LEADER, context);
        switch (context.replicationStrategy()) {
            case SYNCHRONOUS: {
                this.replicator = new SynchronousReplicator(context, this.log);
                break;
            }
            case ASYNCHRONOUS: {
                this.replicator = new AsynchronousReplicator(context, this.log);
                break;
            }
            default: {
                throw new AssertionError();
            }
        }
    }

    @Override
    public CompletableFuture<AppendResponse> append(AppendRequest request) {
        this.logRequest(request);
        try {
            Indexed entry = this.context.journal().writer().append((Object)new LogEntry(this.context.currentTerm(), System.currentTimeMillis(), request.value()));
            return this.replicator.replicate(new BackupOperation(entry.index(), ((LogEntry)entry.entry()).term(), ((LogEntry)entry.entry()).timestamp(), ((LogEntry)entry.entry()).value())).thenApply(v -> {
                this.consumers.values().forEach(consumer -> consumer.next());
                return this.logResponse(AppendResponse.ok(entry.index()));
            });
        }
        catch (StorageException e) {
            return CompletableFuture.completedFuture(this.logResponse(AppendResponse.error()));
        }
    }

    @Override
    public CompletableFuture<ConsumeResponse> consume(ConsumeRequest request) {
        this.logRequest(request);
        SegmentedJournalReader reader = this.context.journal().openReader(request.index(), JournalReader.Mode.COMMITS);
        ConsumerSender consumer = new ConsumerSender(request.memberId(), request.subject(), (JournalReader<LogEntry>)reader);
        this.consumers.put(new ConsumerKey(request.memberId(), request.subject()), consumer);
        consumer.next();
        return CompletableFuture.completedFuture(this.logResponse(ConsumeResponse.ok()));
    }

    @Override
    public void reset(ResetRequest request) {
        this.logRequest(request);
        ConsumerSender consumer = this.consumers.get(new ConsumerKey(request.memberId(), request.subject()));
        if (consumer != null) {
            consumer.reset(request.index());
        }
    }

    @Override
    public void close() {
        this.replicator.close();
        this.consumers.values().forEach(consumer -> consumer.close());
    }

    class ConsumerKey {
        private final MemberId memberId;
        private final String subject;

        ConsumerKey(MemberId memberId, String subject) {
            this.memberId = memberId;
            this.subject = subject;
        }

        public int hashCode() {
            return Objects.hash(this.memberId, this.subject);
        }

        public boolean equals(Object object) {
            if (object instanceof ConsumerKey) {
                ConsumerKey that = (ConsumerKey)object;
                return this.memberId.equals((Object)that.memberId) && this.subject.equals(that.subject);
            }
            return false;
        }
    }

    class ConsumerSender {
        private final MemberId memberId;
        private final String subject;
        private final JournalReader<LogEntry> reader;
        private boolean open = true;

        ConsumerSender(MemberId memberId, String subject, JournalReader<LogEntry> reader) {
            this.memberId = memberId;
            this.subject = subject;
            this.reader = reader;
        }

        void reset(long index) {
            this.reader.reset(index);
            this.next();
        }

        void next() {
            if (!this.open) {
                return;
            }
            LeaderRole.this.context.threadContext().execute(() -> {
                if (this.reader.hasNext()) {
                    Indexed entry = this.reader.next();
                    LogRecord record = new LogRecord(entry.index(), ((LogEntry)entry.entry()).timestamp(), ((LogEntry)entry.entry()).value());
                    boolean reset = this.reader.getFirstIndex() == entry.index();
                    RecordsRequest request = RecordsRequest.request(record, reset);
                    LeaderRole.this.log.trace("Sending {} to {} at {}", new Object[]{request, this.memberId, this.subject});
                    LeaderRole.this.context.protocol().produce(this.memberId, this.subject, request);
                    this.next();
                }
            });
        }

        void close() {
            this.reader.close();
            this.open = false;
        }
    }
}

