/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.impl;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMetadataService;
import io.atomix.cluster.ManagedClusterMetadataService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.impl.ClusterMetadataAdvertisement;
import io.atomix.cluster.impl.NodeDigest;
import io.atomix.cluster.impl.NodeUpdate;
import io.atomix.cluster.impl.ReplicatedNode;
import io.atomix.messaging.Endpoint;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.time.LogicalClock;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterMetadataService
extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
implements ManagedClusterMetadataService {
    // Message type for the request/response exchange in which a peer returns its whole node map.
    private static final String BOOTSTRAP_MESSAGE = "atomix-cluster-metadata-bootstrap";
    // Message type for pushing a single node change (a NodeUpdate) to a peer.
    private static final String UPDATE_MESSAGE = "atomix-cluster-metadata-update";
    // Message type for the periodic digest exchange with a randomly chosen peer.
    private static final String ADVERTISEMENT_MESSAGE = "atomix-cluster-metadata-advertisement";
    // Advertisement interval; presumably milliseconds -- TODO confirm against the scheduling call.
    private static final int HEARTBEAT_INTERVAL = 1000;
    // Serializer for every payload this service sends or receives. Custom type ids start at 500
    // and Endpoint is written by the nested EndpointSerializer.
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using((Namespace)KryoNamespace.builder().register(KryoNamespaces.BASIC).nextId(500).register(new Class[]{ReplicatedNode.class}).register(new Class[]{NodeId.class}).register(new Class[]{Node.Type.class}).register((Serializer)new EndpointSerializer(), new Class[]{Endpoint.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{NodeUpdate.class}).register(new Class[]{ClusterMetadataAdvertisement.class}).register(new Class[]{NodeDigest.class}).build("ClusterMetadataService"));
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    // Replicated membership table keyed by node id. Removed nodes stay in the map as tombstones.
    private final Map<NodeId, ReplicatedNode> nodes = Maps.newConcurrentMap();
    private final MessagingService messagingService;
    // Logical clock used to timestamp local changes; advanced from timestamps of received updates.
    private final LogicalClock clock = new LogicalClock();
    // True between a successful start() and the matching stop().
    private final AtomicBoolean started = new AtomicBoolean();
    // Single thread ("atomix-cluster-metadata-sender") that runs the periodic advertisement task.
    private final ScheduledExecutorService messageScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads((String)"atomix-cluster-metadata-sender", (Logger)this.log));
    // Single thread ("atomix-cluster-metadata-receiver") on which the message handlers are registered.
    private final ExecutorService messageExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads((String)"atomix-cluster-metadata-receiver", (Logger)this.log));
    // Handle of the periodic advertisement task; assigned in start() once bootstrap has finished,
    // so it is null before that point.
    private ScheduledFuture<?> metadataFuture;

    /**
     * Creates the service and seeds the membership table with the configured bootstrap nodes.
     *
     * <p>Each bootstrap node is stored as a live (non-tombstone) entry with logical timestamp 0.
     *
     * @param metadata the initial cluster metadata supplying the bootstrap nodes
     * @param messagingService the messaging service used to talk to peers
     */
    public DefaultClusterMetadataService(ClusterMetadata metadata, MessagingService messagingService) {
        for (Node bootstrapNode : metadata.bootstrapNodes()) {
            ReplicatedNode seed = new ReplicatedNode(
                bootstrapNode.id(),
                bootstrapNode.type(),
                bootstrapNode.endpoint(),
                new LogicalTimestamp(0L),
                false);
            this.nodes.put(bootstrapNode.id(), seed);
        }
        this.messagingService = messagingService;
    }

    /**
     * Returns a snapshot of the current cluster metadata.
     *
     * @return metadata containing every known node that is not marked as a tombstone
     */
    @Override
    public ClusterMetadata getMetadata() {
        ImmutableList.Builder<Node> liveNodes = ImmutableList.builder();
        for (ReplicatedNode candidate : this.nodes.values()) {
            if (!candidate.tombstone()) {
                liveNodes.add(candidate);
            }
        }
        return new ClusterMetadata(liveNodes.build());
    }

    /**
     * Adds a node to the replicated membership table and announces it to the known peers.
     *
     * <p>Nodes of type {@code CLIENT} are ignored, as are nodes whose id already has an entry.
     * NOTE(review): a tombstoned entry also counts as "already present", so a removed node is
     * not re-added by this method -- confirm that this is intended.
     *
     * @param node the node to add
     */
    @Override
    public void addNode(Node node) {
        if (node.type() == Node.Type.CLIENT) {
            return;
        }
        if (this.nodes.get(node.id()) != null) {
            return;
        }
        LogicalTimestamp now = this.clock.increment();
        ReplicatedNode added = new ReplicatedNode(node.id(), node.type(), node.endpoint(), now, false);
        this.nodes.put(added.id(), added);
        this.broadcastUpdate(new NodeUpdate(added, now));
        this.post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, this.getMetadata()));
    }

    /**
     * Marks a known node as removed and announces the removal to the known peers.
     *
     * <p>The entry is not deleted; it is replaced by a tombstone carrying a fresh timestamp.
     * Nothing happens when the node id is unknown.
     *
     * @param node the node to remove
     */
    @Override
    public void removeNode(Node node) {
        if (this.nodes.get(node.id()) == null) {
            return;
        }
        LogicalTimestamp now = this.clock.increment();
        ReplicatedNode tombstone = new ReplicatedNode(node.id(), node.type(), node.endpoint(), now, true);
        this.nodes.put(tombstone.id(), tombstone);
        this.broadcastUpdate(new NodeUpdate(tombstone, now));
        this.post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, this.getMetadata()));
    }

    /**
     * Requests the membership table from every known peer.
     *
     * <p>The returned future completes normally as soon as the first peer answers, or
     * immediately when there is no peer other than the local endpoint. It completes
     * exceptionally, with the error of the last peer to fail, only when every peer failed.
     *
     * @return a future tracking the bootstrap attempt
     */
    private CompletableFuture<Void> bootstrap() {
        Endpoint localEndpoint = this.messagingService.endpoint();
        Set<Endpoint> peers = this.nodes.values().stream()
            .map(Node::endpoint)
            .filter(endpoint -> !endpoint.equals(localEndpoint))
            .collect(Collectors.toSet());
        if (peers.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        int totalPeers = peers.size();
        AtomicInteger failures = new AtomicInteger();
        CompletableFuture<Void> future = new CompletableFuture<>();
        for (Endpoint peer : peers) {
            this.bootstrap(peer).whenComplete((result, error) -> {
                if (error == null) {
                    // Completing an already completed future is a no-op, so later successes are harmless.
                    future.complete(null);
                } else if (failures.incrementAndGet() == totalPeers) {
                    // The failure count reaches the peer count only when no peer succeeded.
                    future.completeExceptionally(error);
                }
            });
        }
        return future;
    }

    /**
     * Requests the membership table from one peer and merges it into the local table.
     *
     * <p>A received entry replaces the local one only when there is no local entry or the local
     * entry is strictly older, which is the same rule {@code handleUpdate} applies. Message
     * handlers are registered before bootstrapping, so a newer update may already have been
     * applied locally; a blind overwrite would roll that update back.
     *
     * @param endpoint the peer to bootstrap from
     * @return a future completed once the response has been merged, or exceptionally on failure
     */
    private CompletableFuture<Void> bootstrap(Endpoint endpoint) {
        return this.messagingService.sendAndReceive(endpoint, BOOTSTRAP_MESSAGE, new byte[0]).thenAccept(response -> {
            Map<NodeId, ReplicatedNode> remoteNodes = SERIALIZER.decode(response);
            remoteNodes.forEach((id, remote) -> this.nodes.merge(
                id,
                remote,
                (local, incoming) -> local.timestamp().isOlderThan(incoming.timestamp()) ? incoming : local));
        });
    }

    /**
     * Answers a bootstrap request with the serialized local membership table.
     *
     * @param endpoint the requesting peer (unused)
     * @param payload the request payload (unused)
     * @return the encoded node map, tombstones included
     */
    private byte[] handleBootstrap(Endpoint endpoint, byte[] payload) {
        final Map<NodeId, ReplicatedNode> table = this.nodes;
        return SERIALIZER.encode(table);
    }

    /**
     * Sends the given update to the endpoint of every entry in the membership table, skipping
     * entries whose endpoint equals the local messaging endpoint.
     *
     * @param update the update to broadcast
     */
    private void broadcastUpdate(NodeUpdate update) {
        for (ReplicatedNode member : this.nodes.values()) {
            Endpoint target = member.endpoint();
            if (target.equals(this.messagingService.endpoint())) {
                continue;
            }
            this.sendUpdate(target, update);
        }
    }

    /**
     * Serializes a node update and sends it asynchronously to a single peer.
     *
     * @param endpoint the peer to notify
     * @param update the update to send
     */
    private void sendUpdate(Endpoint endpoint, NodeUpdate update) {
        byte[] encodedUpdate = SERIALIZER.encode(update);
        this.messagingService.sendAsync(endpoint, UPDATE_MESSAGE, encodedUpdate);
    }

    /**
     * Applies a node update received from a peer.
     *
     * <p>The local clock is always advanced from the update's timestamp. The node entry is
     * replaced, and a {@code METADATA_CHANGED} event posted, only when the node is unknown or
     * the stored entry is older than the update.
     *
     * @param endpoint the sending peer (unused)
     * @param payload the serialized {@code NodeUpdate}
     */
    private void handleUpdate(Endpoint endpoint, byte[] payload) {
        NodeUpdate update = SERIALIZER.decode(payload);
        LogicalTimestamp remoteTime = update.timestamp();
        this.clock.incrementAndUpdate(remoteTime);

        ReplicatedNode incoming = update.node();
        ReplicatedNode known = this.nodes.get(incoming.id());
        boolean upToDate = known != null && !known.timestamp().isOlderThan(remoteTime);
        if (upToDate) {
            return;
        }
        this.nodes.put(incoming.id(), incoming);
        this.post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, this.getMetadata()));
    }

    /**
     * Sends an anti-entropy advertisement to one randomly chosen peer, if any peer is known.
     *
     * <p>This method is the body of the periodically scheduled task. A scheduled executor
     * suppresses all later executions once one execution throws, so any failure here is caught
     * and logged rather than allowed to end the advertisement loop.
     */
    private void sendAdvertisement() {
        try {
            this.pickRandomPeer().ifPresent(this::sendAdvertisement);
        } catch (Exception e) {
            this.log.warn("Failed to send anti-entropy advertisement", e);
        }
    }

    /**
     * Advertises a digest (timestamp and tombstone flag) of every local entry to one peer.
     *
     * <p>The peer replies with the ids of the nodes it wants. For each such id that is still
     * present locally, the full entry is pushed back as an update. A failed exchange is only
     * logged.
     *
     * @param endpoint the peer to advertise to
     */
    private void sendAdvertisement(Endpoint endpoint) {
        this.clock.increment();

        Map<NodeId, NodeDigest> digests = Maps.newHashMap();
        this.nodes.forEach((id, node) -> digests.put(id, new NodeDigest(node.timestamp(), node.tombstone())));
        byte[] request = SERIALIZER.encode(new ClusterMetadataAdvertisement(digests));

        this.messagingService.sendAndReceive(endpoint, ADVERTISEMENT_MESSAGE, request).whenComplete((response, error) -> {
            if (error != null) {
                this.log.warn("Anti-entropy advertisement to {} failed!", endpoint);
                return;
            }
            Set<NodeId> requestedIds = SERIALIZER.decode(response);
            requestedIds.stream()
                .map(this.nodes::get)
                .filter(Objects::nonNull)
                .forEach(node -> this.sendUpdate(endpoint, new NodeUpdate(node, this.clock.increment())));
        });
    }

    /**
     * Chooses a random peer endpoint among the live entries of the membership table.
     *
     * <p>Tombstoned entries and entries whose endpoint equals the local messaging endpoint are
     * excluded.
     *
     * @return a randomly selected peer endpoint, or empty when no eligible peer exists
     */
    private Optional<Endpoint> pickRandomPeer() {
        List<Endpoint> candidates = this.nodes.values().stream()
            .filter(member -> !member.tombstone())
            .filter(member -> !member.endpoint().equals(this.messagingService.endpoint()))
            .map(Node::endpoint)
            .collect(Collectors.toList());
        Collections.shuffle(candidates);
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(0));
    }

    /**
     * Reconciles the local membership table with a peer's advertised digests.
     *
     * <p>For every local entry:
     * <ul>
     *   <li>if the peer has no digest for it, or the local entry is newer, the entry is pushed
     *       to the peer as an update;</li>
     *   <li>if the peer's digest is newer and is a tombstone, a live local entry is replaced by
     *       a tombstone with the digest's timestamp and a {@code METADATA_CHANGED} event is
     *       posted;</li>
     *   <li>if the peer's digest is newer and is not a tombstone, the node id is requested from
     *       the peer.</li>
     * </ul>
     *
     * @param endpoint the advertising peer
     * @param payload the serialized {@code ClusterMetadataAdvertisement}
     * @return the encoded set of node ids the peer should send: ids advertised but unknown
     *     locally, plus ids whose local entry is stale
     */
    private byte[] handleAdvertisement(Endpoint endpoint, byte[] payload) {
        LogicalTimestamp timestamp = this.clock.increment();
        ClusterMetadataAdvertisement advertisement = SERIALIZER.decode(payload);

        Set<NodeId> staleNodes = Sets.newHashSet();
        for (ReplicatedNode local : this.nodes.values()) {
            NodeDigest digest = advertisement.digest(local.id());
            if (digest == null || local.isNewerThan(digest.timestamp())) {
                // The peer is missing this entry or has an older version: push ours.
                this.sendUpdate(endpoint, new NodeUpdate(local, timestamp));
                continue;
            }
            if (!digest.isNewerThan(local.timestamp())) {
                continue;
            }
            if (!digest.tombstone()) {
                // The peer holds a newer live version: request it.
                staleNodes.add(local.id());
                continue;
            }
            if (!local.tombstone()) {
                // A digest carries everything needed to apply a removal, so apply it directly.
                this.nodes.put(local.id(), new ReplicatedNode(local.id(), local.type(), local.endpoint(), digest.timestamp(), true));
                this.post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, this.getMetadata()));
            }
        }

        Set<NodeId> unknownNodes = Sets.difference(advertisement.digests(), this.nodes.keySet());
        Set<NodeId> requested = Sets.newHashSet(Sets.union(unknownNodes, staleNodes));
        return SERIALIZER.encode(requested);
    }

    /**
     * Starts the service: registers the message handlers, bootstraps the membership table from
     * the known peers and then schedules the periodic anti-entropy advertisement.
     *
     * <p>A bootstrap failure does not fail startup. It is logged, and the advertisement loop is
     * started regardless. Calling this method while already started returns a completed future.
     *
     * @return a future completed with this service once the advertisement task is scheduled
     */
    public CompletableFuture<ClusterMetadataService> start() {
        if (!this.started.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(this);
        }
        this.registerMessageHandlers();
        return this.bootstrap().handle((result, error) -> {
            if (error != null) {
                this.log.warn("Cluster metadata bootstrap failed; relying on anti-entropy to converge", error);
            }
            this.metadataFuture = this.messageScheduler.scheduleWithFixedDelay(
                this::sendAdvertisement, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
            this.log.info("Started");
            return this;
        });
    }

    /**
     * Reports whether the service is currently started.
     *
     * @return {@code true} after a successful {@code start()} and before the matching {@code stop()}
     */
    public boolean isRunning() {
        final boolean running = this.started.get();
        return running;
    }

    /**
     * Registers the bootstrap, update and advertisement handlers with the messaging service.
     * All three are dispatched on the single-threaded receiver executor.
     */
    private void registerMessageHandlers() {
        final Executor receiver = this.messageExecutor;
        final MessagingService messaging = this.messagingService;
        messaging.registerHandler(BOOTSTRAP_MESSAGE, this::handleBootstrap, receiver);
        messaging.registerHandler(UPDATE_MESSAGE, this::handleUpdate, receiver);
        messaging.registerHandler(ADVERTISEMENT_MESSAGE, this::handleAdvertisement, receiver);
    }

    /**
     * Unregisters the bootstrap, update and advertisement handlers from the messaging service.
     */
    private void unregisterMessageHandlers() {
        final String[] messageTypes = {BOOTSTRAP_MESSAGE, UPDATE_MESSAGE, ADVERTISEMENT_MESSAGE};
        for (String messageType : messageTypes) {
            this.messagingService.unregisterHandler(messageType);
        }
    }

    /**
     * Stops the service: unregisters the message handlers, cancels the periodic advertisement
     * task and shuts down both executors.
     *
     * <p>Handlers are unregistered first so that no new message is dispatched to an executor
     * that is being shut down. The advertisement task may not have been scheduled yet if this
     * is called before bootstrap has finished, so its handle is null-checked.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.unregisterMessageHandlers();
            ScheduledFuture<?> advertisementTask = this.metadataFuture;
            if (advertisementTask != null) {
                advertisementTask.cancel(true);
            }
            this.messageScheduler.shutdownNow();
            this.messageExecutor.shutdownNow();
        }
        this.log.info("Stopped");
        return CompletableFuture.completedFuture(null);
    }

    static class EndpointSerializer
    extends Serializer<Endpoint> {
        EndpointSerializer() {
        }

        public void write(Kryo kryo, Output output, Endpoint endpoint) {
            output.writeString(endpoint.host().getHostAddress());
            output.writeInt(endpoint.port());
        }

        public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
            String host = input.readString();
            int port = input.readInt();
            return Endpoint.from((String)host, (int)port);
        }
    }
}

