/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.impl;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMetadataService;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.impl.ClusterHeartbeat;
import io.atomix.cluster.impl.DefaultClusterMetadataService;
import io.atomix.cluster.impl.PhiAccrualFailureDetector;
import io.atomix.cluster.impl.StatefulNode;
import io.atomix.messaging.Endpoint;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Namespace;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterService
extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
implements ManagedClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterService.class);
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
    private static final long DEFAULT_FAILURE_TIME = 1000L;
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";
    private int heartbeatInterval = 100;
    private int phiFailureThreshold = 10;
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using((Namespace)KryoNamespace.builder().register(KryoNamespaces.BASIC).nextId(500).register(new Class[]{NodeId.class}).register(new Class[]{Node.Type.class}).register(new Class[]{Node.State.class}).register(new Class[]{ClusterHeartbeat.class}).register(new Class[]{StatefulNode.class}).register((Serializer)new DefaultClusterMetadataService.EndpointSerializer(), new Class[]{Endpoint.class}).build("ClusterService"));
    private final MessagingService messagingService;
    private final ClusterMetadataService metadataService;
    private final AtomicBoolean started = new AtomicBoolean();
    private final StatefulNode localNode;
    private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap();
    private final Map<NodeId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private final ClusterMetadataEventListener metadataEventListener = this::handleMetadataEvent;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads((String)"atomix-cluster-heartbeat-sender", (Logger)LOGGER));
    private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads((String)"atomix-cluster-heartbeat-receiver", (Logger)LOGGER));
    private ScheduledFuture<?> heartbeatFuture;

    public DefaultClusterService(Node localNode, ClusterMetadataService metadataService, MessagingService messagingService) {
        this.metadataService = (ClusterMetadataService)Preconditions.checkNotNull((Object)metadataService, (Object)"metadataService cannot be null");
        this.messagingService = (MessagingService)Preconditions.checkNotNull((Object)messagingService, (Object)"messagingService cannot be null");
        this.localNode = new StatefulNode(localNode.id(), localNode.type(), localNode.endpoint());
    }

    @Override
    public Node getLocalNode() {
        return this.localNode;
    }

    @Override
    public Set<Node> getNodes() {
        return ImmutableSet.copyOf((Collection)this.nodes.values().stream().filter(node -> node.type() == Node.Type.DATA || node.getState() == Node.State.ACTIVE).collect(Collectors.toList()));
    }

    @Override
    public Node getNode(NodeId nodeId) {
        Node node = this.nodes.get(nodeId);
        return node != null ? (node.type() == Node.Type.DATA || node.getState() == Node.State.ACTIVE ? node : null) : null;
    }

    private void sendHeartbeats() {
        try {
            Set<StatefulNode> peers = this.nodes.values().stream().filter(node -> !node.id().equals(this.getLocalNode().id())).collect(Collectors.toSet());
            byte[] payload = SERIALIZER.encode((Object)new ClusterHeartbeat(this.localNode.id(), this.localNode.type()));
            peers.forEach(node -> {
                this.sendHeartbeat(node.endpoint(), payload);
                PhiAccrualFailureDetector failureDetector = this.failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
                double phi = failureDetector.phi();
                if (phi >= (double)this.phiFailureThreshold || System.currentTimeMillis() - failureDetector.lastUpdated() > 1000L) {
                    if (node.getState() == Node.State.ACTIVE) {
                        this.deactivateNode((StatefulNode)node);
                    }
                } else if (node.getState() == Node.State.INACTIVE) {
                    this.activateNode((StatefulNode)node);
                }
            });
        }
        catch (Exception e) {
            LOGGER.debug("Failed to send heartbeat", (Throwable)e);
        }
    }

    private void sendHeartbeat(Endpoint endpoint, byte[] payload) {
        this.messagingService.sendAndReceive(endpoint, HEARTBEAT_MESSAGE, payload).whenComplete((response, error) -> {
            if (error == null) {
                Collection nodes = (Collection)SERIALIZER.decode(response);
                boolean sendHeartbeats = false;
                for (StatefulNode node : nodes) {
                    if (this.nodes.putIfAbsent(node.id(), node) != null) continue;
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
                    sendHeartbeats = true;
                }
                if (sendHeartbeats) {
                    this.sendHeartbeats();
                }
            } else {
                LOGGER.trace("Sending heartbeat to {} failed", (Object)endpoint, error);
            }
        });
    }

    private byte[] handleHeartbeat(Endpoint endpoint, byte[] message) {
        ClusterHeartbeat heartbeat = (ClusterHeartbeat)SERIALIZER.decode(message);
        this.failureDetectors.computeIfAbsent(heartbeat.nodeId(), n -> new PhiAccrualFailureDetector()).report();
        this.activateNode(new StatefulNode(heartbeat.nodeId(), heartbeat.nodeType(), endpoint));
        return SERIALIZER.encode(this.nodes.values().stream().filter(node -> node.type() == Node.Type.CLIENT).collect(Collectors.toList()));
    }

    private void activateNode(StatefulNode node) {
        StatefulNode existingNode = this.nodes.get(node.id());
        if (existingNode == null) {
            node.setState(Node.State.ACTIVE);
            this.nodes.put(node.id(), node);
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
            this.sendHeartbeat(node.endpoint(), SERIALIZER.encode((Object)new ClusterHeartbeat(this.localNode.id(), this.localNode.type())));
        } else if (existingNode.getState() == Node.State.INACTIVE) {
            existingNode.setState(Node.State.ACTIVE);
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, existingNode));
        }
    }

    private void deactivateNode(StatefulNode node) {
        StatefulNode existingNode = this.nodes.get(node.id());
        if (existingNode != null && existingNode.getState() == Node.State.ACTIVE) {
            existingNode.setState(Node.State.INACTIVE);
            switch (existingNode.type()) {
                case DATA: {
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
                    break;
                }
                case CLIENT: {
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
                    break;
                }
                default: {
                    throw new AssertionError();
                }
            }
        }
    }

    private void handleMetadataEvent(ClusterMetadataEvent event) {
        Set bootstrapNodes = ((ClusterMetadata)event.subject()).bootstrapNodes().stream().map(node -> {
            StatefulNode existingNode = this.nodes.get(node.id());
            if (existingNode == null) {
                StatefulNode newNode = new StatefulNode(node.id(), node.type(), node.endpoint());
                this.nodes.put(newNode.id(), newNode);
                this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, newNode));
            }
            return node.id();
        }).collect(Collectors.toSet());
        Set dataNodes = this.nodes.entrySet().stream().filter(entry -> ((StatefulNode)entry.getValue()).type() == Node.Type.DATA).map(entry -> (NodeId)entry.getKey()).collect(Collectors.toSet());
        Sets.SetView missingNodes = Sets.difference(dataNodes, bootstrapNodes);
        for (NodeId nodeId : missingNodes) {
            StatefulNode existingNode = this.nodes.remove(nodeId);
            if (existingNode == null) continue;
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
        }
    }

    public CompletableFuture<ClusterService> start() {
        if (this.started.compareAndSet(false, true)) {
            this.metadataService.addListener(this.metadataEventListener);
            this.localNode.setState(Node.State.ACTIVE);
            this.nodes.put(this.localNode.id(), this.localNode);
            this.metadataService.getMetadata().bootstrapNodes().forEach(node -> this.nodes.putIfAbsent(node.id(), new StatefulNode(node.id(), node.type(), node.endpoint())));
            this.messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, (Executor)this.heartbeatExecutor);
            this.heartbeatFuture = this.heartbeatScheduler.scheduleWithFixedDelay(this::sendHeartbeats, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
            LOGGER.info("Started");
        }
        return CompletableFuture.completedFuture(this);
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.heartbeatScheduler.shutdownNow();
            this.heartbeatExecutor.shutdownNow();
            this.localNode.setState(Node.State.INACTIVE);
            this.nodes.clear();
            this.heartbeatFuture.cancel(true);
            this.messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
            this.metadataService.removeListener(this.metadataEventListener);
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }
}

