/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.discovery;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.discovery.MulticastDiscoveryBuilder;
import io.atomix.cluster.discovery.MulticastDiscoveryConfig;
import io.atomix.cluster.discovery.NodeDiscoveryEvent;
import io.atomix.cluster.discovery.NodeDiscoveryEventListener;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.impl.AddressSerializer;
import io.atomix.cluster.impl.PhiAccrualFailureDetector;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.net.Address;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Node discovery provider that announces the local node through the bootstrap broadcast service
 * and tracks remote nodes from the announcements it receives.
 *
 * <p>Known nodes are keyed by {@link Address}; liveness is tracked per {@link NodeId} with a
 * {@link PhiAccrualFailureDetector}. Announcements, incoming message handling and failure
 * detection all run on one single-threaded scheduler, so those three activities never run
 * concurrently with each other.
 *
 * <p>{@link #leave(Node)} shuts that scheduler down, so an instance that has left cannot be
 * joined again; create a new provider instead.
 */
public class MulticastDiscoveryProvider
extends AbstractListenerManager<NodeDiscoveryEvent, NodeDiscoveryEventListener>
implements NodeDiscoveryProvider {
    /** Provider type descriptor for this implementation. */
    public static final Type TYPE = new Type();
    private static final Logger LOGGER = LoggerFactory.getLogger(MulticastDiscoveryProvider.class);
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.builder().addType(Node.class).addType(NodeId.class).addSerializer((Serializer)new AddressSerializer(), new Class[]{Address.class}).build();
    /** Broadcast subject used for both sending and receiving node announcements. */
    private static final String DISCOVERY_SUBJECT = "atomix-discovery";

    private final MulticastDiscoveryConfig config;
    private volatile BootstrapService bootstrap;
    private final ScheduledExecutorService broadcastScheduler =
            Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-cluster-broadcast", LOGGER));
    private volatile ScheduledFuture<?> broadcastFuture;
    /** Hands every received payload over to the scheduler thread for processing. */
    private final Consumer<byte[]> broadcastListener = message -> this.broadcastScheduler.execute(
            guarded("handle discovery broadcast", () -> this.handleBroadcastMessage(message)));
    /** Nodes currently considered present, keyed by address (includes the local node once joined). */
    private final Map<Address, Node> nodes = Maps.newConcurrentMap();
    /** Failure detectors keyed by node id. */
    private final Map<NodeId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private volatile ScheduledFuture<?> failureFuture;

    /**
     * Creates a new builder for a multicast discovery provider.
     *
     * @return a new builder
     */
    public static MulticastDiscoveryBuilder builder() {
        return new MulticastDiscoveryBuilder();
    }

    /** Creates a provider backed by a freshly constructed {@link MulticastDiscoveryConfig}. */
    public MulticastDiscoveryProvider() {
        this(new MulticastDiscoveryConfig());
    }

    /**
     * Creates a provider using the given configuration.
     *
     * @param config the discovery configuration
     * @throws NullPointerException if {@code config} is null
     */
    public MulticastDiscoveryProvider(MulticastDiscoveryConfig config) {
        this.config = Preconditions.checkNotNull(config, "config cannot be null");
    }

    /**
     * Returns the configuration this provider was created with.
     *
     * @return the discovery configuration
     */
    public MulticastDiscoveryConfig config() {
        return this.config;
    }

    /**
     * Returns an immutable snapshot of the nodes currently known to this provider.
     *
     * @return the known nodes
     */
    @Override
    public Set<Node> getNodes() {
        return ImmutableSet.copyOf(this.nodes.values());
    }

    /**
     * Wraps a task so that any exception it throws is logged instead of escaping.
     *
     * <p>This matters for the scheduler: a periodic task that throws is never run again, and an
     * exception thrown by a task passed to {@code execute} on a scheduled executor is captured by
     * the wrapping future and never reported.
     *
     * @param description short verb phrase used in the log message
     * @param task the task to protect
     * @return a runnable that never throws an {@link Exception}
     */
    private static Runnable guarded(String description, Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                LOGGER.warn("Failed to {}", description, e);
            }
        };
    }

    /**
     * Processes one received announcement: records the announced node, posts JOIN/LEAVE events
     * when the occupant of an address changes, and reports a heartbeat for the node's id.
     *
     * <p>Payloads that cannot be decoded, or that decode to a node without an id or address, are
     * ignored.
     *
     * @param message the serialized {@link Node}
     */
    private void handleBroadcastMessage(byte[] message) {
        Node node;
        try {
            node = (Node) SERIALIZER.decode(message);
        } catch (RuntimeException e) {
            LOGGER.debug("Ignoring undecodable discovery message", e);
            return;
        }
        if (node == null || node.address() == null || node.id() == null) {
            LOGGER.debug("Ignoring incomplete discovery message: {}", node);
            return;
        }

        Node oldNode = this.nodes.put(node.address(), node);
        if (oldNode == null) {
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
        } else if (!oldNode.id().equals(node.id())) {
            // A different node took over this address. The previous occupant is no longer reachable
            // through the nodes map, so its detector would otherwise never be cleaned up.
            this.failureDetectors.remove(oldNode.id());
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, oldNode));
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
        }
        this.failureDetectors.computeIfAbsent(node.id(), id -> new PhiAccrualFailureDetector()).report();
    }

    /**
     * Broadcasts the serialized local node on the discovery subject.
     *
     * @param localNode the node to announce
     */
    private void broadcastNode(Node localNode) {
        this.bootstrap.getBroadcastService().broadcast(DISCOVERY_SUBJECT, SERIALIZER.encode(localNode));
    }

    /**
     * Runs failure detection for every known node except the one at the local node's address.
     *
     * @param localNode the local node, which is skipped
     */
    private void detectFailures(Node localNode) {
        Address localAddress = localNode.address();
        this.nodes.values().stream()
                .filter(node -> !node.address().equals(localAddress))
                .forEach(this::detectFailure);
    }

    /**
     * Removes the node and posts a LEAVE event if it is considered failed.
     *
     * <p>A node is considered failed when its phi value reaches the configured failure threshold,
     * or when phi is exactly zero and the detector has not been updated within the configured
     * failure timeout.
     *
     * @param node the node to check
     */
    private void detectFailure(Node node) {
        PhiAccrualFailureDetector failureDetector =
                this.failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
        double phi = failureDetector.phi();
        boolean thresholdReached = phi >= (double) this.config.getFailureThreshold();
        boolean timedOut = phi == 0.0
                && System.currentTimeMillis() - failureDetector.lastUpdated() > this.config.getFailureTimeout().toMillis();
        if (thresholdReached || timedOut) {
            LOGGER.info("Lost contact with {}", node);
            this.nodes.remove(node.address());
            this.failureDetectors.remove(node.id());
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, node));
        }
    }

    /**
     * Registers the local node and starts announcing it and detecting failed peers.
     *
     * <p>Does nothing if a node is already registered at the local node's address. Otherwise a
     * JOIN event is posted for the local node, the broadcast listener is installed, announcements
     * are scheduled at the configured broadcast interval, failure detection is scheduled at half
     * that interval, and one announcement is sent immediately on the calling thread.
     *
     * @param bootstrap the bootstrap service providing the broadcast service
     * @param localNode the local node to announce
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> join(BootstrapService bootstrap, Node localNode) {
        if (this.nodes.putIfAbsent(localNode.address(), localNode) == null) {
            this.bootstrap = bootstrap;
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, localNode));
            bootstrap.getBroadcastService().addListener(DISCOVERY_SUBJECT, this.broadcastListener);

            long broadcastMillis = this.config.getBroadcastInterval().toMillis();
            long detectionMillis = broadcastMillis / 2L;
            // Periodic tasks are guarded: an exception escaping a fixed-rate task would cancel all
            // of its future executions without any notice.
            this.broadcastFuture = this.broadcastScheduler.scheduleAtFixedRate(
                    guarded("broadcast local node", () -> this.broadcastNode(localNode)),
                    broadcastMillis, broadcastMillis, TimeUnit.MILLISECONDS);
            this.failureFuture = this.broadcastScheduler.scheduleAtFixedRate(
                    guarded("detect node failures", () -> this.detectFailures(localNode)),
                    detectionMillis, detectionMillis, TimeUnit.MILLISECONDS);

            this.broadcastNode(localNode);
            LOGGER.info("Joined");
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Unregisters the local node and stops all background activity.
     *
     * <p>Does nothing if no node is registered at the local node's address. Otherwise a LEAVE
     * event is posted, the broadcast listener is removed, both periodic tasks are cancelled and
     * the scheduler is shut down. Because the scheduler is shut down, this instance cannot be
     * joined again afterwards.
     *
     * @param localNode the local node
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> leave(Node localNode) {
        if (this.nodes.remove(localNode.address()) != null) {
            this.post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, localNode));
            this.bootstrap.getBroadcastService().removeListener(DISCOVERY_SUBJECT, this.broadcastListener);
            // Read each volatile field once so the null check and the cancel see the same future.
            ScheduledFuture<?> broadcastFuture = this.broadcastFuture;
            if (broadcastFuture != null) {
                broadcastFuture.cancel(false);
            }
            ScheduledFuture<?> failureFuture = this.failureFuture;
            if (failureFuture != null) {
                failureFuture.cancel(false);
            }
            this.broadcastScheduler.shutdownNow();
            LOGGER.info("Left");
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Type descriptor for the multicast discovery provider. */
    public static class Type
    implements NodeDiscoveryProvider.Type<MulticastDiscoveryConfig> {
        private static final String NAME = "multicast";

        /**
         * Returns the name of this provider type.
         *
         * @return {@code "multicast"}
         */
        public String name() {
            return NAME;
        }

        /**
         * Creates a new default configuration for this provider type.
         *
         * @return a new {@link MulticastDiscoveryConfig}
         */
        public MulticastDiscoveryConfig newConfig() {
            return new MulticastDiscoveryConfig();
        }

        /**
         * Creates a new provider from the given configuration.
         *
         * @param config the discovery configuration
         * @return a new {@link MulticastDiscoveryProvider}
         */
        @Override
        public NodeDiscoveryProvider newProvider(MulticastDiscoveryConfig config) {
            return new MulticastDiscoveryProvider(config);
        }
    }
}

