package org.terracotta.nomad.client;

import java.time.Clock;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.terracotta.inet.HostPort;
import org.terracotta.nomad.client.change.NomadChange;
import org.terracotta.nomad.client.results.AllResultsReceiver;
import org.terracotta.nomad.client.results.CommitResultsReceiver;
import org.terracotta.nomad.client.results.DiscoverResultsReceiver;
import org.terracotta.nomad.client.results.PrepareResultsReceiver;
import org.terracotta.nomad.client.results.RollbackResultsReceiver;
import org.terracotta.nomad.client.results.TakeoverResultsReceiver;
import org.terracotta.nomad.messages.CommitMessage;
import org.terracotta.nomad.messages.DiscoverResponse;
import org.terracotta.nomad.messages.PrepareMessage;
import org.terracotta.nomad.messages.RejectionReason;
import org.terracotta.nomad.messages.RollbackMessage;
import org.terracotta.nomad.messages.TakeoverMessage;
import org.terracotta.nomad.server.NomadException;

/* loaded from: input_file:org/terracotta/nomad/client/NomadMessageSender.class */
public class NomadMessageSender<T> implements AllResultsReceiver<T> {
    private final List<NomadEndpoint<T>> servers;
    private final Clock clock;
    private final String host;
    private final String user;
    private final Map<HostPort, Long> mutativeMessageCounts = new ConcurrentHashMap();
    private final AtomicLong maxVersionNumber = new AtomicLong();
    private final List<NomadEndpoint<T>> preparedServers = new CopyOnWriteArrayList();
    protected volatile UUID changeUuid;

    public NomadMessageSender(List<NomadEndpoint<T>> list, String str, String str2, Clock clock) {
        this.host = str;
        this.user = str2;
        this.servers = list;
        this.clock = clock;
    }

    public void sendDiscovers(DiscoverResultsReceiver<T> discoverResultsReceiver) {
        discoverResultsReceiver.startDiscovery((Collection) this.servers.stream().map((v0) -> {
            return v0.getHostPort();
        }).collect(Collectors.toList()));
        for (NomadEndpoint<T> nomadEndpoint : this.servers) {
            nomadEndpoint.getClass();
            runSync(nomadEndpoint::discover, discoverResponse -> {
                discoverResultsReceiver.discovered(nomadEndpoint.getHostPort(), discoverResponse);
            }, unwrap(th -> {
                discoverResultsReceiver.discoverFail(nomadEndpoint.getHostPort(), th);
            }));
        }
        discoverResultsReceiver.endDiscovery();
    }

    public void sendSecondDiscovers(DiscoverResultsReceiver<T> discoverResultsReceiver) {
        discoverResultsReceiver.startSecondDiscovery();
        for (NomadEndpoint<T> nomadEndpoint : this.servers) {
            long longValue = this.mutativeMessageCounts.get(nomadEndpoint.getHostPort()).longValue();
            nomadEndpoint.getClass();
            runSync(nomadEndpoint::discover, discoverResponse -> {
                if (discoverResponse.getMutativeMessageCount() == longValue) {
                    discoverResultsReceiver.discoverRepeated(nomadEndpoint.getHostPort());
                    return;
                }
                discoverResultsReceiver.discoverOtherClient(nomadEndpoint.getHostPort(), discoverResponse.getLastMutationHost(), discoverResponse.getLastMutationUser());
            }, unwrap(th -> {
                discoverResultsReceiver.discoverFail(nomadEndpoint.getHostPort(), th);
            }));
        }
    }

    public void sendPrepares(PrepareResultsReceiver prepareResultsReceiver, UUID uuid, NomadChange nomadChange) {
        prepareResultsReceiver.startPrepare(uuid);
        long j = this.maxVersionNumber.get() + 1;
        Instant instant = this.clock.instant();
        for (NomadEndpoint<T> nomadEndpoint : this.servers) {
            long longValue = this.mutativeMessageCounts.get(nomadEndpoint.getHostPort()).longValue();
            runSync(() -> {
                return nomadEndpoint.prepare(new PrepareMessage(longValue, this.host, this.user, instant, uuid, j, nomadChange));
            }, acceptRejectResponse -> {
                if (acceptRejectResponse.isAccepted()) {
                    prepareResultsReceiver.prepared(nomadEndpoint.getHostPort());
                    return;
                }
                RejectionReason rejectionReason = acceptRejectResponse.getRejectionReason();
                switch (rejectionReason) {
                    case UNACCEPTABLE:
                        prepareResultsReceiver.prepareChangeUnacceptable(nomadEndpoint.getHostPort(), acceptRejectResponse.getRejectionMessage());
                        return;
                    case DEAD:
                        prepareResultsReceiver.prepareOtherClient(nomadEndpoint.getHostPort(), acceptRejectResponse.getLastMutationHost(), acceptRejectResponse.getLastMutationUser());
                        return;
                    case BAD:
                        throw new AssertionError("A server rejected a message as bad: " + nomadEndpoint.getHostPort());
                    default:
                        throw new AssertionError("Unexpected RejectionReason: " + rejectionReason);
                }
            }, unwrap(th -> {
                prepareResultsReceiver.prepareFail(nomadEndpoint.getHostPort(), th);
            }));
        }
        prepareResultsReceiver.endPrepare();
    }

