/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.HeartbeatMonitor;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.impl.ProcessorInstructions;
import io.axoniq.axonserver.connector.impl.SyncRegistration;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.InstructionResult;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import io.axoniq.axonserver.grpc.control.Heartbeat;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.axoniq.axonserver.grpc.control.SubscribeTopologyChanges;
import io.axoniq.axonserver.grpc.control.TopologyChange;
import io.axoniq.axonserver.grpc.control.UnsubscribeTopologyChanges;
import io.axoniq.axonserver.grpc.control.UpdateType;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ControlChannel} implementation that exchanges platform instructions with AxonServer over a
 * single bidirectional gRPC stream opened through {@link PlatformServiceGrpc.PlatformServiceStub#openStream}.
 *
 * <p>The channel keeps track of:
 * <ul>
 *     <li>the currently active outbound stream (the "instruction dispatcher"), which is {@code null}
 *     while the channel is not connected;</li>
 *     <li>one {@link InstructionHandler} per {@link PlatformOutboundInstruction.RequestCase};</li>
 *     <li>the registered event processors (their info suppliers and instruction handlers);</li>
 *     <li>the registered topology change listeners.</li>
 * </ul>
 */
public class ControlChannelImpl
extends AbstractAxonServerChannel<PlatformInboundInstruction>
implements ControlChannel {
    private static final Logger logger = LoggerFactory.getLogger(ControlChannelImpl.class);
    /** Synthetic topology change that is handed to listeners whenever the instruction stream is lost. */
    private static final TopologyChange RESET_ALL = TopologyChange.newBuilder().setUpdateType(UpdateType.RESET_ALL).build();
    private final ClientIdentification clientIdentification;
    private final ScheduledExecutorService executor;
    /** Delay, in milliseconds, between two rounds of event processor info updates. */
    private final long processorInfoUpdateFrequency;
    private final Runnable reconnectHandler;
    /** The outbound stream towards AxonServer; {@code null} while disconnected. */
    private final AtomicReference<CallStreamObserver<PlatformInboundInstruction>> instructionDispatcher = new AtomicReference<>();
    private final Map<PlatformOutboundInstruction.RequestCase, InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction>> instructionHandlers = new EnumMap<>(PlatformOutboundInstruction.RequestCase.class);
    private final HeartbeatMonitor heartbeatMonitor;
    private final String context;
    private final Map<String, ProcessorInstructionHandler> processorInstructionHandlers = new ConcurrentHashMap<>();
    private final Map<String, Supplier<EventProcessorInfo>> processorInfoSuppliers = new ConcurrentHashMap<>();
    /** {@code true} while the periodic processor info task is (re)scheduling itself. */
    private final AtomicBoolean infoSupplierActive = new AtomicBoolean();
    private final PlatformServiceGrpc.PlatformServiceStub platformServiceStub;
    private final AxonServerManagedChannel channel;
    private final Set<Consumer<TopologyChange>> topologyChangeListeners = ConcurrentHashMap.newKeySet();

    /**
     * Creates the control channel and installs the default handlers for the instructions AxonServer can send.
     *
     * @param clientIdentification         identification sent to AxonServer when the stream is opened
     * @param context                      name of the context, used in log messages
     * @param executor                     executor used for heartbeats and periodic processor info updates
     * @param channel                      the managed channel the instruction stream is opened on
     * @param processorInfoUpdateFrequency delay in milliseconds between processor info updates
     * @param reconnectHandler             action to run when a reconnect is requested or forced
     */
    public ControlChannelImpl(ClientIdentification clientIdentification, String context, ScheduledExecutorService executor, AxonServerManagedChannel channel, long processorInfoUpdateFrequency, Runnable reconnectHandler) {
        super(clientIdentification, executor, channel);
        this.channel = channel;
        this.clientIdentification = clientIdentification;
        this.context = context;
        this.executor = executor;
        this.processorInfoUpdateFrequency = processorInfoUpdateFrequency;
        this.reconnectHandler = reconnectHandler;
        this.heartbeatMonitor = new HeartbeatMonitor(executor, this::sendHeartBeat, channel::requestReconnect);
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.ACK, this::handleAck);
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.HEARTBEAT, (msg, reply) -> this.heartbeatMonitor.handleIncomingBeat(reply));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.MERGE_EVENT_PROCESSOR_SEGMENT, ProcessorInstructions.mergeHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.SPLIT_EVENT_PROCESSOR_SEGMENT, ProcessorInstructions.splitHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.START_EVENT_PROCESSOR, ProcessorInstructions.startHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.PAUSE_EVENT_PROCESSOR, ProcessorInstructions.pauseHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.RELEASE_SEGMENT, ProcessorInstructions.releaseSegmentHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.REQUEST_EVENT_PROCESSOR_INFO, ProcessorInstructions.requestInfoHandler(this.processorInfoSuppliers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.REQUEST_RECONNECT, this::handleReconnectRequest);
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.TOPOLOGY_CHANGE, this::handleTopologyChange);
        this.platformServiceStub = PlatformServiceGrpc.newStub(channel);
    }

    /** Forwards the topology change carried by the given instruction to all registered listeners. */
    private void handleTopologyChange(PlatformOutboundInstruction instruction, ReplyChannel<PlatformInboundInstruction> replyChannel) {
        this.invokeTopologyChangeListeners(instruction.getTopologyChange());
    }

    /**
     * Invokes every registered topology change listener with the given change. A listener that throws is
     * logged and does not prevent the remaining listeners from being invoked.
     */
    private void invokeTopologyChangeListeners(TopologyChange topologyChange) {
        this.topologyChangeListeners.forEach(listener -> {
            try {
                listener.accept(topologyChange);
            }
            catch (Exception e) {
                logger.warn("Error invoking topology change listener for context '{}': {}", this.context, e.getMessage());
            }
        });
    }

    /** Processes an acknowledgement received from AxonServer and completes the reply channel. */
    private void handleAck(PlatformOutboundInstruction instruction, ReplyChannel<PlatformInboundInstruction> replyChannel) {
        this.processAck(instruction.getAck());
        replyChannel.complete();
    }

    /**
     * Sends a heartbeat instruction with a random instruction id.
     *
     * @return the future of the sent instruction, or an already completed future when the channel is not ready
     */
    private CompletableFuture<Void> sendHeartBeat() {
        if (!this.isReady()) {
            return CompletableFuture.completedFuture(null);
        }
        PlatformInboundInstruction heartbeatMessage = PlatformInboundInstruction.newBuilder()
                .setInstructionId(UUID.randomUUID().toString())
                .setHeartbeat(Heartbeat.getDefaultInstance())
                .build();
        return this.sendInstruction(heartbeatMessage);
    }

    /** Runs the configured reconnect handler in response to a reconnect request from AxonServer. */
    void handleReconnectRequest(PlatformOutboundInstruction platformOutboundInstruction, ReplyChannel<PlatformInboundInstruction> replyChannel) {
        logger.info("AxonServer requested reconnect for context '{}'", this.context);
        this.reconnectHandler.run();
    }

    /**
     * Opens the instruction stream, unless one is already registered. After opening, the client
     * identification is sent, the heartbeat monitor is resumed and, when topology change listeners are
     * present, the topology change subscription is sent again. If any of these steps throws, the
     * dispatcher is cleared and the error is reported on the outbound stream.
     */
    @Override
    public synchronized void connect() {
        if (this.instructionDispatcher.get() != null) {
            logger.info("ControlChannel for context '{}' is already connected", this.context);
            return;
        }
        PlatformOutboundInstructionHandler responseObserver = new PlatformOutboundInstructionHandler(this.clientIdentification.getClientId(), this::handleDisconnect, this::registerOutboundStream);
        logger.debug("Opening instruction stream for context '{}'", this.context);
        this.platformServiceStub.openStream((StreamObserver<PlatformOutboundInstruction>)responseObserver);
        StreamObserver<PlatformInboundInstruction> instructionsForPlatform = responseObserver.getInstructionsForPlatform();
        try {
            logger.info("Connected instruction stream for context '{}'. Sending client identification", this.context);
            instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder().setRegister(this.clientIdentification).build());
            this.heartbeatMonitor.resume();
            if (!this.topologyChangeListeners.isEmpty()) {
                this.subscribeTopologyChanges();
            }
        }
        catch (Exception e) {
            this.instructionDispatcher.set(null);
            instructionsForPlatform.onError(e);
        }
    }

    /** Registers the given stream as the active dispatcher and completes the one it replaces, if any. */
    private void registerOutboundStream(CallStreamObserver<PlatformInboundInstruction> upstream) {
        StreamObserver<PlatformInboundInstruction> previous = this.instructionDispatcher.getAndSet(upstream);
        ObjectUtils.silently(previous, StreamObserver::onCompleted);
    }

    /** Completes the current outbound stream (if any) and schedules an immediate reconnect. */
    @Override
    public synchronized void reconnect() {
        ObjectUtils.doIfNotNull(this.instructionDispatcher.getAndSet(null), StreamObserver::onCompleted);
        this.scheduleImmediateReconnect();
    }

    /**
     * Reacts to the instruction stream being closed: pauses heartbeats, forces a new connection when the
     * stream failed with {@code UNAVAILABLE} while the underlying channel still reports ready, tells the
     * topology change listeners to reset, and schedules a reconnect.
     */
    private void handleDisconnect(Throwable cause) {
        this.heartbeatMonitor.pause();
        Status status = Status.fromThrowable(cause);
        // Compare status codes, not instances: a Status derived from a throwable usually carries a
        // description and/or cause and is therefore not the same object as the Status.UNAVAILABLE constant.
        if (status.getCode() == Status.Code.UNAVAILABLE && this.channel.isReady()) {
            logger.info("Upstream unavailable. Forcing new connection.");
            this.reconnectHandler.run();
        }
        this.invokeTopologyChangeListeners(RESET_ALL);
        this.scheduleReconnect(cause);
    }

    /** Disables heartbeats and completes the current outbound stream, if there is one. */
    @Override
    public synchronized void disconnect() {
        this.heartbeatMonitor.disableHeartbeat();
        StreamObserver<PlatformInboundInstruction> dispatcher = this.instructionDispatcher.getAndSet(null);
        if (dispatcher != null) {
            dispatcher.onCompleted();
        }
    }

    /**
     * Registers the handler for instructions of the given type, replacing any handler previously
     * registered for that type.
     *
     * @return a registration that removes the handler, provided it is still the one registered for the type
     */
    @Override
    public Registration registerInstructionHandler(PlatformOutboundInstruction.RequestCase type, InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> handler) {
        this.instructionHandlers.put(type, handler);
        return new SyncRegistration(() -> this.instructionHandlers.remove(type, handler));
    }

    /**
     * Registers a listener for topology changes. The subscription instruction is sent to AxonServer when
     * the first listener is registered.
     *
     * @return a registration that removes the listener; the unsubscribe instruction is only sent once the
     * last listener has been removed, so remaining listeners keep receiving updates
     */
    @Override
    public Registration registerTopologyChangeHandler(Consumer<TopologyChange> handler) {
        boolean first = this.topologyChangeListeners.isEmpty();
        this.topologyChangeListeners.add(handler);
        if (first) {
            this.subscribeTopologyChanges();
        }
        return new SyncRegistration(() -> {
            boolean removed = this.topologyChangeListeners.remove(handler);
            if (removed && this.topologyChangeListeners.isEmpty()) {
                this.unsubscribeTopologyChanges();
            }
        });
    }

    /** Sends the topology change subscription; a failure is logged and otherwise ignored. */
    private void subscribeTopologyChanges() {
        this.sendInstruction(PlatformInboundInstruction.newBuilder().setSubscribeTopologyChanges(SubscribeTopologyChanges.getDefaultInstance()).build()).exceptionally(ex -> {
            logger.warn("Failed to subscribe to topology changes for context '{}': {}", this.context, ex.getMessage());
            return null;
        });
    }

    /** Sends the topology change unsubscription; a failure is logged and otherwise ignored. */
    private void unsubscribeTopologyChanges() {
        this.sendInstruction(PlatformInboundInstruction.newBuilder().setUnsubscribeTopologyChanges(UnsubscribeTopologyChanges.getDefaultInstance()).build()).exceptionally(ex -> {
            logger.warn("Failed to unsubscribe from topology changes for context '{}': {}", this.context, ex.getMessage());
            return null;
        });
    }

    /**
     * Registers an event processor under the given name and starts the periodic processor info updates
     * if they are not running yet.
     *
     * @return a registration that removes the supplier and handler, provided they are still the ones
     * registered under the given name
     */
    @Override
    public Registration registerEventProcessor(String processorName, Supplier<EventProcessorInfo> infoSupplier, ProcessorInstructionHandler instructionHandler) {
        this.processorInstructionHandlers.put(processorName, instructionHandler);
        this.processorInfoSuppliers.put(processorName, infoSupplier);
        if (this.infoSupplierActive.compareAndSet(false, true)) {
            this.sendScheduledProcessorInfo();
        }
        return new SyncRegistration(() -> {
            this.processorInstructionHandlers.remove(processorName, instructionHandler);
            this.processorInfoSuppliers.remove(processorName, infoSupplier);
        });
    }

    /**
     * Sends the info of all registered processors when the outbound stream is ready, then schedules the
     * next run. When no processors are registered, the task stops rescheduling itself.
     */
    private void sendScheduledProcessorInfo() {
        Collection<Supplier<EventProcessorInfo>> infoSuppliers = this.processorInfoSuppliers.values();
        if (infoSuppliers.isEmpty()) {
            this.infoSupplierActive.set(false);
            // Re-check after clearing the flag: a processor registered in between saw the flag as
            // still active and did not start the task itself, so it has to be restarted here.
            if (!this.processorInfoSuppliers.isEmpty() && this.infoSupplierActive.compareAndSet(false, true)) {
                this.sendScheduledProcessorInfo();
            }
            return;
        }
        CallStreamObserver<PlatformInboundInstruction> outbound = this.instructionDispatcher.get();
        if (outbound != null && outbound.isReady()) {
            infoSuppliers.forEach(this::sendProcessorInfoSafely);
        } else {
            logger.debug("Not sending processor info for context '{}'. Channel not ready...", this.context);
        }
        this.executor.schedule(this::sendScheduledProcessorInfo, this.processorInfoUpdateFrequency, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the info provided by a single supplier, skipping {@code null} results. A failing supplier is
     * logged instead of propagated, so that it can neither block the other processors nor prevent the
     * periodic task from being rescheduled.
     */
    private void sendProcessorInfoSafely(Supplier<EventProcessorInfo> infoSupplier) {
        try {
            ObjectUtils.doIfNotNull(infoSupplier.get(), this::sendProcessorInfo);
        }
        catch (RuntimeException e) {
            logger.warn("Failed to send processor info for context '{}'", this.context, e);
        }
    }

    /** Wraps the given processor info in an instruction and sends it. */
    private CompletableFuture<Void> sendProcessorInfo(EventProcessorInfo processorInfo) {
        return this.sendInstruction(PlatformInboundInstruction.newBuilder().setEventProcessorInfo(processorInfo).build());
    }

    /** Delegates to the heartbeat monitor to enable heartbeats with the given interval and timeout. */
    @Override
    public void enableHeartbeat(long interval, long timeout, TimeUnit timeUnit) {
        this.heartbeatMonitor.enableHeartbeat(interval, timeout, timeUnit);
    }

    /** Delegates to the heartbeat monitor to disable heartbeats. */
    @Override
    public void disableHeartbeat() {
        this.heartbeatMonitor.disableHeartbeat();
    }

    /** Sends the given instruction over the currently registered outbound stream. */
    @Override
    public CompletableFuture<Void> sendInstruction(PlatformInboundInstruction instruction) {
        return this.sendInstruction(instruction, PlatformInboundInstruction::getInstructionId, this.instructionDispatcher.get());
    }

    /** @return {@code true} when an outbound stream is currently registered */
    @Override
    public boolean isReady() {
        return this.instructionDispatcher.get() != null;
    }

    /**
     * Incoming instruction stream that looks up handlers in the enclosing channel and wraps
     * acknowledgements and results in {@link PlatformInboundInstruction} messages.
     */
    private class PlatformOutboundInstructionHandler
    extends AbstractIncomingInstructionStream<PlatformOutboundInstruction, PlatformInboundInstruction> {
        public PlatformOutboundInstructionHandler(String clientId, Consumer<Throwable> disconnectHandler, Consumer<CallStreamObserver<PlatformInboundInstruction>> beforeStartHandler) {
            // NOTE(review): 64 and 8 are presumably the flow-control permits and refill batch size
            // expected by the superclass - confirm against AbstractIncomingInstructionStream.
            super(clientId, 64, 8, disconnectHandler, beforeStartHandler);
        }

        @Override
        protected PlatformInboundInstruction buildAckMessage(InstructionAck ack) {
            return PlatformInboundInstruction.newBuilder().setAck(ack).build();
        }

        @Override
        protected Optional<PlatformInboundInstruction> buildResultMessage(InstructionResult result) {
            return Optional.of(PlatformInboundInstruction.newBuilder().setResult(result).build());
        }

        @Override
        protected String getInstructionId(PlatformOutboundInstruction instruction) {
            return instruction.getInstructionId();
        }

        @Override
        protected InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> getHandler(PlatformOutboundInstruction instruction) {
            return ControlChannelImpl.this.instructionHandlers.get(instruction.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(CallStreamObserver<PlatformInboundInstruction> expected) {
            // Only clear the dispatcher if it is still the stream owned by this handler.
            return ControlChannelImpl.this.instructionDispatcher.compareAndSet(expected, null);
        }

        @Override
        protected PlatformInboundInstruction buildFlowControlMessage(FlowControl flowControl) {
            // No flow control message is produced for this stream.
            // NOTE(review): presumably the superclass treats null as "nothing to send" - confirm.
            return null;
        }
    }
}

