/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.HeartbeatMonitor;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.impl.ProcessorInstructions;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import io.axoniq.axonserver.grpc.control.Heartbeat;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlChannelImpl
extends AbstractAxonServerChannel
implements ControlChannel {
    private static final Logger logger = LoggerFactory.getLogger(ControlChannelImpl.class);

    // Identification of this client; sent as the "register" instruction when connecting.
    private final ClientIdentification clientIdentification;
    // Executor on which the periodic event processor info updates are scheduled.
    private final ScheduledExecutorService executor;
    // Delay, in milliseconds, between two rounds of event processor info updates.
    private final long processorUpdateFrequency;
    // Outbound instruction stream towards the server; holds null while not connected.
    private final AtomicReference<StreamObserver<PlatformInboundInstruction>> instructionDispatcher = new AtomicReference<>();
    // Handlers for instructions received from the server, keyed by the kind of request.
    private final Map<PlatformOutboundInstruction.RequestCase, InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction>> instructionHandlers = new EnumMap<>(PlatformOutboundInstruction.RequestCase.class);
    private final HeartbeatMonitor heartbeatMonitor;
    // Futures of sent instructions that carry an instruction id and have not been acknowledged yet.
    private final Map<String, CompletableFuture<InstructionAck>> awaitingAck = new ConcurrentHashMap<>();
    // Name of the context this channel is connected to; used for logging.
    private final String context;
    // Per-processor instruction handlers, keyed by processor name.
    private final Map<String, ProcessorInstructionHandler> processorInstructionHandlers = new ConcurrentHashMap<>();
    // Per-processor status suppliers, keyed by processor name.
    private final Map<String, Supplier<EventProcessorInfo>> processorInfoSuppliers = new ConcurrentHashMap<>();
    // True while the self-rescheduling processor info task is running.
    private final AtomicBoolean infoSupplierActive = new AtomicBoolean();
    private final PlatformServiceGrpc.PlatformServiceStub platformServiceStub;

    /**
     * Creates a control channel and registers the built-in handlers for instructions sent by the server
     * (acknowledgements, heartbeats, event processor instructions and processor info requests).
     *
     * @param clientIdentification     identification sent to the server when the stream is opened
     * @param context                  name of the context, used for logging
     * @param executor                 executor used for heartbeats and scheduled processor info updates
     * @param channel                  channel on which the platform service stub is created; also asked to
     *                                 force a reconnect by the heartbeat monitor
     * @param processorUpdateFrequency delay in milliseconds between event processor info updates
     */
    public ControlChannelImpl(ClientIdentification clientIdentification, String context, ScheduledExecutorService executor, AxonServerManagedChannel channel, long processorUpdateFrequency) {
        super(executor, channel);
        this.clientIdentification = clientIdentification;
        this.context = context;
        this.executor = executor;
        this.processorUpdateFrequency = processorUpdateFrequency;
        this.heartbeatMonitor = new HeartbeatMonitor(executor, this::sendHeartBeat, channel::forceReconnect);

        // The handler map is freshly created and empty here, so every entry below is a first registration.
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.ACK, new AckHandler());
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.HEARTBEAT,
                                (msg, reply) -> this.heartbeatMonitor.handleIncomingBeat(reply));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.MERGE_EVENT_PROCESSOR_SEGMENT,
                                ProcessorInstructions.mergeHandler(processorInstructionHandlers));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.SPLIT_EVENT_PROCESSOR_SEGMENT,
                                ProcessorInstructions.splitHandler(processorInstructionHandlers));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.START_EVENT_PROCESSOR,
                                ProcessorInstructions.startHandler(processorInstructionHandlers));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.PAUSE_EVENT_PROCESSOR,
                                ProcessorInstructions.pauseHandler(processorInstructionHandlers));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.RELEASE_SEGMENT,
                                ProcessorInstructions.releaseSegmentHandler(processorInstructionHandlers));
        instructionHandlers.put(PlatformOutboundInstruction.RequestCase.REQUEST_EVENT_PROCESSOR_INFO,
                                ProcessorInstructions.requestInfoHandler(processorInfoSuppliers));

        this.platformServiceStub = PlatformServiceGrpc.newStub((Channel) channel);
    }

    /**
     * Opens the instruction stream to the server, unless a dispatcher is already registered.
     * <p>
     * After opening the stream the client identification is sent and the heartbeat monitor is resumed.
     * When that fails, the dispatcher is cleared again and the failure is reported on the outbound stream.
     */
    @Override
    public synchronized void connect() {
        if (instructionDispatcher.get() != null) {
            logger.info("Not connecting - connection already present");
            return;
        }

        PlatformOutboundInstructionHandler inboundStream =
                new PlatformOutboundInstructionHandler(clientIdentification.getClientId(), this::handleDisconnect);
        logger.debug("Opening instruction stream");
        platformServiceStub.openStream((StreamObserver<PlatformOutboundInstruction>) inboundStream);

        StreamObserver<PlatformInboundInstruction> outboundStream = inboundStream.getInstructionsForPlatform();
        StreamObserver<PlatformInboundInstruction> replaced = instructionDispatcher.getAndSet(outboundStream);
        // Complete whatever stream was registered before, ignoring failures while doing so.
        ObjectUtils.silently(replaced, StreamObserver::onCompleted);

        try {
            logger.info("Connected instruction stream for context '{}'. Sending client identification", context);
            PlatformInboundInstruction registration = PlatformInboundInstruction.newBuilder()
                                                                                .setRegister(clientIdentification)
                                                                                .build();
            outboundStream.onNext(registration);
            heartbeatMonitor.resume();
        } catch (Exception e) {
            instructionDispatcher.set(null);
            outboundStream.onError(e);
        }
    }

    /**
     * Callback given to the incoming instruction stream as its disconnect handler: fails all
     * instructions still awaiting acknowledgement and schedules a reconnect.
     *
     * @param cause the failure used to complete the pending instruction futures
     */
    private void handleDisconnect(Throwable cause) {
        failOpenInstructions(cause);
        scheduleReconnect();
    }

    /**
     * Removes every instruction that is awaiting acknowledgement and completes its future exceptionally.
     * The outer loop repeats until the map is observed empty, so entries added while draining are failed too.
     *
     * @param cause the exception to complete the pending futures with
     */
    private void failOpenInstructions(Throwable cause) {
        while (!awaitingAck.isEmpty()) {
            for (String instructionId : awaitingAck.keySet()) {
                // remove() may return null when the entry was acknowledged or failed in the meantime
                ObjectUtils.doIfNotNull(awaitingAck.remove(instructionId),
                                        pending -> pending.completeExceptionally(cause));
            }
        }
    }

    /**
     * Disables the heartbeat, clears the registered dispatcher and, if there was one,
     * completes the outbound instruction stream.
     */
    @Override
    public void disconnect() {
        heartbeatMonitor.disableHeartbeat();
        StreamObserver<PlatformInboundInstruction> outboundStream = instructionDispatcher.getAndSet(null);
        if (outboundStream == null) {
            return;
        }
        outboundStream.onCompleted();
    }

    /**
     * Registers the handler for instructions of the given type, replacing any handler
     * currently registered for that type.
     *
     * @param type    the kind of instruction to handle
     * @param handler the handler to invoke for such instructions
     * @return a registration that removes the handler again, but only while it is still the one
     *         registered for {@code type}
     */
    public Registration registerInstructionHandler(PlatformOutboundInstruction.RequestCase type, InstructionHandler handler) {
        instructionHandlers.put(type, handler);
        return () -> instructionHandlers.remove(type, handler);
    }

    /**
     * Registers an event processor under the given name and starts the periodic processor info
     * updates if they are not running yet.
     *
     * @param processorName      the name to register the processor under
     * @param infoSupplier       supplies the processor's status for each update
     * @param instructionHandler handles instructions targeted at this processor
     * @return a registration that removes both the handler and the supplier, provided they are
     *         still the ones registered under {@code processorName}
     */
    @Override
    public Registration registerEventProcessor(String processorName, Supplier<EventProcessorInfo> infoSupplier, ProcessorInstructionHandler instructionHandler) {
        processorInstructionHandlers.put(processorName, instructionHandler);
        processorInfoSuppliers.put(processorName, infoSupplier);

        // Only the caller that flips the flag from false to true starts the update loop.
        boolean startedByThisCall = infoSupplierActive.compareAndSet(false, true);
        if (startedByThisCall) {
            sendScheduledProcessorInfo();
        }

        return () -> {
            processorInstructionHandlers.remove(processorName, instructionHandler);
            processorInfoSuppliers.remove(processorName, infoSupplier);
        };
    }

    /**
     * Sends the info of every registered event processor and reschedules itself after
     * {@code processorUpdateFrequency} milliseconds.
     * <p>
     * When no suppliers are registered the loop stops and {@code infoSupplierActive} is reset. The map is
     * then checked once more so that a supplier registered concurrently with the reset is not left
     * without an active loop.
     * <p>
     * A failure of a single supplier (or of sending its info) is logged and does not affect the other
     * suppliers or the rescheduling. Without this isolation one throwing supplier would end the loop
     * while {@code infoSupplierActive} stays {@code true}, so updates would never be restarted.
     */
    private void sendScheduledProcessorInfo() {
        Collection<Supplier<EventProcessorInfo>> infoSuppliers = processorInfoSuppliers.values();
        if (infoSuppliers.isEmpty()) {
            infoSupplierActive.set(false);
            // A registration may have slipped in between the emptiness check and the reset above.
            if (!processorInfoSuppliers.isEmpty() && infoSupplierActive.compareAndSet(false, true)) {
                sendScheduledProcessorInfo();
            }
            return;
        }

        for (Supplier<EventProcessorInfo> infoSupplier : infoSuppliers) {
            try {
                ObjectUtils.doIfNotNull(infoSupplier.get(), this::sendProcessorInfo);
            } catch (Exception e) {
                logger.warn("Failed to send event processor info. Will retry on next scheduled update.", e);
            }
        }
        executor.schedule(this::sendScheduledProcessorInfo, processorUpdateFrequency, TimeUnit.MILLISECONDS);
    }

    /**
     * Enables heartbeat monitoring by delegating to the heartbeat monitor.
     *
     * @param interval the heartbeat interval, passed on unchanged
     * @param timeout  the heartbeat timeout, passed on unchanged
     * @param timeUnit the unit in which {@code interval} and {@code timeout} are expressed
     */
    @Override
    public void enableHeartbeat(long interval, long timeout, TimeUnit timeUnit) {
        heartbeatMonitor.enableHeartbeat(interval, timeout, timeUnit);
    }

    /**
     * Disables heartbeat monitoring by delegating to the heartbeat monitor.
     */
    @Override
    public void disableHeartbeat() {
        heartbeatMonitor.disableHeartbeat();
    }

    /**
     * Wraps the given processor info in an instruction and sends it to the server.
     *
     * @param processorInfo the processor status to report
     * @return the future returned by {@link #sendInstruction(PlatformInboundInstruction)}
     */
    public CompletableFuture<InstructionAck> sendProcessorInfo(EventProcessorInfo processorInfo) {
        PlatformInboundInstruction instruction = PlatformInboundInstruction.newBuilder()
                                                                           .setEventProcessorInfo(processorInfo)
                                                                           .build();
        return sendInstruction(instruction);
    }

    /**
     * Sends a heartbeat instruction with a freshly generated instruction id.
     *
     * @return the future of the sent heartbeat, or a future already completed with {@code null}
     *         when the channel is not connected
     */
    public CompletableFuture<InstructionAck> sendHeartBeat() {
        if (isConnected()) {
            PlatformInboundInstruction heartbeat = PlatformInboundInstruction.newBuilder()
                                                                         .setInstructionId(UUID.randomUUID().toString())
                                                                         .setHeartbeat(Heartbeat.getDefaultInstance())
                                                                         .build();
            return sendInstruction(heartbeat);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Sends the given instruction to the server over the current instruction stream.
     * <p>
     * When the instruction carries an instruction id, the returned future is registered so that it
     * completes once the matching acknowledgement arrives, or fails when the stream disconnects first.
     * When the instruction has no id, the future is not registered, and this method does not complete
     * it after a successful send.
     *
     * @param instruction the instruction to send
     * @return a future that completes exceptionally when there is no connection or sending fails,
     *         and otherwise completes as described above
     */
    @Override
    public CompletableFuture<InstructionAck> sendInstruction(PlatformInboundInstruction instruction) {
        CompletableFuture<InstructionAck> result = new CompletableFuture<>();
        String instructionId = instruction.getInstructionId();
        if (logger.isDebugEnabled()) {
            logger.debug("Sending instruction: {} {}", instruction.getRequestCase().name(), instructionId);
        }

        StreamObserver<PlatformInboundInstruction> dispatcher = instructionDispatcher.get();
        if (dispatcher == null) {
            result.completeExceptionally(new AxonServerException(ErrorCategory.INSTRUCTION_EXECUTION_ERROR, "Unable to send instruction", clientIdentification.getClientId()));
            return result;
        }

        boolean awaitsAck = ObjectUtils.hasLength(instructionId);
        if (awaitsAck) {
            awaitingAck.put(instructionId, result);
        }
        try {
            dispatcher.onNext(instruction);
        } catch (Exception e) {
            if (awaitsAck) {
                // Remove only our own registration; another future may be registered under the same id.
                awaitingAck.remove(instructionId, result);
            }
            result.completeExceptionally(e);
            // Report the failure on the stream, ignoring any error raised while doing so.
            ObjectUtils.silently(dispatcher, d -> d.onError(e));
        }
        return result;
    }

    /**
     * Tells whether an outbound instruction stream is currently registered.
     *
     * @return {@code true} when a dispatcher is present, {@code false} otherwise
     */
    @Override
    public boolean isConnected() {
        StreamObserver<PlatformInboundInstruction> current = instructionDispatcher.get();
        return current != null;
    }

    /**
     * Handles acknowledgements from the server by completing the future that was registered
     * for the acknowledged instruction id, if one is still pending.
     */
    private class AckHandler
    implements InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> {
        private AckHandler() {
        }

        /**
         * Completes the pending future matching the instruction id of the received acknowledgement.
         * Acknowledgements without a pending future are ignored.
         *
         * @param ackMessage   the instruction carrying the acknowledgement
         * @param replyChannel not used by this handler
         */
        @Override
        public void handle(PlatformOutboundInstruction ackMessage, ReplyChannel<PlatformInboundInstruction> replyChannel) {
            String acknowledgedId = ackMessage.getAck().getInstructionId();
            logger.info("Received ACK for {}", acknowledgedId);
            CompletableFuture<InstructionAck> pending = ControlChannelImpl.this.awaitingAck.remove(acknowledgedId);
            if (pending == null) {
                return;
            }
            pending.complete(ackMessage.getAck());
        }
    }

    private class PlatformOutboundInstructionHandler
    extends AbstractIncomingInstructionStream<PlatformOutboundInstruction, PlatformInboundInstruction> {
        public PlatformOutboundInstructionHandler(String clientId, Consumer<Throwable> disconnectHandler) {
            super(clientId, 0, 0, disconnectHandler);
        }

        @Override
        protected PlatformInboundInstruction buildAckMessage(InstructionAck ack) {
            return PlatformInboundInstruction.newBuilder().setAck(ack).build();
        }

        @Override
        protected String getInstructionId(PlatformOutboundInstruction value) {
            return value.getInstructionId();
        }

        @Override
        protected InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> getHandler(PlatformOutboundInstruction platformOutboundInstruction) {
            return (InstructionHandler)ControlChannelImpl.this.instructionHandlers.get((Object)platformOutboundInstruction.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(StreamObserver<PlatformInboundInstruction> expected) {
            ControlChannelImpl.this.heartbeatMonitor.pause();
            boolean disconnected = ControlChannelImpl.this.instructionDispatcher.compareAndSet(expected, null);
            if (disconnected) {
                ControlChannelImpl.this.failOpenInstructions(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "Disconnected from AxonServer before receiving instruction ACK", this.clientId()));
            }
            return disconnected;
        }

        @Override
        protected PlatformInboundInstruction buildFlowControlMessage(FlowControl flowControl) {
            return null;
        }
    }
}

