/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.ConnectivityState;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for channels that send instructions over an outbound stream and track the
 * acknowledgements for those instructions, and that schedule reconnect attempts on a shared
 * executor when the connection is lost.
 *
 * @param <OUT> the type of message sent over the outbound stream
 */
public abstract class AbstractAxonServerChannel<OUT> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractAxonServerChannel.class);

    /**
     * Delay (ms) before reconnecting after a NOT_FOUND, PERMISSION_DENIED, UNIMPLEMENTED,
     * UNAUTHENTICATED, FAILED_PRECONDITION, INVALID_ARGUMENT or RESOURCE_EXHAUSTED status.
     * NOTE(review): presumably long because a quick retry is unlikely to succeed - confirm.
     */
    private static final long RECONNECT_DELAY_AFTER_REJECTION_MS = 5000L;
    /** Delay (ms) before reconnecting after an UNAVAILABLE status. */
    private static final long RECONNECT_DELAY_AFTER_UNAVAILABLE_MS = 50L;
    /** Delay (ms) used for any other status, and between checks while the channel is not READY. */
    private static final long RECONNECT_DELAY_DEFAULT_MS = 500L;

    private final ClientIdentification clientIdentification;
    private final ScheduledExecutorService executor;
    private final AxonServerManagedChannel channel;
    /** Acknowledgement futures of instructions that were sent, keyed by instruction ID. */
    private final ConcurrentMap<String, CompletableFuture<Void>> instructions = new ConcurrentHashMap<>();

    /**
     * Creates the channel base.
     *
     * @param clientIdentification     identification of this client; its client ID is included in
     *                                 the error reported when an instruction cannot be sent
     * @param executor                 executor on which reconnect attempts are scheduled
     * @param axonServerManagedChannel channel whose connectivity state is checked before connecting
     */
    protected AbstractAxonServerChannel(ClientIdentification clientIdentification, ScheduledExecutorService executor, AxonServerManagedChannel axonServerManagedChannel) {
        this.clientIdentification = clientIdentification;
        this.executor = executor;
        this.channel = axonServerManagedChannel;
    }

    /**
     * Sends the given instruction over the given stream and returns a future for its acknowledgement.
     * <p>
     * When the instruction has a non-empty ID, the future is registered so that
     * {@link #processAck(InstructionAck)} or {@link #scheduleReconnect(Throwable)} can complete it;
     * the registration is removed as soon as the future completes. When the instruction has no ID,
     * the future is completed immediately after sending. When {@code outboundStream} is
     * {@code null}, the future is completed exceptionally with an {@link AxonServerException}.
     *
     * @param instruction           the instruction to send
     * @param instructionIdProvider extracts the instruction ID from the instruction
     * @param outboundStream        the stream to send the instruction on, may be {@code null}
     * @return a future that completes when the instruction is acknowledged, or fails when it could
     *         not be sent, was rejected, or the connection was lost
     */
    protected CompletableFuture<Void> sendInstruction(OUT instruction, Function<OUT, String> instructionIdProvider, StreamObserver<OUT> outboundStream) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        String instructionId = instructionIdProvider.apply(instruction);
        boolean expectsAck = ObjectUtils.hasLength(instructionId);
        if (expectsAck) {
            this.instructions.put(instructionId, ack);
            // Conditional remove: only drops the entry if it still maps to this very future.
            ack.whenComplete((r, e) -> this.instructions.remove(instructionId, ack));
        }
        try {
            ObjectUtils.doIfNotNull(outboundStream, s -> s.onNext(instruction))
                       .orElse(() -> ack.completeExceptionally(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "Unable to send instruction: no connection to AxonServer", this.clientIdentification.getClientId())));
        } catch (RuntimeException e) {
            // The caller never receives the future when sending throws, so complete it here to
            // release the registration made above; the original exception is still propagated.
            ack.completeExceptionally(e);
            throw e;
        }
        if (!expectsAck) {
            // Has no effect if the future was already failed above because there was no stream.
            ack.complete(null);
        }
        return ack;
    }

    /**
     * Completes the future registered for the instruction referenced by the given acknowledgement.
     * The future completes normally when the acknowledgement reports success, and exceptionally
     * with an {@link AxonServerException} built from the reported error otherwise.
     * Acknowledgements for which no future is registered are ignored.
     *
     * @param ack the acknowledgement to process
     */
    protected void processAck(InstructionAck ack) {
        CompletableFuture<Void> future = this.instructions.remove(ack.getInstructionId());
        if (future == null) {
            return;
        }
        if (ack.getSuccess()) {
            future.complete(null);
        } else {
            future.completeExceptionally(new AxonServerException(ack.getError()));
        }
    }

    /**
     * Fails all registered acknowledgement futures with the given reason and schedules a reconnect
     * attempt, using a delay that depends on the gRPC status code derived from that reason.
     *
     * @param disconnectReason the cause of the disconnect
     */
    protected void scheduleReconnect(Throwable disconnectReason) {
        for (String instructionId : this.instructions.keySet()) {
            // The entry may have been removed concurrently, in which case remove returns null.
            CompletableFuture<Void> pending = this.instructions.remove(instructionId);
            if (pending != null) {
                pending.completeExceptionally(disconnectReason);
            }
        }
        switch (Status.fromThrowable(disconnectReason).getCode()) {
            case NOT_FOUND:
            case PERMISSION_DENIED:
            case UNIMPLEMENTED:
            case UNAUTHENTICATED:
            case FAILED_PRECONDITION:
            case INVALID_ARGUMENT:
            case RESOURCE_EXHAUSTED:
                this.scheduleReconnect(RECONNECT_DELAY_AFTER_REJECTION_MS);
                break;
            case UNAVAILABLE:
                this.scheduleReconnect(RECONNECT_DELAY_AFTER_UNAVAILABLE_MS);
                break;
            default:
                this.scheduleReconnect(RECONNECT_DELAY_DEFAULT_MS);
                break;
        }
    }

    /**
     * Schedules a reconnect attempt without any delay.
     */
    protected void scheduleImmediateReconnect() {
        logger.debug("Scheduling immediate reconnect");
        this.scheduleReconnect(0L);
    }

    /**
     * Schedules a task that calls {@link #connect()} if the channel reports the READY state, and
     * otherwise schedules itself again after the default delay. If the executor rejects the task,
     * the request is logged and ignored.
     *
     * @param delayMillis the delay, in milliseconds, before the task runs
     */
    private void scheduleReconnect(long delayMillis) {
        try {
            this.executor.schedule(this::connectOrRetry, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            logger.info("Ignoring reconnect request, as connector is being shut down.");
        }
    }

    /** Connects when the underlying channel is READY; otherwise schedules another check. */
    private void connectOrRetry() {
        // getState(false): query the state without requesting a connection attempt
        // (NOTE(review): assumes gRPC ManagedChannel#getState semantics - confirm).
        ConnectivityState connectivityState = this.channel.getState(false);
        if (connectivityState == ConnectivityState.READY) {
            this.connect();
        } else {
            // The "500ms" in this message mirrors RECONNECT_DELAY_DEFAULT_MS.
            logger.debug("No connection to AxonServer available. Scheduling next attempt in 500ms");
            this.scheduleReconnect(RECONNECT_DELAY_DEFAULT_MS);
        }
    }

    /**
     * Connects this channel. Invoked by the scheduled reconnect task once the underlying channel
     * reports the READY state.
     */
    public abstract void connect();

    /**
     * Reconnects this channel; the exact behavior is defined by the implementation.
     */
    public abstract void reconnect();

    /**
     * Disconnects this channel; the exact behavior is defined by the implementation.
     */
    public abstract void disconnect();

    /**
     * Indicates whether this channel is ready; the exact criteria are defined by the implementation.
     *
     * @return {@code true} if the channel is ready, {@code false} otherwise
     */
    public abstract boolean isReady();
}

