/*
 * Decompiled with CFR 0.152.
 */
package tr.com.infumia.infumialib.messaging;

import com.google.protobuf.GeneratedMessageV3;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.NotNull;
import tr.com.infumia.infumialib.definition.Definition;
import tr.com.infumia.infumialib.kubernetes.Kubernetes;
import tr.com.infumia.infumialib.messaging.proto.Messaging;
import tr.com.infumia.infumialib.registries.Registry;

public interface Messenger {
    public boolean canReceive(byte @NotNull [] var1);

    default public <T extends GeneratedMessageV3> void publish(@NotNull String target, @NotNull T value) {
        this.publish(target, value, false);
    }

    public <T extends GeneratedMessageV3> void publish(@NotNull String var1, @NotNull T var2, boolean var3);

    public <T extends GeneratedMessageV3> void subscribe(@NotNull T var1, @NotNull BiConsumer<Messaging.ServerMessage, T> var2);

    public <T extends GeneratedMessageV3> void unsubscribe(@NotNull T var1);

    public static final class Subscription<T extends GeneratedMessageV3>
    implements Definition.Key<String> {
        @NotNull
        private final BiConsumer<Messaging.ServerMessage, T> consumer;
        @NotNull
        private final T template;

        private Subscription(@NotNull T template) {
            this((serverMessage, t) -> {}, (GeneratedMessageV3)template);
        }

        @Override
        @NotNull
        public String key() {
            return this.template.getClass().toString();
        }

        private void onMessage(@NotNull Messaging.ServerMessage serverMessage) {
            this.consumer.accept(serverMessage, (Messaging.ServerMessage)((GeneratedMessageV3)this.template.getParserForType().parseFrom(serverMessage.getData())));
        }

        private Subscription(@NotNull BiConsumer<Messaging.ServerMessage, T> consumer, @NotNull T template) {
            if (consumer == null) {
                throw new NullPointerException("consumer is marked non-null but is null");
            }
            if (template == null) {
                throw new NullPointerException("template is marked non-null but is null");
            }
            this.consumer = consumer;
            this.template = template;
        }
    }

    public static abstract class Base
    extends RedisPubSubAdapter<byte[], byte[]>
    implements Messenger {
        private static final byte[] ANY_CHANNEL = "*".getBytes(StandardCharsets.UTF_8);
        private final byte @NotNull [] channel;
        private final Registry<String, Subscription<?>> subscribes = new Registry();

        protected Base(byte @NotNull [] channel) {
            this.channel = (byte[])channel.clone();
            StatefulRedisPubSubConnection connection = Kubernetes.redisClient().connectPubSub((RedisCodec)ByteArrayCodec.INSTANCE);
            connection.addListener((RedisPubSubListener)this);
            connection.sync().subscribe((Object[])new byte[][]{channel});
        }

        protected Base(@NotNull String channel) {
            this(channel.getBytes(StandardCharsets.UTF_8));
        }

        public final void message(byte[] channel, byte[] message) {
            if (!this.canReceive(channel)) {
                return;
            }
            try {
                Messaging.ServerMessage serverMessage = Messaging.ServerMessage.parseFrom(message);
                Optional<Subscription<?>> optional = this.subscribes.get(serverMessage.getType());
                if (optional.isEmpty()) {
                    return;
                }
                optional.get().onMessage(serverMessage);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean canReceive(byte @NotNull [] channel) {
            return Arrays.equals(channel, ANY_CHANNEL) || Arrays.equals(channel, this.channel);
        }

        @Override
        public final <T extends GeneratedMessageV3> void publish(@NotNull String target, @NotNull T value, boolean async) {
            Messaging.ServerMessage message = Messaging.ServerMessage.newBuilder().setType(value.getClass().toString()).setTarget(target).setData(value.toByteString()).build();
            StatefulRedisPubSubConnection connection = Kubernetes.redisClient().connectPubSub((RedisCodec)ByteArrayCodec.INSTANCE);
            byte[] targetBytes = target.getBytes(StandardCharsets.UTF_8);
            if (async) {
                connection.async().publish((Object)targetBytes, (Object)message.toByteArray());
            } else {
                connection.sync().publish((Object)targetBytes, (Object)message.toByteArray());
            }
        }

        @Override
        public final <T extends GeneratedMessageV3> void subscribe(@NotNull T template, @NotNull BiConsumer<Messaging.ServerMessage, T> coming) {
            this.subscribes.register(new Subscription<T>(coming, template));
        }

        @Override
        public final <T extends GeneratedMessageV3> void unsubscribe(@NotNull T template) {
            this.subscribes.unregister(new Subscription<T>(template));
        }
    }
}

