/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.backup.roles;

import io.atomix.cluster.NodeId;
import io.atomix.protocols.backup.protocol.BackupOperation;
import io.atomix.protocols.backup.protocol.BackupRequest;
import io.atomix.protocols.backup.protocol.PrimaryBackupResponse;
import io.atomix.protocols.backup.roles.Replicator;
import io.atomix.protocols.backup.service.impl.PrimaryBackupServiceContext;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;

class SynchronousReplicator
implements Replicator {
    private final PrimaryBackupServiceContext context;
    private final Logger log;
    private final Map<NodeId, BackupQueue> queues = new HashMap<NodeId, BackupQueue>();
    private final Map<Long, CompletableFuture<Void>> futures = new LinkedHashMap<Long, CompletableFuture<Void>>();

    SynchronousReplicator(PrimaryBackupServiceContext context, Logger log) {
        this.context = context;
        this.log = log;
    }

    @Override
    public CompletableFuture<Void> replicate(BackupOperation operation) {
        if (this.context.descriptor().backups() == 0) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.futures.put(operation.index(), future);
        for (NodeId backup : this.context.backups()) {
            this.queues.computeIfAbsent(backup, x$0 -> new BackupQueue((NodeId)x$0)).add(operation);
        }
        return future;
    }

    private void completeFutures() {
        long commitIndex = this.queues.values().stream().map(queue -> ((BackupQueue)queue).ackedIndex).reduce(Math::min).orElse(0L);
        for (long i = this.context.getCommitIndex() + 1L; i <= commitIndex; ++i) {
            CompletableFuture<Void> future = this.futures.remove(i);
            if (future == null) continue;
            future.complete(null);
        }
        this.context.setCommitIndex(commitIndex);
    }

    @Override
    public void close() {
        this.futures.values().forEach(f -> f.completeExceptionally(new IllegalStateException("Not the primary")));
    }

    private final class BackupQueue {
        private final Queue<BackupOperation> operations = new LinkedList<BackupOperation>();
        private final NodeId nodeId;
        private boolean inProgress;
        private long ackedIndex;

        BackupQueue(NodeId nodeId) {
            this.nodeId = nodeId;
        }

        void add(BackupOperation operation) {
            this.operations.add(operation);
            this.maybeBackup();
        }

        private void maybeBackup() {
            if (!this.inProgress && !this.operations.isEmpty()) {
                this.inProgress = true;
                this.backup();
            }
        }

        private void backup() {
            LinkedList<BackupOperation> operations = new LinkedList<BackupOperation>();
            long index = 0L;
            while (operations.size() < 100 && !this.operations.isEmpty()) {
                BackupOperation operation = this.operations.remove();
                operations.add(operation);
                index = operation.index();
            }
            long lastIndex = index;
            BackupRequest request = BackupRequest.request(SynchronousReplicator.this.context.descriptor(), SynchronousReplicator.this.context.nodeId(), SynchronousReplicator.this.context.currentTerm(), SynchronousReplicator.this.context.getCommitIndex(), operations);
            SynchronousReplicator.this.log.trace("Sending {} to {}", (Object)request, (Object)this.nodeId);
            SynchronousReplicator.this.context.protocol().backup(this.nodeId, request).whenCompleteAsync((response, error) -> {
                if (error == null) {
                    SynchronousReplicator.this.log.trace("Received {} from {}", response, (Object)this.nodeId);
                    if (response.status() == PrimaryBackupResponse.Status.OK) {
                        this.ackedIndex = lastIndex;
                        SynchronousReplicator.this.completeFutures();
                    } else {
                        SynchronousReplicator.this.log.trace("Replication to {} failed!", (Object)this.nodeId);
                    }
                } else {
                    SynchronousReplicator.this.log.trace("Replication to {} failed! {}", (Object)this.nodeId, error);
                }
                this.inProgress = false;
                this.maybeBackup();
            }, (Executor)SynchronousReplicator.this.context.threadContext());
            operations.clear();
        }
    }
}

