/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.messaging.ClusterMessagingService;
import io.atomix.cluster.messaging.ManagedClusterMessagingService;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.net.Address;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterMessagingService
implements ManagedClusterMessagingService {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private static final Exception CONNECT_EXCEPTION = new ConnectException();
    protected final ClusterService cluster;
    protected final MessagingService messagingService;
    private final AtomicBoolean started = new AtomicBoolean();

    public DefaultClusterMessagingService(ClusterService cluster, MessagingService messagingService) {
        this.cluster = (ClusterService)Preconditions.checkNotNull((Object)cluster, (Object)"clusterService cannot be null");
        this.messagingService = (MessagingService)Preconditions.checkNotNull((Object)messagingService, (Object)"messagingService cannot be null");
    }

    @Override
    public <M> void broadcast(String subject, M message, Function<M, byte[]> encoder) {
        this.multicast(subject, message, encoder, this.cluster.getNodes().stream().filter(node -> !Objects.equal((Object)node, (Object)this.cluster.getLocalNode())).map(Node::id).collect(Collectors.toSet()));
    }

    @Override
    public <M> void broadcastIncludeSelf(String subject, M message, Function<M, byte[]> encoder) {
        this.multicast(subject, message, encoder, this.cluster.getNodes().stream().map(Node::id).collect(Collectors.toSet()));
    }

    @Override
    public <M> CompletableFuture<Void> unicast(String subject, M message, Function<M, byte[]> encoder, NodeId toNodeId) {
        try {
            return this.doUnicast(subject, encoder.apply(message), toNodeId);
        }
        catch (Exception e) {
            return Futures.exceptionalFuture((Throwable)e);
        }
    }

    @Override
    public <M> void multicast(String subject, M message, Function<M, byte[]> encoder, Set<NodeId> nodes) {
        byte[] payload = encoder.apply(message);
        nodes.forEach(nodeId -> this.doUnicast(subject, payload, (NodeId)nodeId));
    }

    @Override
    public <M, R> CompletableFuture<R> send(String subject, M message, Function<M, byte[]> encoder, Function<byte[], R> decoder, NodeId toNodeId, Duration timeout) {
        try {
            return this.sendAndReceive(subject, encoder.apply(message), toNodeId, timeout).thenApply(decoder);
        }
        catch (Exception e) {
            return Futures.exceptionalFuture((Throwable)e);
        }
    }

    private CompletableFuture<Void> doUnicast(String subject, byte[] payload, NodeId toNodeId) {
        Node node = this.cluster.getNode(toNodeId);
        if (node == null) {
            return Futures.exceptionalFuture((Throwable)CONNECT_EXCEPTION);
        }
        return this.messagingService.sendAsync(node.address(), subject, payload);
    }

    private CompletableFuture<byte[]> sendAndReceive(String subject, byte[] payload, NodeId toNodeId, Duration timeout) {
        Node node = this.cluster.getNode(toNodeId);
        if (node == null) {
            return Futures.exceptionalFuture((Throwable)CONNECT_EXCEPTION);
        }
        return this.messagingService.sendAndReceive(node.address(), subject, payload, timeout);
    }

    @Override
    public void unsubscribe(String subject) {
        this.messagingService.unregisterHandler(subject);
    }

    @Override
    public <M, R> CompletableFuture<Void> subscribe(String subject, Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
        this.messagingService.registerHandler(subject, new InternalMessageResponder<Object, R>(decoder, encoder, m -> {
            CompletableFuture responseFuture = new CompletableFuture();
            executor.execute(() -> {
                try {
                    responseFuture.complete(handler.apply(m));
                }
                catch (Exception e) {
                    responseFuture.completeExceptionally(e);
                }
            });
            return responseFuture;
        }));
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public <M, R> CompletableFuture<Void> subscribe(String subject, Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
        this.messagingService.registerHandler(subject, new InternalMessageResponder<M, R>(decoder, encoder, handler));
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public <M> CompletableFuture<Void> subscribe(String subject, Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
        this.messagingService.registerHandler(subject, new InternalMessageConsumer<M>(decoder, handler), executor);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public <M> CompletableFuture<Void> subscribe(String subject, Function<byte[], M> decoder, BiConsumer<Address, M> handler, Executor executor) {
        this.messagingService.registerHandler(subject, new InternalMessageBiConsumer<M>(decoder, handler), executor);
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<ClusterMessagingService> start() {
        if (this.started.compareAndSet(false, true)) {
            this.log.info("Started");
        }
        return CompletableFuture.completedFuture(this);
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.log.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }

    static {
        CONNECT_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }

    private static class InternalMessageConsumer<M>
    implements BiConsumer<Address, byte[]> {
        private final Function<byte[], M> decoder;
        private final Consumer<M> consumer;

        InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
            this.decoder = decoder;
            this.consumer = consumer;
        }

        @Override
        public void accept(Address sender, byte[] bytes) {
            this.consumer.accept(this.decoder.apply(bytes));
        }
    }

    private static class InternalMessageBiConsumer<M>
    implements BiConsumer<Address, byte[]> {
        private final Function<byte[], M> decoder;
        private final BiConsumer<Address, M> consumer;

        InternalMessageBiConsumer(Function<byte[], M> decoder, BiConsumer<Address, M> consumer) {
            this.decoder = decoder;
            this.consumer = consumer;
        }

        @Override
        public void accept(Address sender, byte[] bytes) {
            this.consumer.accept(sender, this.decoder.apply(bytes));
        }
    }

    private static class InternalMessageResponder<M, R>
    implements BiFunction<Address, byte[], CompletableFuture<byte[]>> {
        private final Function<byte[], M> decoder;
        private final Function<R, byte[]> encoder;
        private final Function<M, CompletableFuture<R>> handler;

        InternalMessageResponder(Function<byte[], M> decoder, Function<R, byte[]> encoder, Function<M, CompletableFuture<R>> handler) {
            this.decoder = decoder;
            this.encoder = encoder;
            this.handler = handler;
        }

        @Override
        public CompletableFuture<byte[]> apply(Address sender, byte[] bytes) {
            return this.handler.apply(this.decoder.apply(bytes)).thenApply(this.encoder);
        }
    }
}

