/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.replicator;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;

public class PlacementDriverMessageProcessor {
    private static final IgniteLogger LOG = Loggers.forClass(PlacementDriverMessageProcessor.class);
    private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private final ReplicationGroupId groupId;
    private final InternalClusterNode localNode;
    private final PlacementDriver placementDriver;
    private final ClockService clockService;
    private final BiConsumer<ReplicationGroupId, HybridTimestamp> replicaReservationClosure;
    private final Executor executor;
    private volatile HybridTimestamp leaseExpirationTime;
    private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
    private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture();
    @Nullable
    private volatile InternalClusterNode leaderRef = null;
    private final TopologyAwareRaftGroupService raftClient;

    PlacementDriverMessageProcessor(ReplicationGroupId groupId, InternalClusterNode localNode, PlacementDriver placementDriver, ClockService clockService, BiConsumer<ReplicationGroupId, HybridTimestamp> replicaReservationClosure, Executor executor, PendingComparableValuesTracker<Long, Void> storageIndexTracker, TopologyAwareRaftGroupService raftClient) {
        this.groupId = groupId;
        this.localNode = localNode;
        this.placementDriver = placementDriver;
        this.clockService = clockService;
        this.replicaReservationClosure = replicaReservationClosure;
        this.executor = executor;
        this.storageIndexTracker = storageIndexTracker;
        this.raftClient = raftClient;
        raftClient.subscribeLeader(this::onLeaderElected);
    }

    CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
        if (msg instanceof LeaseGrantedMessage) {
            return this.processLeaseGrantedMessage((LeaseGrantedMessage)msg).handle((v, e) -> {
                if (e != null) {
                    if (!ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class, ComponentStoppingException.class, TrackerClosedException.class}) && !ReplicatorRecoverableExceptions.isRecoverable(e)) {
                        LOG.warn("Failed to process the lease granted message, lease negotiation will be retried [msg={}].", e, new Object[]{msg});
                    }
                    return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(false).build();
                }
                return v;
            });
        }
        return CompletableFuture.failedFuture((Throwable)((Object)new AssertionError((Object)("Unknown message type, msg=" + String.valueOf(msg)))));
    }

    private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
        LOG.info("Received LeaseGrantedMessage for replica [groupId={}, leaseStartTime={}, force={}].", new Object[]{this.groupId, msg.leaseStartTime(), msg.force()});
        return this.placementDriver.previousPrimaryExpired(this.groupId).thenCompose(unused -> this.leaderFuture().thenComposeAsync(leader -> {
            HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
            assert (leaseExpirationTime == null || this.clockService.after(msg.leaseExpirationTime(), leaseExpirationTime)) : "Invalid lease expiration time in message, msg=" + String.valueOf(msg);
            if (msg.force()) {
                return ((CompletableFuture)this.waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(v -> this.sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(), this.localNode.id(), this.localNode.name()))).thenCompose(v -> {
                    CompletableFuture<LeaseGrantedMessageResponse> respFut = this.acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime());
                    if (leader.equals((Object)this.localNode)) {
                        return respFut;
                    }
                    return this.raftClient.transferLeadership(new Peer(this.localNode.name())).thenCompose(ignored -> respFut);
                });
            }
            if (leader.equals((Object)this.localNode)) {
                return ((CompletableFuture)this.waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(v -> this.sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(), this.localNode.id(), this.localNode.name()))).thenCompose(v -> this.acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
            }
            return this.proposeLeaseRedirect((InternalClusterNode)leader);
        }, this.executor));
    }

    private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(long leaseStartTime, UUID primaryReplicaNodeId, String primaryReplicaNodeName) {
        PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand().leaseStartTime(leaseStartTime).primaryReplicaNodeId(primaryReplicaNodeId).primaryReplicaNodeName(primaryReplicaNodeName).build();
        return this.raftClient.run((Command)cmd);
    }

    private CompletableFuture<LeaseGrantedMessageResponse> acceptLease(HybridTimestamp leaseStartTime, HybridTimestamp leaseExpirationTime) {
        LOG.info("Lease accepted [group=" + String.valueOf(this.groupId) + ", leaseStartTime=" + String.valueOf(leaseStartTime) + "].", new Object[0]);
        this.leaseExpirationTime = leaseExpirationTime;
        LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(true).build();
        return CompletableFuture.completedFuture(resp);
    }

    private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(InternalClusterNode groupLeader) {
        LOG.info("Proposing lease redirection [groupId={}, proposed node={}].", new Object[]{this.groupId, groupLeader});
        LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(false).redirectProposal(groupLeader.name()).build();
        return CompletableFuture.completedFuture(resp);
    }

    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
        LOG.info("Waiting for actual storage state, group=" + String.valueOf(this.groupId), new Object[0]);
        this.replicaReservationClosure.accept(this.groupId, startTime);
        long timeout = expirationTime - System.currentTimeMillis();
        if (timeout <= 0L) {
            return CompletableFuture.failedFuture(new TimeoutException());
        }
        return IgniteUtils.retryOperationUntilSuccess(this.raftClient::readIndex, e -> System.currentTimeMillis() > expirationTime, (Executor)this.executor).orTimeout(timeout, TimeUnit.MILLISECONDS).thenCompose(arg_0 -> this.storageIndexTracker.waitFor(arg_0));
    }

    private void onLeaderElected(InternalClusterNode clusterNode, long term) {
        this.leaderRef = clusterNode;
        this.leaderReadyFuture.complete(null);
    }

    private CompletableFuture<InternalClusterNode> leaderFuture() {
        return this.leaderReadyFuture.thenApply(ignored -> this.leaderRef);
    }
}

