package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import io.axoniq.axonserver.grpc.control.Heartbeat;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.grpc.Status;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/ControlChannelImpl.class */
public class ControlChannelImpl extends AbstractAxonServerChannel<PlatformInboundInstruction> implements ControlChannel {
    private static final Logger logger = LoggerFactory.getLogger(ControlChannelImpl.class);
    // Identification sent to the server in the "register" instruction when the stream is opened;
    // its client id is also handed to the incoming instruction stream.
    private final ClientIdentification clientIdentification;
    // Executor used to schedule the periodic processor-info updates; also handed to the heartbeat monitor.
    private final ScheduledExecutorService executor;
    // Delay, in milliseconds, between two rounds of processor-info updates.
    private final long processorInfoUpdateFrequency;
    // Callback run when the server requests a reconnect, or when a disconnect reports UNAVAILABLE
    // while the underlying channel still claims to be ready.
    private final Runnable reconnectHandler;
    // Currently active outbound instruction stream; holds null while this channel is not connected.
    private final AtomicReference<CallStreamObserver<PlatformInboundInstruction>> instructionDispatcher;
    // Handlers for instructions received from the server, keyed by the instruction's request case.
    private final Map<PlatformOutboundInstruction.RequestCase, InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction>> instructionHandlers;
    // Sends heartbeats through this channel and is paused/resumed as the stream disconnects/connects.
    private final HeartbeatMonitor heartbeatMonitor;
    // Name of the context this channel belongs to; within this class it is only used in log messages.
    private final String context;
    // Processor instruction handlers registered through registerEventProcessor
    // (key is presumably the event processor name -- confirm against callers).
    private final Map<String, ProcessorInstructionHandler> processorInstructionHandlers;
    // Suppliers of processor status information, registered under the same key as the instruction handlers.
    private final Map<String, Supplier<EventProcessorInfo>> processorInfoSuppliers;
    // True while the self-rescheduling processor-info task is running, so that it is started only once.
    private final AtomicBoolean infoSupplierActive;
    // Asynchronous gRPC stub used to open the instruction stream.
    private final PlatformServiceGrpc.PlatformServiceStub platformServiceStub;
    // Underlying managed channel; its readiness is consulted when handling a disconnect.
    private final AxonServerManagedChannel channel;

