/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.impl.FlowControlledStream;
import io.axoniq.axonserver.connector.impl.ForwardingReplyChannel;
import io.axoniq.axonserver.connector.impl.StreamUnexpectedlyCompletedException;
import io.axoniq.axonserver.connector.impl.SynchronizedRequestStream;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.InstructionResult;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import java.lang.invoke.MethodHandles;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractIncomingInstructionStream<IN, OUT>
extends FlowControlledStream<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final InstructionAck NO_HANDLER_FOR_INSTRUCTION = InstructionAck.newBuilder().setSuccess(false).setError(ErrorMessage.newBuilder().setErrorCode(ErrorCategory.UNSUPPORTED_INSTRUCTION.errorCode()).setMessage("No handler for instruction").build()).build();
    private static final InstructionAck SUCCESS_ACK = InstructionAck.newBuilder().setSuccess(true).build();
    private final Consumer<Throwable> disconnectHandler;
    private final Consumer<CallStreamObserver<OUT>> beforeStartHandler;
    private CallStreamObserver<OUT> instructionsForPlatform;

    protected AbstractIncomingInstructionStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler, Consumer<CallStreamObserver<OUT>> beforeStartHandler) {
        super(clientId, permits, permitsBatch);
        this.disconnectHandler = disconnectHandler;
        this.beforeStartHandler = beforeStartHandler;
    }

    public void onNext(IN value) {
        InstructionHandler<IN, OUT> handler = this.getHandler(value);
        String instructionId = this.getInstructionId(value);
        if (handler == null) {
            logger.debug("Unsupported instruction received: {}", value);
            this.markConsumed();
            if (instructionId != null && !instructionId.isEmpty()) {
                this.instructionsForPlatform.onNext(this.buildAckMessage(NO_HANDLER_FOR_INSTRUCTION));
            }
        } else {
            if (instructionId != null && !instructionId.isEmpty()) {
                this.instructionsForPlatform.onNext(this.buildAckMessage(SUCCESS_ACK));
            }
            ForwardingReplyChannel<OUT> replyChannel = new ForwardingReplyChannel<OUT>(this.getInstructionId(value), this.clientId(), this.instructionsForPlatform, this::buildResultMessage, this::markConsumed);
            handler.handle(value, replyChannel);
        }
    }

    protected abstract OUT buildAckMessage(InstructionAck var1);

    protected Optional<OUT> buildResultMessage(InstructionResult result) {
        return Optional.empty();
    }

    protected abstract String getInstructionId(IN var1);

    protected abstract InstructionHandler<IN, OUT> getHandler(IN var1);

    public void onCompleted() {
        logger.debug("Stream completed from server side");
        if (this.unregisterOutboundStream(this.instructionsForPlatform)) {
            logger.debug("Instruction stream disconnected. Scheduling reconnect");
            StreamUnexpectedlyCompletedException t = new StreamUnexpectedlyCompletedException("Stream unexpectedly completed by server");
            this.disconnectHandler.accept(t);
            this.instructionsForPlatform.onCompleted();
        }
    }

    public void onError(Throwable t) {
        logger.debug("Error received", t);
        if (this.unregisterOutboundStream(this.instructionsForPlatform)) {
            logger.debug("Instruction stream disconnected. Scheduling reconnect");
            this.disconnectHandler.accept(t);
            this.instructionsForPlatform.onCompleted();
        }
    }

    @Override
    public void beforeStart(ClientCallStreamObserver<OUT> requestStream) {
        SynchronizedRequestStream<OUT> synchronizedRequestStream = new SynchronizedRequestStream<OUT>(requestStream);
        super.beforeStart(synchronizedRequestStream);
        this.instructionsForPlatform = synchronizedRequestStream;
        this.beforeStartHandler.accept((CallStreamObserver<OUT>)this.getInstructionsForPlatform());
    }

    public ClientCallStreamObserver<OUT> getInstructionsForPlatform() {
        return this.outboundStream();
    }

    protected abstract boolean unregisterOutboundStream(CallStreamObserver<OUT> var1);
}

