/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector;

import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.NodeInfo;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformInfo;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerException;
import org.axonframework.axonserver.connector.DefaultHandlers;
import org.axonframework.axonserver.connector.DefaultInstructionAckSource;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.Handlers;
import org.axonframework.axonserver.connector.InstructionAckSource;
import org.axonframework.axonserver.connector.SynchronizedStreamObserver;
import org.axonframework.axonserver.connector.util.AxonFrameworkVersionResolver;
import org.axonframework.axonserver.connector.util.ContextAddingInterceptor;
import org.axonframework.axonserver.connector.util.GrpcBufferingInterceptor;
import org.axonframework.axonserver.connector.util.TokenAddingInterceptor;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.config.TagsConfiguration;
import org.axonframework.lifecycle.ShutdownHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerConnectionManager {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerConnectionManager.class);
    private final Map<String, ManagedChannel> channels = new ConcurrentHashMap<String, ManagedChannel>();
    private final Handlers<PlatformOutboundInstruction.RequestCase, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>>> handlers = new DefaultHandlers<PlatformOutboundInstruction.RequestCase, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>>>();
    private Map<String, StreamObserver<PlatformInboundInstruction>> instructionStreams = new ConcurrentHashMap<String, StreamObserver<PlatformInboundInstruction>>();
    private final Map<String, ScheduledFuture<?>> reconnectTasks = new ConcurrentHashMap();
    private final List<Consumer<String>> reconnectListeners = new CopyOnWriteArrayList<Consumer<String>>();
    private final List<Consumer<String>> disconnectListeners = new CopyOnWriteArrayList<Consumer<String>>();
    private final List<Function<Consumer<String>, Consumer<String>>> reconnectInterceptors = new CopyOnWriteArrayList<Function<Consumer<String>, Consumer<String>>>();
    private volatile boolean shutdown;
    private final AxonServerConfiguration axonServerConfiguration;
    private final TagsConfiguration tagsConfiguration;
    private final ScheduledExecutorService scheduler;
    private final Supplier<String> axonFrameworkVersionResolver;
    private final Function<UpstreamAwareStreamObserver<PlatformOutboundInstruction>, StreamObserver<PlatformInboundInstruction>> requestStreamFactory;
    private final InstructionAckSource<PlatformInboundInstruction> instructionAckSource;

    @Deprecated
    public AxonServerConnectionManager(AxonServerConfiguration axonServerConfiguration) {
        this(axonServerConfiguration, new TagsConfiguration());
    }

    @Deprecated
    public AxonServerConnectionManager(AxonServerConfiguration axonServerConfiguration, TagsConfiguration tagsConfiguration) {
        this.axonServerConfiguration = axonServerConfiguration;
        this.tagsConfiguration = tagsConfiguration;
        this.scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new AxonThreadFactory("AxonServerConnector"){

            public Thread newThread(Runnable r) {
                Thread thread = super.newThread(r);
                thread.setDaemon(true);
                return thread;
            }
        });
        this.axonFrameworkVersionResolver = new AxonFrameworkVersionResolver();
        this.requestStreamFactory = os -> os.getRequestStream();
        this.instructionAckSource = new DefaultInstructionAckSource<PlatformInboundInstruction>(ack -> PlatformInboundInstruction.newBuilder().setAck((InstructionAck)ack).build());
    }

    protected AxonServerConnectionManager(Builder builder) {
        builder.validate();
        this.axonServerConfiguration = builder.axonServerConfiguration;
        this.tagsConfiguration = builder.tagsConfiguration;
        this.scheduler = builder.scheduler;
        this.axonFrameworkVersionResolver = builder.axonFrameworkVersionResolver;
        this.requestStreamFactory = builder.requestStreamFactory;
        this.instructionAckSource = builder.instructionAckSource;
        this.onOutboundInstruction(PlatformOutboundInstruction.RequestCase.NODE_NOTIFICATION, (PlatformOutboundInstruction instruction, StreamObserver<PlatformInboundInstruction> stream) -> logger.debug("Received: {}", (Object)instruction.getNodeNotification()));
        this.handlers.register(PlatformOutboundInstruction.RequestCase.ACK, (instruction, stream) -> {
            if (this.isUnsupportedInstructionErrorAck(instruction.getAck())) {
                logger.warn("Unsupported instruction sent to the server. {}", (Object)instruction.getAck());
            } else {
                logger.trace("Received instruction ack {}.", (Object)instruction.getAck());
            }
        });
    }

    private boolean isUnsupportedInstructionErrorAck(InstructionAck instructionResult) {
        return instructionResult.hasError() && instructionResult.getError().getErrorCode().equals(ErrorCode.UNSUPPORTED_INSTRUCTION.errorCode());
    }

    public static Builder builder() {
        return new Builder();
    }

    public Channel getChannel() {
        return this.getChannel(this.getDefaultContext());
    }

    public synchronized Channel getChannel(String context) {
        ConnectivityState channelState;
        this.checkConnectionState(context);
        ManagedChannel channel = this.channels.get(context);
        ConnectivityState connectivityState = channelState = channel == null ? ConnectivityState.SHUTDOWN : channel.getState(false);
        if (channelState != ConnectivityState.READY) {
            logger.info("Connecting using {}...", (Object)(this.axonServerConfiguration.isSslEnabled() ? "TLS" : "unencrypted connection"));
            ClientIdentification clientIdentification = ClientIdentification.newBuilder().setClientId(this.axonServerConfiguration.getClientId()).setComponentName(this.axonServerConfiguration.getComponentName()).putAllTags(this.tagsConfiguration.getTags()).setVersion(this.axonFrameworkVersionResolver.get()).build();
            ManagedChannel previousChannel = this.channels.remove(context);
            if (previousChannel != null && !previousChannel.isTerminated()) {
                logger.info("Channel re-opened. Shutting down previous Channel.");
                previousChannel.shutdownNow();
            }
            boolean axonServerUnavailable = false;
            for (NodeInfo nodeInfo : this.axonServerConfiguration.routingServers()) {
                ManagedChannel candidate = this.createChannel(nodeInfo.getHostName(), nodeInfo.getGrpcPort());
                PlatformServiceGrpc.PlatformServiceBlockingStub stub = (PlatformServiceGrpc.PlatformServiceBlockingStub)((PlatformServiceGrpc.PlatformServiceBlockingStub)PlatformServiceGrpc.newBlockingStub((Channel)candidate).withDeadlineAfter(this.axonServerConfiguration.getConnectTimeout(), TimeUnit.MILLISECONDS)).withInterceptors(new ClientInterceptor[]{new ContextAddingInterceptor(this.axonServerConfiguration.getContext()), new TokenAddingInterceptor(this.axonServerConfiguration.getToken())});
                try {
                    logger.info("Requesting connection details from {}:{}", (Object)nodeInfo.getHostName(), (Object)nodeInfo.getGrpcPort());
                    PlatformInfo clusterInfo = stub.getPlatformServer(clientIdentification);
                    logger.debug("Received PlatformInfo suggesting [{}] ({}:{}), {}", new Object[]{clusterInfo.getPrimary().getNodeName(), clusterInfo.getPrimary().getHostName(), clusterInfo.getPrimary().getGrpcPort(), clusterInfo.getSameConnection() ? "reusing existing connection" : "using new connection"});
                    if (this.isPrimary(nodeInfo, clusterInfo)) {
                        logger.info("Reusing existing channel");
                        this.channels.put(context, candidate);
                    } else {
                        this.shutdownNow(candidate);
                        logger.info("Connecting to [{}] ({}:{})", new Object[]{clusterInfo.getPrimary().getNodeName(), clusterInfo.getPrimary().getHostName(), clusterInfo.getPrimary().getGrpcPort()});
                        this.channels.put(context, this.createChannel(clusterInfo.getPrimary().getHostName(), clusterInfo.getPrimary().getGrpcPort()));
                    }
                    this.onOutboundInstruction(context, PlatformOutboundInstruction.RequestCase.REQUEST_RECONNECT, (PlatformOutboundInstruction instruction, StreamObserver<PlatformInboundInstruction> requestStream) -> this.onRequestReconnect(context, (StreamObserver<PlatformInboundInstruction>)requestStream));
                    this.startInstructionStream(context, clusterInfo.getPrimary().getNodeName(), clientIdentification);
                    axonServerUnavailable = false;
                    logger.info("Re-subscribing commands and queries");
                    this.notifyConnectionChange(this.reconnectListeners, context);
                    break;
                }
                catch (StatusRuntimeException sre) {
                    this.shutdownNow(candidate);
                    logger.warn("Connecting to AxonServer node [{}]:[{}] failed: {}", new Object[]{nodeInfo.getHostName(), nodeInfo.getGrpcPort(), sre.getMessage()});
                    axonServerUnavailable = true;
                }
            }
            if (axonServerUnavailable) {
                if (!this.axonServerConfiguration.getSuppressDownloadMessage()) {
                    this.axonServerConfiguration.setSuppressDownloadMessage(true);
                    this.writeDownloadMessage();
                }
                this.scheduleReconnect(context, false, true);
                throw new AxonServerException(ErrorCode.CONNECTION_FAILED.errorCode(), "No connection to AxonServer available");
            }
            if (!this.axonServerConfiguration.getSuppressDownloadMessage()) {
                this.axonServerConfiguration.setSuppressDownloadMessage(true);
            }
        }
        return this.intercepted(context, (Channel)this.channels.get(context));
    }

    private void onRequestReconnect(String context, StreamObserver<PlatformInboundInstruction> requestStream) {
        Consumer<String> reconnect = c -> {
            this.notifyConnectionChange(this.disconnectListeners, (String)c);
            requestStream.onCompleted();
            this.scheduleReconnect(context, true, true);
        };
        for (Function<Consumer<String>, Consumer<String>> interceptor : this.reconnectInterceptors) {
            reconnect = interceptor.apply(reconnect);
        }
        reconnect.accept(context);
    }

    private void notifyConnectionChange(List<Consumer<String>> listeners, String context) {
        listeners.forEach(action -> this.scheduler.execute(() -> action.accept(context)));
    }

    private Channel intercepted(String context, Channel candidate) {
        return ClientInterceptors.intercept((Channel)candidate, (ClientInterceptor[])new ClientInterceptor[]{new TokenAddingInterceptor(this.axonServerConfiguration.getToken()), new ContextAddingInterceptor(context)});
    }

    private void checkConnectionState(String context) {
        if (this.shutdown) {
            throw new AxonServerException(ErrorCode.CONNECTION_FAILED.errorCode(), "Shutdown in progress");
        }
        ScheduledFuture<?> reconnectTask = this.reconnectTasks.get(context);
        if (reconnectTask != null && !reconnectTask.isDone()) {
            throw new AxonServerException(ErrorCode.CONNECTION_FAILED.errorCode(), "No connection to AxonServer available");
        }
    }

    private void writeDownloadMessage() {
        try (InputStream in = this.getClass().getClassLoader().getResourceAsStream("axonserver_download.txt");){
            int read;
            byte[] buffer = new byte[1024];
            while (in != null && (read = in.read(buffer, 0, 1024)) >= 0) {
                System.out.write(buffer, 0, read);
            }
        }
        catch (IOException e) {
            logger.debug("Unable to write download advice. You're on your own now.", (Throwable)e);
        }
    }

    private void shutdownNow(ManagedChannel managedChannel) {
        try {
            managedChannel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("Interrupted during shutdown");
        }
    }

    private boolean isPrimary(NodeInfo nodeInfo, PlatformInfo clusterInfo) {
        if (clusterInfo.getSameConnection()) {
            return true;
        }
        return clusterInfo.getPrimary().getGrpcPort() == nodeInfo.getGrpcPort() && clusterInfo.getPrimary().getHostName().equals(nodeInfo.getHostName());
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private ManagedChannel createChannel(String hostName, int port) {
        NettyChannelBuilder builder = NettyChannelBuilder.forAddress((String)hostName, (int)port);
        if (this.axonServerConfiguration.getKeepAliveTime() > 0L) {
            builder.keepAliveTime(this.axonServerConfiguration.getKeepAliveTime(), TimeUnit.MILLISECONDS).keepAliveTimeout(this.axonServerConfiguration.getKeepAliveTimeout(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(true);
        }
        if (this.axonServerConfiguration.getMaxMessageSize() > 0) {
            builder.maxInboundMessageSize(this.axonServerConfiguration.getMaxMessageSize());
        }
        if (this.axonServerConfiguration.isSslEnabled()) {
            try {
                if (this.axonServerConfiguration.getCertFile() == null) return ((NettyChannelBuilder)builder.intercept(new ClientInterceptor[]{new GrpcBufferingInterceptor(this.axonServerConfiguration.getMaxGrpcBufferedMessages())})).build();
                File certFile = new File(this.axonServerConfiguration.getCertFile());
                if (!certFile.exists()) {
                    throw new RuntimeException("Certificate file [" + this.axonServerConfiguration.getCertFile() + "] does not exist");
                }
                SslContext sslContext = GrpcSslContexts.forClient().trustManager(new File(this.axonServerConfiguration.getCertFile())).build();
                builder.sslContext(sslContext);
                return ((NettyChannelBuilder)builder.intercept(new ClientInterceptor[]{new GrpcBufferingInterceptor(this.axonServerConfiguration.getMaxGrpcBufferedMessages())})).build();
            }
            catch (SSLException e) {
                throw new RuntimeException("Couldn't set up SSL context", e);
            }
        } else {
            builder.usePlaintext();
        }
        return ((NettyChannelBuilder)builder.intercept(new ClientInterceptor[]{new GrpcBufferingInterceptor(this.axonServerConfiguration.getMaxGrpcBufferedMessages())})).build();
    }

    private synchronized void startInstructionStream(final String context, final String name, ClientIdentification clientIdentification) {
        logger.debug("Start instruction stream to node [{}] for context [{}]", (Object)name, (Object)context);
        SynchronizedStreamObserver<PlatformInboundInstruction> inputStream = new SynchronizedStreamObserver<PlatformInboundInstruction>(this.getPlatformStream(context, (StreamObserver<PlatformOutboundInstruction>)new UpstreamAwareStreamObserver<PlatformOutboundInstruction>(){

            public void onNext(PlatformOutboundInstruction messagePlatformOutboundInstruction) {
                Set<BiConsumer<PlatformOutboundInstruction, StreamObserver>> defaultHandlers = Collections.singleton((poi, stream) -> AxonServerConnectionManager.this.instructionAckSource.sendUnsupportedInstruction(poi.getInstructionId(), AxonServerConnectionManager.this.axonServerConfiguration.getClientId(), (StreamObserver)AxonServerConnectionManager.this.requestStreamFactory.apply(this)));
                AxonServerConnectionManager.this.handlers.getOrDefault(context, messagePlatformOutboundInstruction.getRequestCase(), defaultHandlers).forEach(consumer -> consumer.accept(messagePlatformOutboundInstruction, AxonServerConnectionManager.this.requestStreamFactory.apply(this)));
            }

            public void onError(Throwable throwable) {
                StatusRuntimeException sre;
                logger.warn("Lost instruction stream from [{}] - {}", (Object)name, (Object)throwable.getMessage());
                this.completeRequestStream();
                AxonServerConnectionManager.this.notifyConnectionChange(AxonServerConnectionManager.this.disconnectListeners, context);
                if (throwable instanceof StatusRuntimeException && (sre = (StatusRuntimeException)throwable).getStatus().getCode().equals((Object)Status.Code.PERMISSION_DENIED)) {
                    return;
                }
                AxonServerConnectionManager.this.scheduleReconnect(context, true, false);
            }

            public void onCompleted() {
                logger.info("Closed instruction stream to [{}]", (Object)name);
                this.completeRequestStream();
                AxonServerConnectionManager.this.notifyConnectionChange(AxonServerConnectionManager.this.disconnectListeners, context);
                AxonServerConnectionManager.this.scheduleReconnect(context, true, false);
            }
        }));
        inputStream.onNext(PlatformInboundInstruction.newBuilder().setRegister(clientIdentification).build());
        StreamObserver existingStream = this.instructionStreams.put(context, inputStream);
        if (existingStream != null) {
            existingStream.onCompleted();
        }
    }

    private synchronized void scheduleReconnect(String context, boolean immediate, boolean forceDisconnect) {
        ScheduledFuture<?> reconnectTask = this.reconnectTasks.get(context);
        if (!this.shutdown && (reconnectTask == null || reconnectTask.isDone())) {
            ManagedChannel channel;
            if (!forceDisconnect && (channel = this.channels.get(context)) != null && channel.getState(false) == ConnectivityState.READY) {
                return;
            }
            channel = this.channels.remove(context);
            if (channel != null) {
                try {
                    channel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.reconnectTasks.put(context, this.scheduler.schedule(() -> this.tryReconnect(context), immediate ? 100L : 5000L, TimeUnit.MILLISECONDS));
        }
    }

    private synchronized void tryReconnect(String context) {
        if (this.channels.containsKey(context) || this.shutdown) {
            return;
        }
        try {
            this.reconnectTasks.remove(context);
            this.getChannel(context);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void addReconnectListener(String context, Runnable action) {
        this.addReconnectListener(c -> {
            if (context.equals(c)) {
                action.run();
            }
        });
    }

    public void addReconnectListener(Consumer<String> action) {
        this.reconnectListeners.add(action);
    }

    public void addDisconnectListener(String context, Runnable action) {
        this.addDisconnectListener(c -> {
            if (context.equals(c)) {
                action.run();
            }
        });
    }

    public void addDisconnectListener(Consumer<String> action) {
        this.disconnectListeners.add(action);
    }

    public void addReconnectInterceptor(Function<Consumer<String>, Consumer<String>> interceptor) {
        this.reconnectInterceptors.add(interceptor);
    }

    public StreamObserver<CommandProviderOutbound> getCommandStream(String context, StreamObserver<CommandProviderInbound> inboundCommandStream) {
        return CommandServiceGrpc.newStub(this.getChannel(context)).openStream(inboundCommandStream);
    }

    public StreamObserver<QueryProviderOutbound> getQueryStream(String context, StreamObserver<QueryProviderInbound> inboundQueryStream) {
        return QueryServiceGrpc.newStub(this.getChannel(context)).openStream(inboundQueryStream);
    }

    public StreamObserver<PlatformInboundInstruction> getPlatformStream(String context, StreamObserver<PlatformOutboundInstruction> outboundInstructionStream) {
        return PlatformServiceGrpc.newStub(this.intercepted(context, (Channel)this.channels.get(context))).openStream(outboundInstructionStream);
    }

    @Deprecated
    public void onOutboundInstruction(PlatformOutboundInstruction.RequestCase requestCase, Consumer<PlatformOutboundInstruction> consumer) {
        this.onOutboundInstruction(this.getDefaultContext(), requestCase, consumer);
    }

    public void onOutboundInstruction(String context, PlatformOutboundInstruction.RequestCase requestCase, Consumer<PlatformOutboundInstruction> consumer) {
        this.onOutboundInstruction(context, requestCase, (PlatformOutboundInstruction i, StreamObserver<PlatformInboundInstruction> s) -> consumer.accept((PlatformOutboundInstruction)i));
    }

    public void onOutboundInstruction(PlatformOutboundInstruction.RequestCase requestCase, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        this.handlers.register(requestCase, this.wrapWithConfirmation(handler));
    }

    public void onOutboundInstruction(String context, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        this.handlers.register((PlatformOutboundInstruction.RequestCase)((Object)context), this.wrapWithConfirmation(handler));
    }

    public void onOutboundInstruction(BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        this.handlers.register(this.wrapWithConfirmation(handler));
    }

    public void onOutboundInstruction(String context, PlatformOutboundInstruction.RequestCase requestCase, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        this.handlers.register(context, requestCase, this.wrapWithConfirmation(handler));
    }

    public void onOutboundInstruction(BiPredicate<String, PlatformOutboundInstruction.RequestCase> handlerSelector, BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        this.handlers.register(handlerSelector, this.wrapWithConfirmation(handler));
    }

    private BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> wrapWithConfirmation(BiConsumer<PlatformOutboundInstruction, StreamObserver<PlatformInboundInstruction>> handler) {
        return (i, s) -> {
            try {
                handler.accept((PlatformOutboundInstruction)i, (StreamObserver<PlatformInboundInstruction>)s);
                this.instructionAckSource.sendSuccessfulAck(i.getInstructionId(), (StreamObserver<PlatformInboundInstruction>)s);
            }
            catch (Exception e) {
                logger.warn("Error happened while handling instruction {}.", (Object)i.getInstructionId());
                ErrorMessage instructionAckError = ErrorMessage.newBuilder().setErrorCode(ErrorCode.INSTRUCTION_ACK_ERROR.errorCode()).setLocation(this.axonServerConfiguration.getClientId()).addDetails("Error happened while handling instruction").build();
                this.instructionAckSource.sendUnsuccessfulAck(i.getInstructionId(), instructionAckError, (StreamObserver<PlatformInboundInstruction>)s);
            }
        };
    }

    public void send(String context, PlatformInboundInstruction instruction) {
        if (this.getChannel(context) != null) {
            this.instructionStreams.get(context).onNext((Object)instruction);
        }
    }

    public void disconnectExceptionally(String context, Throwable cause) {
        if (this.isConnected(context)) {
            this.instructionStreams.get(context).onError(cause);
        }
    }

    public boolean isConnected(String context) {
        ManagedChannel channel = this.channels.get(context);
        return channel != null && !channel.isShutdown();
    }

    @ShutdownHandler(phase=-134217728)
    public void shutdown() {
        this.shutdown = true;
        this.instructionStreams.values().forEach(StreamObserver::onCompleted);
        this.disconnect();
        this.scheduler.shutdown();
    }

    public void disconnect(String context) {
        ManagedChannel channel = this.channels.remove(context);
        if (channel != null) {
            this.shutdownChannel(channel, context);
        }
    }

    public void disconnect() {
        this.channels.forEach((context, channel) -> this.shutdownChannel((ManagedChannel)channel, (String)context));
        this.channels.clear();
    }

    private void shutdownChannel(ManagedChannel channel, String context) {
        try {
            channel.shutdown();
            if (!channel.awaitTermination(5L, TimeUnit.SECONDS)) {
                logger.warn("Awaited Context [{}] comm-channel for 5 seconds. Will shutdown forcefully.", (Object)context);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("Interrupted during shutdown of context [{}]", (Object)context);
        }
    }

    public String getDefaultContext() {
        return this.axonServerConfiguration.getContext();
    }

    public static class Builder {
        private static final int DEFAULT_POOL_SIZE = 1;
        private AxonServerConfiguration axonServerConfiguration;
        private Supplier<String> axonFrameworkVersionResolver = new AxonFrameworkVersionResolver();
        private TagsConfiguration tagsConfiguration = new TagsConfiguration();
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new AxonThreadFactory(AxonServerConnectionManager.class.getSimpleName()){

            public Thread newThread(Runnable r) {
                Thread thread = super.newThread(r);
                thread.setDaemon(true);
                return thread;
            }
        });
        private Function<UpstreamAwareStreamObserver<PlatformOutboundInstruction>, StreamObserver<PlatformInboundInstruction>> requestStreamFactory = os -> os.getRequestStream();
        private InstructionAckSource<PlatformInboundInstruction> instructionAckSource = new DefaultInstructionAckSource<PlatformInboundInstruction>(ack -> PlatformInboundInstruction.newBuilder().setAck((InstructionAck)ack).build());

        public Builder axonServerConfiguration(AxonServerConfiguration axonServerConfiguration) {
            BuilderUtils.assertNonNull((Object)axonServerConfiguration, (String)"AxonServerConfiguration may not be null");
            this.axonServerConfiguration = axonServerConfiguration;
            return this;
        }

        public Builder tagsConfiguration(TagsConfiguration tagsConfiguration) {
            BuilderUtils.assertNonNull((Object)tagsConfiguration, (String)"TagsConfiguration may not be null");
            this.tagsConfiguration = tagsConfiguration;
            return this;
        }

        public Builder scheduler(ScheduledExecutorService scheduler) {
            BuilderUtils.assertNonNull((Object)scheduler, (String)"ScheduledExecutorService may not be null");
            this.scheduler = scheduler;
            return this;
        }

        public Builder axonFrameworkVersionResolver(Supplier<String> axonFrameworkVersionResolver) {
            BuilderUtils.assertNonNull(axonFrameworkVersionResolver, (String)"Axon Framework Version Resolver may not be null");
            this.axonFrameworkVersionResolver = axonFrameworkVersionResolver;
            return this;
        }

        public Builder requestStreamFactory(Function<UpstreamAwareStreamObserver<PlatformOutboundInstruction>, StreamObserver<PlatformInboundInstruction>> requestStreamFactory) {
            BuilderUtils.assertNonNull(requestStreamFactory, (String)"RequestStreamFactory may not be null");
            this.requestStreamFactory = requestStreamFactory;
            return this;
        }

        public Builder instructionAckSource(InstructionAckSource<PlatformInboundInstruction> instructionAckSource) {
            BuilderUtils.assertNonNull(instructionAckSource, (String)"InstructionAckSource may not be null");
            this.instructionAckSource = instructionAckSource;
            return this;
        }

        public AxonServerConnectionManager build() {
            return new AxonServerConnectionManager(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull((Object)this.axonServerConfiguration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
        }
    }
}

