/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.admin.impl.AdminChannelImpl;
import io.axoniq.axonserver.connector.command.CommandChannel;
import io.axoniq.axonserver.connector.command.impl.CommandChannelImpl;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.impl.EventChannelImpl;
import io.axoniq.axonserver.connector.event.transformation.EventTransformationChannel;
import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationChannelImpl;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ControlChannelImpl;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.impl.QueryChannelImpl;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.ConnectivityState;
import io.grpc.StatusRuntimeException;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class ContextConnection
implements AxonServerConnection {
    private final ClientIdentification clientIdentification;
    private final ControlChannelImpl controlChannel;
    private final AtomicReference<CommandChannelImpl> commandChannel = new AtomicReference();
    private final AtomicReference<EventChannelImpl> eventChannel = new AtomicReference();
    private final AtomicReference<QueryChannelImpl> queryChannel = new AtomicReference();
    private final AtomicReference<EventTransformationChannelImpl> eventTransformationChannel = new AtomicReference();
    private final AtomicReference<AdminChannelImpl> adminChannel = new AtomicReference();
    private final ScheduledExecutorService executorService;
    private final AxonServerManagedChannel connection;
    private final int commandPermits;
    private final int queryPermits;
    private final String context;
    private final Consumer<ContextConnection> onShutdown;

    public ContextConnection(ClientIdentification clientIdentification, ScheduledExecutorService executorService, AxonServerManagedChannel connection, long processorInfoUpdateFrequency, int commandPermits, int queryPermits, String context, Consumer<ContextConnection> onShutdown) {
        this.clientIdentification = clientIdentification;
        this.executorService = executorService;
        this.connection = connection;
        this.commandPermits = commandPermits;
        this.queryPermits = queryPermits;
        this.context = context;
        this.onShutdown = onShutdown;
        this.controlChannel = new ControlChannelImpl(clientIdentification, context, executorService, connection, processorInfoUpdateFrequency, this::reconnectChannels);
    }

    private void reconnectChannels() {
        this.connection.requestReconnect();
        ObjectUtils.doIfNotNull(this.commandChannel.get(), CommandChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(this.queryChannel.get(), QueryChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(this.controlChannel, ControlChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(this.eventChannel.get(), EventChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(this.eventTransformationChannel.get(), EventTransformationChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(this.adminChannel.get(), AdminChannelImpl::reconnect);
    }

    @Override
    public boolean isConnectionFailed() {
        return this.connection.getState(false) == ConnectivityState.TRANSIENT_FAILURE;
    }

    @Override
    public boolean isReady() {
        return this.isConnected() && Optional.ofNullable(this.commandChannel.get()).map(CommandChannelImpl::isReady).orElse(true) != false && Optional.ofNullable(this.queryChannel.get()).map(QueryChannelImpl::isReady).orElse(true) != false && Optional.ofNullable(this.eventChannel.get()).map(EventChannelImpl::isReady).orElse(true) != false && Optional.ofNullable(this.eventTransformationChannel.get()).map(EventTransformationChannelImpl::isReady).orElse(true) != false && Optional.ofNullable(this.adminChannel.get()).map(AdminChannelImpl::isReady).orElse(true) != false && this.controlChannel.isReady();
    }

    @Override
    public boolean isConnected() {
        return this.connection.getState(false) == ConnectivityState.READY;
    }

    @Override
    public void disconnect() {
        ObjectUtils.doIfNotNull(this.controlChannel, ControlChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(this.commandChannel.get(), CommandChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(this.queryChannel.get(), QueryChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(this.eventChannel.get(), EventChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(this.eventTransformationChannel.get(), EventTransformationChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(this.adminChannel.get(), AdminChannelImpl::disconnect);
        this.connection.shutdown();
        this.onShutdown.accept(this);
        try {
            if (!this.connection.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.connection.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.connection.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public ControlChannel controlChannel() {
        return this.controlChannel;
    }

    public void connect() {
        this.ensureConnected(this.controlChannel);
    }

    @Override
    public CommandChannel commandChannel() {
        return this.createIfAbsentAndInitialize(this.commandChannel, () -> new CommandChannelImpl(this.clientIdentification, this.context, this.commandPermits, this.commandPermits / 4, this.executorService, this.connection), this::ensureConnected);
    }

    @Override
    public EventChannel eventChannel() {
        return this.createIfAbsentAndInitialize(this.eventChannel, () -> new EventChannelImpl(this.clientIdentification, this.executorService, this.connection), this::ensureConnected);
    }

    @Override
    public QueryChannel queryChannel() {
        return this.createIfAbsentAndInitialize(this.queryChannel, () -> new QueryChannelImpl(this.clientIdentification, this.context, this.queryPermits, this.queryPermits / 4, this.executorService, this.connection), this::ensureConnected);
    }

    @Override
    public EventTransformationChannel eventTransformationChannel() {
        return this.createIfAbsentAndInitialize(this.eventTransformationChannel, () -> new EventTransformationChannelImpl(this.clientIdentification, this.executorService, this.connection), this::ensureConnected);
    }

    @Override
    public AdminChannel adminChannel() {
        return this.createIfAbsentAndInitialize(this.adminChannel, () -> new AdminChannelImpl(this.clientIdentification, this.executorService, this.connection), this::ensureConnected);
    }

    private <T extends AbstractAxonServerChannel<?>> T createIfAbsentAndInitialize(AtomicReference<T> reference, Supplier<T> ifAbsent, UnaryOperator<T> initializer) {
        AbstractAxonServerChannel current = (AbstractAxonServerChannel)reference.get();
        if (current != null) {
            return (T)current;
        }
        AbstractAxonServerChannel newInstance = (AbstractAxonServerChannel)reference.updateAndGet(existing -> existing == null ? (AbstractAxonServerChannel)ifAbsent.get() : existing);
        return (T)((AbstractAxonServerChannel)initializer.apply(newInstance));
    }

    private <T extends AbstractAxonServerChannel<?>> T ensureConnected(T channel) {
        if (!channel.isReady()) {
            ConnectivityState state = this.connection.getState(true);
            if (state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE) {
                try {
                    channel.connect();
                }
                catch (StatusRuntimeException e) {
                    this.connection.notifyWhenStateChanged(state, channel::connect);
                }
            } else {
                this.connection.notifyWhenStateChanged(state, channel::connect);
            }
        }
        return channel;
    }

    public AxonServerManagedChannel getManagedChannel() {
        return this.connection;
    }
}