    /* loaded from: input_file:io/axoniq/axonserver/connector/impl/ControlChannelImpl$PlatformOutboundInstructionHandler.class */
    private class PlatformOutboundInstructionHandler extends AbstractIncomingInstructionStream<PlatformOutboundInstruction, PlatformInboundInstruction> {
        public PlatformOutboundInstructionHandler(String str, Consumer<Throwable> consumer, Consumer<CallStreamObserver<PlatformInboundInstruction>> consumer2) {
            super(str, 64, 8, consumer, consumer2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public PlatformInboundInstruction buildAckMessage(InstructionAck instructionAck) {
            return PlatformInboundInstruction.newBuilder().setAck(instructionAck).m978build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public String getInstructionId(PlatformOutboundInstruction platformOutboundInstruction) {
            return platformOutboundInstruction.getInstructionId();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> getHandler(PlatformOutboundInstruction platformOutboundInstruction) {
            return (InstructionHandler) ControlChannelImpl.this.instructionHandlers.get(platformOutboundInstruction.getRequestCase());
        }

        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        protected boolean unregisterOutboundStream(CallStreamObserver<PlatformInboundInstruction> callStreamObserver) {
            return ControlChannelImpl.this.instructionDispatcher.compareAndSet(callStreamObserver, null);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
        public PlatformInboundInstruction buildFlowControlMessage(FlowControl flowControl) {
            return null;
        }
    }

    /**
     * Creates a control channel and registers the built-in handlers for instructions sent by the server.
     * No connection is opened here; see {@link #connect()}.
     *
     * @param clientIdentification     identification sent to the server when the stream is opened
     * @param str                      name of the context, used in log messages
     * @param scheduledExecutorService executor used for scheduled work (heartbeats, processor-info updates)
     * @param axonServerManagedChannel channel on which the instruction stream is opened
     * @param j                        delay in milliseconds between processor-info updates
     * @param runnable                 callback run when a reconnect must be forced
     */
    public ControlChannelImpl(ClientIdentification clientIdentification, String str, ScheduledExecutorService scheduledExecutorService, AxonServerManagedChannel axonServerManagedChannel, long j, Runnable runnable) {
        super(clientIdentification, scheduledExecutorService, axonServerManagedChannel);
        this.instructionDispatcher = new AtomicReference<>();
        // Diamond operators instead of raw types, so the maps are checked against the field declarations.
        this.instructionHandlers = new EnumMap<>(PlatformOutboundInstruction.RequestCase.class);
        this.processorInstructionHandlers = new ConcurrentHashMap<>();
        this.processorInfoSuppliers = new ConcurrentHashMap<>();
        this.infoSupplierActive = new AtomicBoolean();
        this.channel = axonServerManagedChannel;
        this.clientIdentification = clientIdentification;
        this.context = str;
        this.executor = scheduledExecutorService;
        this.processorInfoUpdateFrequency = j;
        this.reconnectHandler = runnable;
        HeartbeatSender heartbeatSender = this::sendHeartBeat;
        // The bound method reference null-checks its receiver when it is evaluated, so no separate
        // explicit null check on the channel is needed.
        this.heartbeatMonitor = new HeartbeatMonitor(scheduledExecutorService, heartbeatSender, axonServerManagedChannel::requestReconnect);

        // Built-in handlers; registerInstructionHandler may replace any of these later.
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.ACK, this::handleAck);
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.HEARTBEAT, (platformOutboundInstruction, replyChannel) -> {
            this.heartbeatMonitor.handleIncomingBeat(replyChannel);
        });
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.MERGE_EVENT_PROCESSOR_SEGMENT, ProcessorInstructions.mergeHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.SPLIT_EVENT_PROCESSOR_SEGMENT, ProcessorInstructions.splitHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.START_EVENT_PROCESSOR, ProcessorInstructions.startHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.PAUSE_EVENT_PROCESSOR, ProcessorInstructions.pauseHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.RELEASE_SEGMENT, ProcessorInstructions.releaseSegmentHandler(this.processorInstructionHandlers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.REQUEST_EVENT_PROCESSOR_INFO, ProcessorInstructions.requestInfoHandler(this.processorInfoSuppliers));
        this.instructionHandlers.put(PlatformOutboundInstruction.RequestCase.REQUEST_RECONNECT, this::handleReconnectRequest);
        this.platformServiceStub = PlatformServiceGrpc.newStub(axonServerManagedChannel);
    }

    /**
     * Handles an acknowledgement received from the server: the acknowledgement is handed to
     * {@code processAck} and the reply channel is then completed.
     *
     * @param platformOutboundInstruction the instruction carrying the acknowledgement
     * @param replyChannel                the channel on which the handling is marked as complete
     */
    private void handleAck(PlatformOutboundInstruction platformOutboundInstruction,
                           ReplyChannel<PlatformInboundInstruction> replyChannel) {
        // Process the acknowledgement first; the reply channel is only completed once that returned.
        processAck(platformOutboundInstruction.getAck());
        replyChannel.complete();
    }

    /**
     * Sends a heartbeat instruction with a freshly generated instruction id.
     *
     * @return the future returned by {@code sendInstruction}, or an already completed future when this
     *         channel is not ready (in which case nothing is sent)
     */
    private CompletableFuture<Void> sendHeartBeat() {
        if (!isReady()) {
            // Not connected: there is no stream to send a heartbeat on.
            return CompletableFuture.completedFuture(null);
        }
        String heartbeatId = UUID.randomUUID().toString();
        PlatformInboundInstruction heartbeat = PlatformInboundInstruction.newBuilder()
                                                                         .setInstructionId(heartbeatId)
                                                                         .setHeartbeat(Heartbeat.getDefaultInstance())
                                                                         .m978build();
        return sendInstruction(heartbeat);
    }

    /**
     * Handles a reconnect request from the server: logs it, acknowledges the instruction and then runs
     * the configured reconnect handler.
     *
     * @param platformOutboundInstruction the reconnect instruction (its content is not inspected)
     * @param replyChannel                the channel used to acknowledge the instruction
     */
    void handleReconnectRequest(PlatformOutboundInstruction platformOutboundInstruction,
                                ReplyChannel<PlatformInboundInstruction> replyChannel) {
        logger.info("AxonServer requested reconnect for context '{}'", this.context);
        // Acknowledge before the reconnect handler gets to run.
        replyChannel.sendAck();
        this.reconnectHandler.run();
    }

    /**
     * Opens the instruction stream to the server and registers this client on it. Does nothing apart from
     * logging when an outbound stream is already registered.
     * <p>
     * After the stream is opened, the client identification is sent as the first instruction and the
     * heartbeat monitor is resumed. If either step throws, the registered outbound stream is cleared and
     * the exception is reported on the outbound stream via {@code onError}.
     */
    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public synchronized void connect() {
        if (this.instructionDispatcher.get() != null) {
            logger.info("ControlChannel for context '{}' is already connected", this.context);
            return;
        }
        // Keep the concrete handler type: getInstructionsForPlatform() is not part of StreamObserver.
        PlatformOutboundInstructionHandler responseObserver = new PlatformOutboundInstructionHandler(this.clientIdentification.getClientId(), this::handleDisconnect, this::registerOutboundStream);
        logger.debug("Opening instruction stream for context '{}'", this.context);
        this.platformServiceStub.openStream(responseObserver);
        // Only onNext/onError are needed here, so the outbound side is held as a typed StreamObserver.
        StreamObserver<PlatformInboundInstruction> instructionsForPlatform = responseObserver.getInstructionsForPlatform();
        try {
            logger.info("Connected instruction stream for context '{}'. Sending client identification", this.context);
            instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder().setRegister(this.clientIdentification).m978build());
            this.heartbeatMonitor.resume();
        } catch (Exception e) {
            // Registration failed: forget the stream so isReady() reports false, then fail the stream.
            this.instructionDispatcher.set(null);
            instructionsForPlatform.onError(e);
        }
    }

    /**
     * Makes the given stream the active outbound stream and completes the one it replaces.
     *
     * @param callStreamObserver the new outbound stream
     */
    private void registerOutboundStream(CallStreamObserver<PlatformInboundInstruction> callStreamObserver) {
        CallStreamObserver<PlatformInboundInstruction> replaced = this.instructionDispatcher.getAndSet(callStreamObserver);
        // NOTE(review): ObjectUtils.silently presumably tolerates a null stream and suppresses failures
        // of onCompleted -- confirm in ObjectUtils.
        ObjectUtils.silently(replaced, CallStreamObserver::onCompleted);
    }

    /**
     * Drops the active outbound stream, completing it when there is one, and schedules an immediate
     * reconnect.
     */
    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public synchronized void reconnect() {
        CallStreamObserver<PlatformInboundInstruction> previous = this.instructionDispatcher.getAndSet(null);
        ObjectUtils.doIfNotNull(previous, CallStreamObserver::onCompleted);
        scheduleImmediateReconnect();
    }

    /**
     * Handles the loss of the instruction stream: pauses the heartbeat monitor and schedules a reconnect.
     * When the failure maps to gRPC status code {@code UNAVAILABLE} while the underlying channel still
     * reports ready, the reconnect handler is run first to force a new connection.
     *
     * @param th the cause of the disconnect
     */
    private void handleDisconnect(Throwable th) {
        this.heartbeatMonitor.pause();
        // Compare status codes, not Status instances: a Status carrying a description or cause is a
        // different object than the Status.UNAVAILABLE constant, so an identity check would not match it.
        boolean upstreamUnavailable = Status.fromThrowable(th).getCode() == Status.Code.UNAVAILABLE;
        if (upstreamUnavailable && this.channel.isReady()) {
            logger.info("Upstream unavailable. Forcing new connection.");
            this.reconnectHandler.run();
        }
        scheduleReconnect(th);
    }

    /**
     * Disables heartbeats and closes the active outbound stream, if there is one.
     */
    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public synchronized void disconnect() {
        this.heartbeatMonitor.disableHeartbeat();
        CallStreamObserver<PlatformInboundInstruction> previous = this.instructionDispatcher.getAndSet(null);
        if (previous == null) {
            // Already disconnected; nothing to complete.
            return;
        }
        previous.onCompleted();
    }

    /**
     * Registers the handler for instructions of the given request case, replacing any handler registered
     * for that case before.
     *
     * @param requestCase        the type of instruction to handle
     * @param instructionHandler the handler to invoke for such instructions
     * @return a registration whose cancellation removes the handler, provided it is still the one
     *         registered for the request case
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public Registration registerInstructionHandler(PlatformOutboundInstruction.RequestCase requestCase,
                                                   InstructionHandler<PlatformOutboundInstruction, PlatformInboundInstruction> instructionHandler) {
        this.instructionHandlers.put(requestCase, instructionHandler);
        return new SyncRegistration(() -> {
            // Two-argument remove: a handler registered later for the same case is left untouched.
            this.instructionHandlers.remove(requestCase, instructionHandler);
        });
    }

    /**
     * Registers an instruction handler and an info supplier under the given key, and starts the periodic
     * processor-info task when it is not running yet.
     *
     * @param str                         the key to register under (presumably the processor name --
     *                                    confirm against the {@code ControlChannel} contract)
     * @param supplier                    supplies the processor information to report
     * @param processorInstructionHandler handles instructions addressed to the processor
     * @return a registration whose cancellation removes both entries, provided they are still the ones
     *         registered under the key
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public Registration registerEventProcessor(String str, Supplier<EventProcessorInfo> supplier, ProcessorInstructionHandler processorInstructionHandler) {
        this.processorInstructionHandlers.put(str, processorInstructionHandler);
        this.processorInfoSuppliers.put(str, supplier);

        // Only the caller that flips the flag starts the task, so it never runs twice concurrently.
        boolean startedByThisCall = this.infoSupplierActive.compareAndSet(false, true);
        if (startedByThisCall) {
            sendScheduledProcessorInfo();
        }

        return new SyncRegistration(() -> {
            this.processorInstructionHandlers.remove(str, processorInstructionHandler);
            this.processorInfoSuppliers.remove(str, supplier);
        });
    }

    /**
     * Sends the information of all registered processors and schedules the next round after
     * {@code processorInfoUpdateFrequency} milliseconds.
     * <p>
     * When no suppliers are registered the task stops itself by clearing {@code infoSupplierActive}.
     * When the outbound stream is absent or not ready, the round is skipped but the next one is still
     * scheduled. A failure while obtaining or sending the information of one processor is logged and does
     * not prevent the other processors from being reported, nor the next round from being scheduled.
     */
    private void sendScheduledProcessorInfo() {
        if (this.processorInfoSuppliers.isEmpty()) {
            this.infoSupplierActive.set(false);
            // A supplier may have been registered between the emptiness check and clearing the flag; its
            // registration would have seen the flag still set and not started the task. Re-check, and
            // restart unless another thread already took over.
            if (this.processorInfoSuppliers.isEmpty() || !this.infoSupplierActive.compareAndSet(false, true)) {
                return;
            }
            sendScheduledProcessorInfo();
            return;
        }
        CallStreamObserver<PlatformInboundInstruction> dispatcher = this.instructionDispatcher.get();
        if (dispatcher == null || !dispatcher.isReady()) {
            logger.debug("Not sending processor info for context '{}'. Channel not ready...", this.context);
        } else {
            this.processorInfoSuppliers.forEach(this::sendProcessorInfoSafely);
        }
        this.executor.schedule(this::sendScheduledProcessorInfo, this.processorInfoUpdateFrequency, TimeUnit.MILLISECONDS);
    }

    /**
     * Obtains the information from a single supplier and sends it, logging instead of propagating any
     * runtime failure so that one misbehaving supplier cannot stop the periodic updates.
     */
    private void sendProcessorInfoSafely(String processorKey, Supplier<EventProcessorInfo> infoSupplier) {
        try {
            // A supplier may return null to indicate there is nothing to report.
            ObjectUtils.doIfNotNull(infoSupplier.get(), this::sendProcessorInfo);
        } catch (RuntimeException e) {
            logger.warn("Failed to send processor info of '{}' for context '{}'", processorKey, this.context, e);
        }
    }

    /**
     * Wraps the given processor information in an instruction and sends it.
     *
     * @param eventProcessorInfo the processor information to send
     * @return the future returned by {@code sendInstruction}
     */
    private CompletableFuture<Void> sendProcessorInfo(EventProcessorInfo eventProcessorInfo) {
        PlatformInboundInstruction infoInstruction = PlatformInboundInstruction.newBuilder()
                                                                               .setEventProcessorInfo(eventProcessorInfo)
                                                                               .m978build();
        return sendInstruction(infoInstruction);
    }

    /**
     * Enables heartbeat monitoring by delegating to the heartbeat monitor.
     *
     * @param j        first duration passed to the monitor (presumably the heartbeat interval -- confirm
     *                 against {@code HeartbeatMonitor})
     * @param j2       second duration passed to the monitor (presumably the heartbeat timeout -- confirm
     *                 against {@code HeartbeatMonitor})
     * @param timeUnit the unit of both durations
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public void enableHeartbeat(long j, long j2, TimeUnit timeUnit) {
        // Pure delegation: the monitor owns all heartbeat scheduling state.
        this.heartbeatMonitor.enableHeartbeat(j, j2, timeUnit);
    }

    /**
     * Disables heartbeat monitoring by delegating to the heartbeat monitor.
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public void disableHeartbeat() {
        // Pure delegation: the monitor owns all heartbeat scheduling state.
        this.heartbeatMonitor.disableHeartbeat();
    }

    /**
     * Sends the given instruction over the currently active outbound stream.
     *
     * @param platformInboundInstruction the instruction to send
     * @return the future returned by the inherited three-argument {@code sendInstruction}
     */
    @Override // io.axoniq.axonserver.connector.control.ControlChannel
    public CompletableFuture<Void> sendInstruction(PlatformInboundInstruction platformInboundInstruction) {
        // The stream reference is read once here; it may be null when the channel is not connected.
        return sendInstruction(platformInboundInstruction,
                               PlatformInboundInstruction::getInstructionId,
                               this.instructionDispatcher.get());
    }

    /**
     * Indicates whether an outbound instruction stream is currently registered.
     *
     * @return {@code true} when an outbound stream is registered, {@code false} otherwise
     */
    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public boolean isReady() {
        CallStreamObserver<PlatformInboundInstruction> activeStream = this.instructionDispatcher.get();
        return activeStream != null;
    }
}