    public void sendCommits(CommitResultsReceiver commitResultsReceiver) {
        commitResultsReceiver.startCommit();
        Instant instant = this.clock.instant();
        for (NomadEndpoint<T> nomadEndpoint : this.preparedServers) {
            long longValue = this.mutativeMessageCounts.get(nomadEndpoint.getHostPort()).longValue();
            runSync(() -> {
                return nomadEndpoint.commit(new CommitMessage(longValue + 1, this.host, this.user, instant, this.changeUuid));
            }, acceptRejectResponse -> {
                if (acceptRejectResponse.isAccepted()) {
                    commitResultsReceiver.committed(nomadEndpoint.getHostPort());
                    return;
                }
                RejectionReason rejectionReason = acceptRejectResponse.getRejectionReason();
                switch (rejectionReason) {
                    case UNACCEPTABLE:
                        throw new AssertionError("Commit should not return UNACCEPTABLE");
                    case DEAD:
                        commitResultsReceiver.commitOtherClient(nomadEndpoint.getHostPort(), acceptRejectResponse.getLastMutationHost(), acceptRejectResponse.getLastMutationUser());
                        return;
                    case BAD:
                        throw new AssertionError("A server rejected a message as bad: " + nomadEndpoint.getHostPort());
                    default:
                        throw new AssertionError("Unexpected RejectionReason: " + rejectionReason);
                }
            }, unwrap(th -> {
                commitResultsReceiver.commitFail(nomadEndpoint.getHostPort(), th);
            }));
        }
        commitResultsReceiver.endCommit();
    }

    public void sendRollbacks(RollbackResultsReceiver rollbackResultsReceiver) {
        rollbackResultsReceiver.startRollback();
        Instant instant = this.clock.instant();
        for (NomadEndpoint<T> nomadEndpoint : this.preparedServers) {
            long longValue = this.mutativeMessageCounts.get(nomadEndpoint.getHostPort()).longValue();
            runSync(() -> {
                return nomadEndpoint.rollback(new RollbackMessage(longValue + 1, this.host, this.user, instant, this.changeUuid));
            }, acceptRejectResponse -> {
                if (acceptRejectResponse.isAccepted()) {
                    rollbackResultsReceiver.rolledBack(nomadEndpoint.getHostPort());
                    return;
                }
                RejectionReason rejectionReason = acceptRejectResponse.getRejectionReason();
                switch (rejectionReason) {
                    case UNACCEPTABLE:
                        throw new AssertionError("Rollback should not return UNACCEPTABLE");
                    case DEAD:
                        rollbackResultsReceiver.rollbackOtherClient(nomadEndpoint.getHostPort(), acceptRejectResponse.getLastMutationHost(), acceptRejectResponse.getLastMutationUser());
                        return;
                    case BAD:
                        throw new AssertionError("A server rejected a message as bad: " + nomadEndpoint.getHostPort());
                    default:
                        throw new AssertionError("Unexpected RejectionReason: " + rejectionReason);
                }
            }, unwrap(th -> {
                rollbackResultsReceiver.rollbackFail(nomadEndpoint.getHostPort(), th);
            }));
        }
        rollbackResultsReceiver.endRollback();
    }

    public void sendTakeovers(TakeoverResultsReceiver takeoverResultsReceiver) {
        takeoverResultsReceiver.startTakeover();
        Instant instant = this.clock.instant();
        for (NomadEndpoint<T> nomadEndpoint : this.servers) {
            long longValue = this.mutativeMessageCounts.get(nomadEndpoint.getHostPort()).longValue();
            runSync(() -> {
                return nomadEndpoint.takeover(new TakeoverMessage(longValue, this.host, this.user, instant));
            }, acceptRejectResponse -> {
                if (acceptRejectResponse.isAccepted()) {
                    takeoverResultsReceiver.takeover(nomadEndpoint.getHostPort());
                    return;
                }
                RejectionReason rejectionReason = acceptRejectResponse.getRejectionReason();
                switch (rejectionReason) {
                    case UNACCEPTABLE:
                        throw new AssertionError("Takeover should not return UNACCEPTABLE");
                    case DEAD:
                        takeoverResultsReceiver.takeoverOtherClient(nomadEndpoint.getHostPort(), acceptRejectResponse.getLastMutationHost(), acceptRejectResponse.getLastMutationUser());
                        return;
                    case BAD:
                        throw new AssertionError("A server rejected a message as bad: " + nomadEndpoint.getHostPort());
                    default:
                        throw new AssertionError("Unexpected RejectionReason: " + rejectionReason);
                }
            }, unwrap(th -> {
                takeoverResultsReceiver.takeoverFail(nomadEndpoint.getHostPort(), th);
            }));
        }
        takeoverResultsReceiver.endTakeover();
    }

    @Override // org.terracotta.nomad.client.results.DiscoverResultsReceiver
    public void discovered(HostPort hostPort, DiscoverResponse<T> discoverResponse) {
        long mutativeMessageCount = discoverResponse.getMutativeMessageCount();
        long highestVersion = discoverResponse.getHighestVersion();
        this.mutativeMessageCounts.put(hostPort, Long.valueOf(mutativeMessageCount));
        this.maxVersionNumber.accumulateAndGet(highestVersion, Long::max);
    }

    public final void registerPreparedServer(HostPort hostPort) {
        this.preparedServers.add(this.servers.stream().filter(nomadEndpoint -> {
            return nomadEndpoint.getHostPort().equals(hostPort);
        }).findAny().get());
    }

    private <T> void runSync(Callable<T> callable, Consumer<T> consumer, Consumer<Throwable> consumer2) {
        try {
            T call = callable.call();
            if (call == null) {
                throw new AssertionError("Response expected. Bug or wrong mocking ?");
            }
            consumer.accept(call);
        } catch (Exception e) {
            consumer2.accept(e);
        }
    }

    private static Consumer<Throwable> unwrap(Consumer<Throwable> consumer) {
        return th -> {
            consumer.accept((!(th instanceof NomadException) || th.getCause() == null || th.getCause() == th) ? th : th.getCause());
        };
    }
}
