/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.ChunkChecksum;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.ClientProperties;
import com.rabbitmq.stream.impl.Codecs;
import com.rabbitmq.stream.impl.ConnectionStreamException;
import com.rabbitmq.stream.impl.ExecutorServiceFactory;
import com.rabbitmq.stream.impl.JdkChunkChecksum;
import com.rabbitmq.stream.impl.MessageBatch;
import com.rabbitmq.stream.impl.ServerFrameHandler;
import com.rabbitmq.stream.impl.TimeoutStreamException;
import com.rabbitmq.stream.impl.Utils;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.DefaultSaslConfiguration;
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
import com.rabbitmq.stream.sasl.SaslConfiguration;
import com.rabbitmq.stream.sasl.SaslMechanism;
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client
implements AutoCloseable {
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    public static final int DEFAULT_PORT = 5552;
    public static final int DEFAULT_TLS_PORT = 5551;
    static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK = new OutboundMessageWriteCallback();
    static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK = new OutboundMessageBatchWriteCallback();
    static final String NETTY_HANDLER_FRAME_DECODER = LengthFieldBasedFrameDecoder.class.getSimpleName();
    static final String NETTY_HANDLER_IDLE_STATE = IdleStateHandler.class.getSimpleName();
    static final Duration DEFAULT_RPC_TIMEOUT = Duration.ofSeconds(10L);
    private static final PublishConfirmListener NO_OP_PUBLISH_CONFIRM_LISTENER = (publisherId, publishingId) -> {};
    private static final PublishErrorListener NO_OP_PUBLISH_ERROR_LISTENER = (publisherId, publishingId, errorCode) -> {};
    private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
    final PublishConfirmListener publishConfirmListener;
    final PublishErrorListener publishErrorListener;
    final ChunkListener chunkListener;
    final MessageListener messageListener;
    final MessageIgnoredListener messageIgnoredListener;
    final CreditNotification creditNotification;
    final ConsumerUpdateListener consumerUpdateListener;
    final MetadataListener metadataListener;
    final Codec codec;
    final Channel channel;
    final ConcurrentMap<Integer, OutstandingRequest<?>> outstandingRequests = new ConcurrentHashMap();
    final List<SubscriptionOffset> subscriptionOffsets = new CopyOnWriteArrayList<SubscriptionOffset>();
    final ExecutorService executorService;
    final ExecutorService dispatchingExecutorService;
    final TuneState tuneState;
    final AtomicBoolean closing = new AtomicBoolean(false);
    final ChunkChecksum chunkChecksum;
    final MetricsCollector metricsCollector;
    final CompressionCodecFactory compressionCodecFactory;
    private final Consumer<ShutdownContext.ShutdownReason> shutdownListenerCallback;
    private final ToLongFunction<Object> publishSequenceFunction = new ToLongFunction<Object>(){
        private final AtomicLong publishSequence = new AtomicLong(0L);

        @Override
        public long applyAsLong(Object value) {
            return this.publishSequence.getAndIncrement();
        }
    };
    private final AtomicInteger correlationSequence = new AtomicInteger(0);
    private final Runnable executorServiceClosing;
    private final SaslConfiguration saslConfiguration;
    private final CredentialsProvider credentialsProvider;
    private final Runnable nettyClosing;
    private final int maxFrameSize;
    private final boolean frameSizeCopped;
    private final EventLoopGroup eventLoopGroup;
    private final Map<String, String> clientProperties;
    private final String NETTY_HANDLER_FLUSH_CONSOLIDATION = FlushConsolidationHandler.class.getSimpleName();
    private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName();
    private final String host;
    private final String clientConnectionName;
    private final int port;
    private final Map<String, String> serverProperties;
    private final Map<String, String> connectionProperties;
    private final Duration rpcTimeout;
    private final List<String> saslMechanisms;
    private volatile ShutdownContext.ShutdownReason shutdownReason = null;
    private final Runnable streamStatsCommandVersionsCheck;
    private final boolean filteringSupported;
    private final Runnable superStreamManagementCommandVersionsCheck;

    @SuppressFBWarnings(value={"CT_CONSTRUCTOR_THROW"})
    public Client() {
        this(new ClientParameters());
    }

    @SuppressFBWarnings(value={"CT_CONSTRUCTOR_THROW"})
    public Client(final ClientParameters parameters) {
        ChannelFuture f;
        this.publishConfirmListener = parameters.publishConfirmListener;
        this.publishErrorListener = parameters.publishErrorListener;
        this.chunkListener = parameters.chunkListener;
        this.messageListener = parameters.messageListener;
        this.messageIgnoredListener = parameters.messageIgnoredListener;
        this.creditNotification = parameters.creditNotification;
        this.codec = parameters.codec == null ? Codecs.DEFAULT : parameters.codec;
        this.saslConfiguration = parameters.saslConfiguration;
        this.credentialsProvider = parameters.credentialsProvider;
        this.chunkChecksum = parameters.chunkChecksum;
        this.metricsCollector = parameters.metricsCollector;
        this.metadataListener = parameters.metadataListener;
        this.consumerUpdateListener = parameters.consumerUpdateListener;
        this.compressionCodecFactory = parameters.compressionCodecFactory == null ? compression -> null : parameters.compressionCodecFactory;
        this.rpcTimeout = parameters.rpcTimeout == null ? DEFAULT_RPC_TIMEOUT : parameters.rpcTimeout;
        ShutdownListener shutdownListener = parameters.shutdownListener;
        AtomicBoolean started = new AtomicBoolean(false);
        this.shutdownListenerCallback = Utils.makeIdempotent(shutdownReason -> {
            if (started.get()) {
                this.metricsCollector.closeConnection();
                shutdownListener.handle(new ShutdownContext((ShutdownContext.ShutdownReason)((Object)shutdownReason)));
            }
        });
        Consumer bootstrapCustomizer = parameters.bootstrapCustomizer == null ? Utils.noOpConsumer() : parameters.bootstrapCustomizer;
        Bootstrap b = new Bootstrap();
        bootstrapCustomizer.accept(b);
        if (b.config().group() == null) {
            EventLoopGroup eventLoopGroup;
            if (parameters.eventLoopGroup == null) {
                eventLoopGroup = this.eventLoopGroup = new NioEventLoopGroup();
            } else {
                this.eventLoopGroup = null;
                eventLoopGroup = parameters.eventLoopGroup;
            }
            b.group(eventLoopGroup);
        } else {
            this.eventLoopGroup = null;
        }
        if (b.config().channelFactory() == null) {
            b.channel(NioSocketChannel.class);
        }
        if (!b.config().options().containsKey(ChannelOption.SO_KEEPALIVE)) {
            b.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        }
        if (!b.config().options().containsKey(ChannelOption.ALLOCATOR)) {
            b.option(ChannelOption.ALLOCATOR, (Object)(parameters.byteBufAllocator == null ? ByteBufAllocator.DEFAULT : parameters.byteBufAllocator));
        }
        final Consumer channelCustomizer = parameters.channelCustomizer == null ? Utils.noOpConsumer() : parameters.channelCustomizer;
        b.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) {
                ch.pipeline().addFirst(Client.this.NETTY_HANDLER_FLUSH_CONSOLIDATION, (ChannelHandler)new FlushConsolidationHandler(256, true));
                ch.pipeline().addLast(NETTY_HANDLER_FRAME_DECODER, (ChannelHandler)new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                ch.pipeline().addLast(Client.this.NETTY_HANDLER_STREAM, (ChannelHandler)new StreamHandler());
                ch.pipeline().addLast(new ChannelHandler[]{new MetricsHandler(Client.this.metricsCollector)});
                if (parameters.sslContext != null) {
                    SslHandler sslHandler = parameters.sslContext.newHandler(ch.alloc(), parameters.host, parameters.port);
                    if (parameters.tlsHostnameVerification) {
                        SSLEngine sslEngine = sslHandler.engine();
                        SSLParameters sslParameters = sslEngine.getSSLParameters();
                        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                        sslEngine.setSSLParameters(sslParameters);
                    }
                    ch.pipeline().addFirst("ssl", (ChannelHandler)sslHandler);
                }
                channelCustomizer.accept(ch);
            }
        });
        String clientConnectionName = parameters.clientProperties == null ? "" : (parameters.clientProperties.containsKey("connection_name") ? (String)parameters.clientProperties.get("connection_name") : "");
        try {
            LOGGER.debug("Trying to create stream connection to {}:{}, with client connection name '{}'", new Object[]{parameters.host, parameters.port, clientConnectionName});
            f = b.connect(parameters.host, parameters.port).sync();
            this.host = parameters.host;
            this.port = parameters.port;
            this.clientConnectionName = clientConnectionName;
        }
        catch (Exception e) {
            String message = String.format("Error while creating stream connection to %s:%d", parameters.host, parameters.port);
            if (e instanceof ConnectTimeoutException) {
                throw new TimeoutStreamException(message, e);
            }
            if (e instanceof ConnectException) {
                throw new ConnectionStreamException(message, e);
            }
            throw new StreamException(message, e);
        }
        this.channel = f.channel();
        this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
        ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory;
        this.executorService = executorServiceFactory == null ? Executors.newSingleThreadExecutor(new Utils.NamedThreadFactory(clientConnectionName + "-")) : executorServiceFactory.get();
        ExecutorServiceFactory dispatchingExecutorServiceFactory = parameters.dispatchingExecutorServiceFactory;
        this.dispatchingExecutorService = dispatchingExecutorServiceFactory == null ? Executors.newSingleThreadExecutor(new Utils.NamedThreadFactory("dispatching-" + clientConnectionName + "-")) : dispatchingExecutorServiceFactory.get();
        this.executorServiceClosing = Utils.makeIdempotent(() -> {
            this.dispatchingExecutorService.shutdownNow();
            if (dispatchingExecutorServiceFactory == null) {
                this.dispatchingExecutorService.shutdownNow();
            } else {
                dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService);
            }
            if (executorServiceFactory == null) {
                this.executorService.shutdownNow();
            } else {
                executorServiceFactory.clientClosed(this.executorService);
            }
        });
        try {
            this.tuneState = new TuneState(parameters.requestedMaxFrameSize, (int)parameters.requestedHeartbeat.getSeconds());
            this.clientProperties = Client.clientProperties(parameters.clientProperties);
            this.serverProperties = this.peerProperties();
            this.saslMechanisms = this.getSaslMechanisms();
            this.authenticate(this.credentialsProvider);
            this.tuneState.await(Duration.ofSeconds(10L));
            this.maxFrameSize = this.tuneState.getMaxFrameSize();
            this.frameSizeCopped = this.maxFrameSize() > 0;
            LOGGER.debug("Connection tuned with max frame size {} and heartbeat {}", (Object)this.maxFrameSize(), (Object)this.tuneState.getHeartbeat());
            this.connectionProperties = this.open(parameters.virtualHost);
            Set<ServerFrameHandler.FrameHandlerInfo> supportedCommands = this.maybeExchangeCommandVersions();
            AtomicBoolean streamStatsSupported = new AtomicBoolean(false);
            AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
            AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false);
            supportedCommands.forEach(c -> {
                if (c.getKey() == 28) {
                    streamStatsSupported.set(true);
                }
                if (c.getKey() == 2 && c.getMaxVersion() >= 2) {
                    filteringSupportedReference.set(true);
                }
                if (c.getKey() == 29) {
                    superStreamManagementSupported.set(true);
                }
            });
            this.streamStatsCommandVersionsCheck = streamStatsSupported.get() ? () -> {} : () -> {
                throw new UnsupportedOperationException("QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
            };
            this.filteringSupported = filteringSupportedReference.get();
            this.superStreamManagementCommandVersionsCheck = superStreamManagementSupported.get() ? () -> {} : () -> {
                throw new UnsupportedOperationException("Super stream management is available only on RabbitMQ 3.13 or more.");
            };
            started.set(true);
            this.metricsCollector.openConnection();
        }
        catch (RuntimeException e) {
            this.closingSequence(null);
            throw e;
        }
    }

    private static Map<String, String> clientProperties(Map<String, String> fromParameters) {
        fromParameters = fromParameters == null ? Collections.emptyMap() : fromParameters;
        HashMap<String, String> clientProperties = new HashMap<String, String>(fromParameters);
        clientProperties.putAll(ClientProperties.DEFAULT_CLIENT_PROPERTIES);
        return Collections.unmodifiableMap(clientProperties);
    }

    static void checkMessageFitsInFrame(int maxFrameSize, Codec.EncodedMessage encodedMessage) {
        int frameBeginning = 24 + encodedMessage.getSize();
        if (frameBeginning > maxFrameSize) {
            throw new IllegalArgumentException("Message too big to fit in one frame: " + encodedMessage.getSize());
        }
    }

    Codec codec() {
        return this.codec;
    }

    int maxFrameSize() {
        return this.maxFrameSize;
    }

    private Map<String, String> peerProperties() {
        int clientPropertiesSize = Client.mapSize(this.clientProperties);
        int length = 8 + clientPropertiesSize;
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocateNoCheck(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)17));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            Client.writeMap(bb, this.clientProperties);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            if (request.error() == null) {
                return (Map)request.response.get();
            }
            throw new StreamException("Error when establishing stream connection", request.error());
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while trying to exchange peer properties", e);
        }
    }

    void authenticate(CredentialsProvider credentialsProvider) {
        SaslMechanism saslMechanism = this.saslConfiguration.getSaslMechanism(this.saslMechanisms);
        byte[] challenge = null;
        boolean authDone = false;
        while (!authDone) {
            byte[] saslResponse = saslMechanism.handleChallenge(challenge, credentialsProvider);
            SaslAuthenticateResponse saslAuthenticateResponse = this.sendSaslAuthenticate(saslMechanism, saslResponse);
            if (saslAuthenticateResponse.isOk()) {
                authDone = true;
                continue;
            }
            if (saslAuthenticateResponse.isChallenge()) {
                challenge = saslAuthenticateResponse.challenge;
                continue;
            }
            if (saslAuthenticateResponse.isAuthenticationFailure()) {
                String message = "Unexpected response code during authentication: " + Utils.formatConstant(saslAuthenticateResponse.getResponseCode());
                if (saslAuthenticateResponse.getResponseCode() == 11) {
                    message = message + ". The user is not authorized to connect from a remote host. If the broker is running locally, make sure the '" + this.host + "' hostname is resolved to the loopback interface (localhost, 127.0.0.1, ::1). See https://www.rabbitmq.com/access-control.html#loopback-users.";
                }
                throw new AuthenticationFailureException(message, saslAuthenticateResponse.getResponseCode());
            }
            throw new StreamException("Unexpected response code during authentication: " + Utils.formatConstant(saslAuthenticateResponse.getResponseCode()));
        }
    }

    private SaslAuthenticateResponse sendSaslAuthenticate(SaslMechanism saslMechanism, byte[] challengeResponse) {
        int length = 10 + saslMechanism.getName().length() + 4 + (challengeResponse == null ? 0 : challengeResponse.length);
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocateNoCheck(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)19));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(saslMechanism.getName().length());
            bb.writeBytes(saslMechanism.getName().getBytes(CHARSET));
            if (challengeResponse == null) {
                bb.writeInt(-1);
            } else {
                bb.writeInt(challengeResponse.length).writeBytes(challengeResponse);
            }
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (SaslAuthenticateResponse)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while trying to authenticate", e);
        }
    }

    private Map<String, String> open(String virtualHost) {
        int length = 10 + virtualHost.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)21));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(virtualHost.length());
            bb.writeBytes(virtualHost.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            if (!((OpenResponse)request.response.get()).isOk()) {
                throw new StreamException("Unexpected response code when connecting to virtual host: " + Utils.formatConstant(((OpenResponse)request.response.get()).getResponseCode()));
            }
            return ((OpenResponse)request.response.get()).connectionProperties;
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error during open command", e);
        }
    }

    void send(byte[] content) {
        ByteBuf bb = this.allocateNoCheck(content.length);
        bb.writeBytes(content);
        try {
            this.channel.writeAndFlush((Object)bb).sync();
        }
        catch (InterruptedException e) {
            throw new StreamException("Error while sending bytes", e);
        }
    }

    private void sendClose(short code, String reason) {
        int length = 12 + reason.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)22));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort((int)code);
            bb.writeShort(reason.length());
            bb.writeBytes(reason.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            if (!((Response)request.response.get()).isOk()) {
                LOGGER.warn("Unexpected response code when closing: {}", (Object)Utils.formatConstant(((Response)request.response.get()).getResponseCode()));
                throw new StreamException("Unexpected response code when closing: " + Utils.formatConstant(((Response)request.response.get()).getResponseCode()));
            }
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while closing connection", e);
        }
    }

    private List<String> getSaslMechanisms() {
        int length = 8;
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocateNoCheck(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)18));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (List)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while exchanging SASL mechanisms", e);
        }
    }

    public Response create(String stream) {
        return this.create(stream, Collections.emptyMap());
    }

    public Response create(String stream, Map<String, String> arguments) {
        int length = 10 + stream.length() + Client.mapSize(arguments);
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)13));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            Client.writeMap(bb, arguments);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while creating stream '%s'", stream), e);
        }
    }

    Response createSuperStream(String superStream, List<String> partitions, List<String> bindingKeys, Map<String, String> arguments) {
        this.superStreamManagementCommandVersionsCheck.run();
        if (partitions.isEmpty() || bindingKeys.isEmpty()) {
            throw new IllegalArgumentException("Partitions and routing keys of a super stream cannot be empty");
        }
        if (partitions.size() != bindingKeys.size()) {
            throw new IllegalArgumentException("Partitions and routing keys of a super stream must have the same number of elements");
        }
        arguments = arguments == null ? Collections.emptyMap() : arguments;
        int length = 10 + superStream.length() + Client.collectionSize(partitions) + Client.collectionSize(bindingKeys) + Client.mapSize(arguments);
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)29));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(superStream.length());
            bb.writeBytes(superStream.getBytes(CHARSET));
            Client.writeCollection(bb, partitions);
            Client.writeCollection(bb, bindingKeys);
            Client.writeMap(bb, arguments);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while creating super stream '%s'", superStream), e);
        }
    }

    Response deleteSuperStream(String superStream) {
        this.superStreamManagementCommandVersionsCheck.run();
        int length = 10 + superStream.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)30));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(superStream.length());
            bb.writeBytes(superStream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while deleting stream '%s'", superStream), e);
        }
    }

    private static int collectionSize(Collection<String> elements) {
        return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum();
    }

    private static int arraySize(String ... elements) {
        return Client.collectionSize(Arrays.asList(elements));
    }

    private static int mapSize(Map<String, String> elements) {
        return 4 + elements.entrySet().stream().mapToInt(e -> 2 + ((String)e.getKey()).length() + 2 + ((String)e.getValue()).length()).sum();
    }

    private static ByteBuf writeCollection(ByteBuf bb, Collection<String> elements) {
        bb.writeInt(elements.size());
        elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET)));
        return bb;
    }

    private static ByteBuf writeArray(ByteBuf bb, String ... elements) {
        return Client.writeCollection(bb, Arrays.asList(elements));
    }

    private static ByteBuf writeMap(ByteBuf bb, Map<String, String> elements) {
        bb.writeInt(elements.size());
        elements.forEach((key, value) -> bb.writeShort(key.length()).writeBytes(key.getBytes(CHARSET)).writeShort(value.length()).writeBytes(value.getBytes(CHARSET)));
        return bb;
    }

    ByteBuf allocate(ByteBufAllocator allocator, int capacity) {
        if (this.frameSizeCopped && capacity > this.maxFrameSize()) {
            throw new IllegalArgumentException("Cannot allocate " + capacity + " bytes for outbound frame, limit is " + this.maxFrameSize());
        }
        return allocator.buffer(capacity);
    }

    private ByteBuf allocate(int capacity) {
        return this.allocate(this.channel.alloc(), capacity);
    }

    ByteBuf allocateNoCheck(ByteBufAllocator allocator, int capacity) {
        return allocator.buffer(capacity);
    }

    private ByteBuf allocateNoCheck(int capacity) {
        return this.allocateNoCheck(this.channel.alloc(), capacity);
    }

    public Response delete(String stream) {
        int length = 10 + stream.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)14));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while deleting stream '%s'", stream), e);
        }
    }

    Map<String, StreamMetadata> metadata(List<String> streams) {
        return this.metadata(streams.toArray(new String[0]));
    }

    public Map<String, StreamMetadata> metadata(String ... streams) {
        if (streams == null || streams.length == 0) {
            throw new IllegalArgumentException("At least one stream must be specified");
        }
        int length = 8 + Client.arraySize(streams);
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)15));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            Client.writeArray(bb, streams);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Map)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while getting metadata for stream(s) '%s'", String.join((CharSequence)",", streams)), e);
        }
    }

    public Response declarePublisher(byte publisherId, String publisherReference, String stream) {
        int publisherReferenceSize;
        int n = publisherReferenceSize = publisherReference == null || publisherReference.isEmpty() ? 0 : publisherReference.length();
        if (publisherReferenceSize > 256) {
            throw new IllegalArgumentException("If specified, publisher reference must less than 256 characters");
        }
        int length = 11 + publisherReferenceSize + 2 + stream.length();
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)1));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeByte((int)publisherId);
            bb.writeShort(publisherReferenceSize);
            if (publisherReferenceSize > 0) {
                bb.writeBytes(publisherReference.getBytes(CHARSET));
            }
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while declaring publisher for stream '%s'", stream), e);
        }
    }

    public Response deletePublisher(byte publisherId) {
        int length = 9;
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)6));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeByte((int)publisherId);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while deleting publisher", e);
        }
    }

    public List<Long> publish(byte publisherId, List<Message> messages) {
        return this.publish(publisherId, messages, this.publishSequenceFunction);
    }

    public List<Long> publish(byte publisherId, List<Message> messages, ToLongFunction<Object> publishSequenceFunction) {
        ArrayList<Object> encodedMessages = new ArrayList<Object>(messages.size());
        for (Message message : messages) {
            Codec.EncodedMessage encodedMessage = this.codec.encode(message);
            this.checkMessageFitsInFrame(encodedMessage);
            encodedMessages.add(encodedMessage);
        }
        return this.publishInternal((short)1, this.channel, publisherId, encodedMessages, OUTBOUND_MESSAGE_WRITE_CALLBACK, publishSequenceFunction);
    }

    public List<Long> publish(byte publisherId, List<Message> messages, OutboundEntityMappingCallback mappingCallback) {
        return this.publish(publisherId, messages, mappingCallback, this.publishSequenceFunction);
    }

    public List<Long> publish(byte publisherId, List<Message> messages, OutboundEntityMappingCallback mappingCallback, ToLongFunction<Object> publishSequenceFunction) {
        ArrayList<Object> encodedMessages = new ArrayList<Object>(messages.size());
        for (Message message : messages) {
            Codec.EncodedMessage encodedMessage = this.codec.encode(message);
            this.checkMessageFitsInFrame(encodedMessage);
            OriginalAndEncodedOutboundEntity wrapper = new OriginalAndEncodedOutboundEntity(message, encodedMessage);
            encodedMessages.add(wrapper);
        }
        return this.publishInternal((short)1, this.channel, publisherId, encodedMessages, new OriginalEncodedEntityOutboundEntityWriteCallback(mappingCallback, OUTBOUND_MESSAGE_WRITE_CALLBACK), publishSequenceFunction);
    }

    public List<Long> publishBatches(byte publisherId, List<MessageBatch> messageBatches) {
        return this.publishBatches(publisherId, messageBatches, this.publishSequenceFunction);
    }

    public List<Long> publishBatches(byte publisherId, List<MessageBatch> messageBatches, ToLongFunction<Object> publishSequenceFunction) {
        ArrayList<Object> encodedMessageBatches = new ArrayList<Object>(messageBatches.size());
        for (MessageBatch batch : messageBatches) {
            EncodedMessageBatch encodedMessageBatch = this.createEncodedMessageBatch(batch.compression, batch.messages.size());
            for (Message message : batch.messages) {
                Codec.EncodedMessage encodedMessage = this.codec.encode(message);
                this.checkMessageFitsInFrame(encodedMessage);
                encodedMessageBatch.add(encodedMessage);
            }
            encodedMessageBatch.close();
            this.checkMessageBatchFitsInFrame(encodedMessageBatch);
            encodedMessageBatches.add(encodedMessageBatch);
        }
        return this.publishInternal((short)1, this.channel, publisherId, encodedMessageBatches, OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK, publishSequenceFunction);
    }

    public List<Long> publishBatches(byte publisherId, List<MessageBatch> messageBatches, OutboundEntityMappingCallback mappingCallback) {
        return this.publishBatches(publisherId, messageBatches, mappingCallback, this.publishSequenceFunction);
    }

    public List<Long> publishBatches(byte publisherId, List<MessageBatch> messageBatches, OutboundEntityMappingCallback mappingCallback, ToLongFunction<Object> publishSequenceFunction) {
        ArrayList<Object> encodedMessageBatches = new ArrayList<Object>(messageBatches.size());
        for (MessageBatch batch : messageBatches) {
            EncodedMessageBatch encodedMessageBatch = this.createEncodedMessageBatch(batch.compression, batch.messages.size());
            for (Message message : batch.messages) {
                Codec.EncodedMessage encodedMessage = this.codec.encode(message);
                this.checkMessageFitsInFrame(encodedMessage);
                encodedMessageBatch.add(encodedMessage);
            }
            encodedMessageBatch.close();
            this.checkMessageBatchFitsInFrame(encodedMessageBatch);
            OriginalAndEncodedOutboundEntity wrapper = new OriginalAndEncodedOutboundEntity(batch, encodedMessageBatch);
            encodedMessageBatches.add(wrapper);
        }
        return this.publishInternal((short)1, this.channel, publisherId, encodedMessageBatches, new OriginalEncodedEntityOutboundEntityWriteCallback(mappingCallback, OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK), publishSequenceFunction);
    }

    private void checkMessageFitsInFrame(Codec.EncodedMessage encodedMessage) {
        Client.checkMessageFitsInFrame(this.maxFrameSize(), encodedMessage);
    }

    private void checkMessageBatchFitsInFrame(EncodedMessageBatch encodedMessageBatch) {
        int frameBeginning = 27 + encodedMessageBatch.sizeInBytes();
        if (frameBeginning > this.maxFrameSize()) {
            throw new IllegalArgumentException("Message batch too big to fit in one frame: " + encodedMessageBatch.sizeInBytes());
        }
    }

    List<Long> publishInternal(short version, byte publisherId, List<Object> encodedEntities, OutboundEntityWriteCallback callback, ToLongFunction<Object> publishSequenceFunction) {
        return this.publishInternal(version, this.channel, publisherId, encodedEntities, callback, publishSequenceFunction);
    }

    List<Long> publishInternal(short version, Channel ch, byte publisherId, List<Object> encodedEntities, OutboundEntityWriteCallback callback, ToLongFunction<Object> publishSequenceFunction) {
        int frameHeaderLength = 9;
        ArrayList<Long> sequences = new ArrayList<Long>(encodedEntities.size());
        int length = frameHeaderLength;
        int currentIndex = 0;
        int startIndex = 0;
        for (Object encodedEntity : encodedEntities) {
            if ((length += callback.fragmentLength(encodedEntity)) > this.maxFrameSize()) {
                int frameLength = length - callback.fragmentLength(encodedEntity);
                this.sendEntityBatch(version, ch, frameLength, publisherId, startIndex, currentIndex, encodedEntities, callback, sequences, publishSequenceFunction);
                length = frameHeaderLength + callback.fragmentLength(encodedEntity);
                startIndex = currentIndex;
            }
            ++currentIndex;
        }
        this.sendEntityBatch(version, ch, length, publisherId, startIndex, currentIndex, encodedEntities, callback, sequences, publishSequenceFunction);
        return sequences;
    }

    private void sendEntityBatch(short version, Channel ch, int frameLength, byte publisherId, int fromIncluded, int toExcluded, List<Object> messages, OutboundEntityWriteCallback callback, List<Long> sequences, ToLongFunction<Object> publishSequenceFunction) {
        ByteBuf out = this.allocateNoCheck(ch.alloc(), frameLength + 4);
        out.writeInt(frameLength);
        out.writeShort((int)Utils.encodeRequestCode((short)2));
        out.writeShort((int)version);
        out.writeByte((int)publisherId);
        int messageCount = 0;
        out.writeInt(toExcluded - fromIncluded);
        for (int i = fromIncluded; i < toExcluded; ++i) {
            Object message = messages.get(i);
            long sequence = publishSequenceFunction.applyAsLong(message);
            out.writeLong(sequence);
            messageCount += callback.write(out, message, sequence);
            sequences.add(sequence);
        }
        int msgCount = messageCount;
        ch.writeAndFlush((Object)out).addListener(future -> {
            if (future.isSuccess()) {
                this.metricsCollector.publish(msgCount);
            }
        });
    }

    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    public void credit(byte subscriptionId, int credit) {
        if (credit < 0 || credit > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Credit value must be between 0 and 32767");
        }
        int length = 7;
        ByteBuf bb = this.allocate(length + 4);
        bb.writeInt(length);
        bb.writeShort((int)Utils.encodeRequestCode((short)9));
        bb.writeShort(1);
        bb.writeByte((int)subscriptionId);
        bb.writeShort((int)((short)credit));
        this.channel.writeAndFlush((Object)bb);
    }

    public Response subscribe(byte subscriptionId, String stream, OffsetSpecification offsetSpecification, int credit) {
        return this.subscribe(subscriptionId, stream, offsetSpecification, credit, Collections.emptyMap());
    }

    public Response subscribe(byte subscriptionId, String stream, OffsetSpecification offsetSpecification, int initialCredits, Map<String, String> properties) {
        if (initialCredits < 0 || initialCredits > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Credit value must be between 0 and 32767");
        }
        int length = 11 + stream.length() + 2 + 2;
        if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
            length += 8;
        }
        int propertiesSize = 0;
        if (properties != null && !properties.isEmpty()) {
            propertiesSize = Client.mapSize(properties);
        }
        length += propertiesSize;
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)7));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeByte((int)subscriptionId);
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            bb.writeShort((int)offsetSpecification.getType());
            if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
                bb.writeLong(offsetSpecification.getOffset());
            }
            bb.writeShort(initialCredits);
            if (properties != null && !properties.isEmpty()) {
                Client.writeMap(bb, properties);
            }
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            if (offsetSpecification.isOffset()) {
                this.subscriptionOffsets.add(new SubscriptionOffset(subscriptionId, offsetSpecification.getOffset()));
            }
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while trying to subscribe to stream '%s'", stream), e);
        }
    }

    public void storeOffset(String reference, String stream, long offset) {
        if (reference == null || reference.isEmpty() || reference.length() > 256) {
            throw new IllegalArgumentException("Reference must a non-empty string of less than 256 characters");
        }
        if (stream == null || stream.isEmpty()) {
            throw new IllegalArgumentException("Stream cannot be null or empty");
        }
        int length = 6 + reference.length() + 2 + stream.length() + 8;
        ByteBuf bb = this.allocate(length + 4);
        bb.writeInt(length);
        bb.writeShort((int)Utils.encodeRequestCode((short)10));
        bb.writeShort(1);
        bb.writeShort(reference.length());
        bb.writeBytes(reference.getBytes(CHARSET));
        bb.writeShort(stream.length());
        bb.writeBytes(stream.getBytes(CHARSET));
        bb.writeLong(offset);
        this.channel.writeAndFlush((Object)bb);
    }

    public QueryOffsetResponse queryOffset(String reference, String stream) {
        if (reference == null || reference.isEmpty() || reference.length() > 256) {
            throw new IllegalArgumentException("Reference must a non-empty string of less than 256 characters");
        }
        if (stream == null || stream.isEmpty()) {
            throw new IllegalArgumentException("Stream cannot be null or empty");
        }
        int length = 10 + reference.length() + 2 + stream.length();
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)11));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(reference.length());
            bb.writeBytes(reference.getBytes(CHARSET));
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            QueryOffsetResponse response = (QueryOffsetResponse)request.response.get();
            return response;
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while querying offset for reference '%s' on stream '%s'", reference, stream), e);
        }
    }

    public long queryPublisherSequence(String publisherReference, String stream) {
        if (publisherReference == null || publisherReference.isEmpty() || publisherReference.length() > 256) {
            throw new IllegalArgumentException("Publisher reference must a non-empty string of less than 256 characters");
        }
        if (stream == null || stream.isEmpty()) {
            throw new IllegalArgumentException("Stream cannot be null or empty");
        }
        int length = 10 + publisherReference.length() + 2 + stream.length();
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)5));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(publisherReference.length());
            bb.writeBytes(publisherReference.getBytes(CHARSET));
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            QueryPublisherSequenceResponse response = (QueryPublisherSequenceResponse)request.response.get();
            if (!response.isOk()) {
                LOGGER.info("Query publisher sequence failed with code {}", (Object)Utils.formatConstant(response.getResponseCode()));
            }
            return response.getSequence();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while querying publisher sequence for '%s' on stream '%s'", publisherReference, stream), e);
        }
    }

    public Response unsubscribe(byte subscriptionId) {
        int length = 9;
        int correlationId = this.correlationSequence.getAndIncrement();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)12));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeByte((int)subscriptionId);
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (Response)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while unsubscribing", e);
        }
    }

    @Override
    public void close() {
        if (this.closing.compareAndSet(false, true)) {
            LOGGER.debug("Closing client");
            this.sendClose((short)1, "OK");
            this.closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE);
            LOGGER.debug("Client closed");
        }
    }

    void closingSequence(ShutdownContext.ShutdownReason reason) {
        if (reason != null) {
            this.shutdownListenerCallback.accept(reason);
        }
        this.nettyClosing.run();
        this.executorServiceClosing.run();
    }

    private void closeNetty() {
        try {
            if (this.channel.isOpen()) {
                LOGGER.debug("Closing Netty channel");
                this.channel.close().get(10L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOGGER.info("Channel closing has been interrupted");
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            LOGGER.info("Channel closing failed", (Throwable)e);
        }
        catch (TimeoutException e) {
            LOGGER.info("Could not close channel in 10 seconds");
        }
        this.maybeShutdownEventLoop();
    }

    private void maybeShutdownEventLoop() {
        try {
            if (!(this.eventLoopGroup == null || this.eventLoopGroup.isShuttingDown() && this.eventLoopGroup.isShutdown())) {
                LOGGER.debug("Closing Netty event loop group");
                this.eventLoopGroup.shutdownGracefully(1L, 10L, TimeUnit.SECONDS).get(10L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOGGER.info("Event loop group closing has been interrupted");
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            LOGGER.info("Event loop group closing failed", (Throwable)e);
        }
        catch (TimeoutException e) {
            LOGGER.info("Could not close event loop group in 10 seconds");
        }
    }

    public boolean isOpen() {
        return !this.closing.get();
    }

    String getHost() {
        return this.host;
    }

    int getPort() {
        return this.port;
    }

    String connectionName() {
        StringBuilder builder = new StringBuilder();
        SocketAddress localAddress = this.localAddress();
        if (localAddress instanceof InetSocketAddress) {
            InetSocketAddress address = (InetSocketAddress)localAddress;
            builder.append(address.getHostString()).append(":").append(address.getPort());
        } else {
            builder.append("?");
        }
        builder.append(" -> ");
        return builder.append(this.serverAddress()).toString();
    }

    private String serverAddress() {
        SocketAddress remoteAddress = this.remoteAddress();
        if (remoteAddress instanceof InetSocketAddress) {
            InetSocketAddress address = (InetSocketAddress)remoteAddress;
            return address.getHostString() + ":" + address.getPort();
        }
        return this.host + ":" + this.port;
    }

    public boolean filteringSupported() {
        return this.filteringSupported;
    }

    public List<String> route(String routingKey, String superStream) {
        if (routingKey == null || superStream == null) {
            throw new IllegalArgumentException("routing key and stream must not be null");
        }
        int length = 10 + routingKey.length() + 2 + superStream.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)24));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(routingKey.length());
            bb.writeBytes(routingKey.getBytes(CHARSET));
            bb.writeShort(superStream.length());
            bb.writeBytes(superStream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (List)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while querying route for routing key '%s' on super stream '%s'", routingKey, superStream), e);
        }
    }

    public List<String> partitions(String superStream) {
        if (superStream == null) {
            throw new IllegalArgumentException("stream must not be null");
        }
        int length = 10 + superStream.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)25));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(superStream.length());
            bb.writeBytes(superStream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (List)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while querying partitions for super stream '%s'", superStream), e);
        }
    }

    List<ServerFrameHandler.FrameHandlerInfo> exchangeCommandVersions() {
        List<ServerFrameHandler.FrameHandlerInfo> commandVersions = ServerFrameHandler.commandVersions();
        int length = 12;
        length += commandVersions.size() * 6;
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)27));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeInt(commandVersions.size());
            for (ServerFrameHandler.FrameHandlerInfo commandVersion : commandVersions) {
                bb.writeShort((int)commandVersion.getKey());
                bb.writeShort((int)commandVersion.getMinVersion());
                bb.writeShort((int)commandVersion.getMaxVersion());
            }
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (List)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException("Error while exchanging command version", e);
        }
    }

    StreamStatsResponse streamStats(String stream) {
        this.streamStatsCommandVersionsCheck.run();
        if (stream == null) {
            throw new IllegalArgumentException("stream must not be null");
        }
        int length = 10 + stream.length();
        int correlationId = this.correlationSequence.incrementAndGet();
        try {
            ByteBuf bb = this.allocate(length + 4);
            bb.writeInt(length);
            bb.writeShort((int)Utils.encodeRequestCode((short)28));
            bb.writeShort(1);
            bb.writeInt(correlationId);
            bb.writeShort(stream.length());
            bb.writeBytes(stream.getBytes(CHARSET));
            OutstandingRequest request = this.outstandingRequest();
            this.outstandingRequests.put(correlationId, request);
            this.channel.writeAndFlush((Object)bb);
            request.block();
            return (StreamStatsResponse)request.response.get();
        }
        catch (StreamException e) {
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        catch (RuntimeException e) {
            this.outstandingRequests.remove(correlationId);
            throw new StreamException(String.format("Error while querying statistics for stream '%s'", stream), e);
        }
    }

    public void consumerUpdateResponse(int correlationId, short responseCode, OffsetSpecification offsetSpecification) {
        offsetSpecification = offsetSpecification == null ? OffsetSpecification.none() : offsetSpecification;
        int length = 12;
        if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
            length += 8;
        }
        ByteBuf bb = this.allocate(length + 4);
        bb.writeInt(length);
        bb.writeShort((int)Utils.encodeResponseCode((short)26));
        bb.writeShort(1);
        bb.writeInt(correlationId);
        bb.writeShort((int)responseCode);
        bb.writeShort((int)offsetSpecification.getType());
        if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
            bb.writeLong(offsetSpecification.getOffset());
        }
        this.channel.writeAndFlush((Object)bb);
    }

    void shutdownReason(ShutdownContext.ShutdownReason reason) {
        this.shutdownReason = reason;
    }

    public SocketAddress localAddress() {
        return this.channel.localAddress();
    }

    public SocketAddress remoteAddress() {
        return this.channel.remoteAddress();
    }

    String serverAdvertisedHost() {
        return this.connectionProperties("advertised_host");
    }

    int serverAdvertisedPort() {
        return Integer.parseInt(this.connectionProperties("advertised_port"));
    }

    public String brokerVersion() {
        return this.serverProperties.get("version");
    }

    private String connectionProperties(String key) {
        if (this.connectionProperties != null && this.connectionProperties.containsKey(key)) {
            return this.connectionProperties.get(key);
        }
        throw new IllegalArgumentException("Connection property '" + key + "' missing. Available properties: " + this.connectionProperties + ".");
    }

    private EncodedMessageBatch createEncodedMessageBatch(Compression compression, int batchSize) {
        return EncodedMessageBatch.create(this.channel.alloc(), compression.code(), this.compressionCodecFactory.get(compression), batchSize);
    }

    private Set<ServerFrameHandler.FrameHandlerInfo> maybeExchangeCommandVersions() {
        HashSet<ServerFrameHandler.FrameHandlerInfo> supported = new HashSet<ServerFrameHandler.FrameHandlerInfo>();
        try {
            if (Utils.is3_11_OrMore(this.brokerVersion())) {
                supported.addAll(this.exchangeCommandVersions());
            }
        }
        catch (Exception e) {
            LOGGER.info("Error while exchanging command versions: {}", (Object)e.getMessage());
        }
        return supported;
    }

    static Response responseOk() {
        return Response.OK;
    }

    private <T> OutstandingRequest<T> outstandingRequest() {
        return new OutstandingRequest(this.rpcTimeout, this.host + ":" + this.port);
    }

    public String toString() {
        return "Client{connectionName='" + this.connectionName() + "'}";
    }

    static /* synthetic */ PublishConfirmListener access$4200() {
        return NO_OP_PUBLISH_CONFIRM_LISTENER;
    }

    static /* synthetic */ PublishErrorListener access$4300() {
        return NO_OP_PUBLISH_ERROR_LISTENER;
    }

    private class StreamHandler
    extends ChannelInboundHandlerAdapter {
        private StreamHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Runnable task;
            ByteBuf m = (ByteBuf)msg;
            Client.this.metricsCollector.readBytes(m.capacity() + 4);
            int frameSize = m.readableBytes();
            short commandId = Utils.extractResponseCode(m.readShort());
            short version = m.readShort();
            if (Client.this.closing.get()) {
                if (commandId == 22) {
                    task = () -> ServerFrameHandler.defaultHandler().handle(Client.this, frameSize, ctx, m);
                } else {
                    LOGGER.debug("Ignoring command {} from server while closing", (Object)commandId);
                    try {
                        while (m.isReadable()) {
                            m.readByte();
                        }
                    }
                    finally {
                        m.release();
                    }
                    task = null;
                }
            } else {
                ServerFrameHandler.FrameHandler frameHandler = ServerFrameHandler.lookup(commandId, version, m);
                task = () -> frameHandler.handle(Client.this, frameSize, ctx, m);
            }
            if (task != null) {
                if (commandId == 8) {
                    Client.this.dispatchingExecutorService.submit(task);
                } else {
                    Client.this.executorService.submit(task);
                }
            }
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            LOGGER.debug("Netty channel became inactive");
            if (Client.this.shutdownReason == null && Client.this.closing.compareAndSet(false, true)) {
                Client.this.executorService.submit(() -> Client.this.closingSequence(ShutdownContext.ShutdownReason.UNKNOWN));
            }
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent)evt;
                if (e.state() == IdleState.READER_IDLE) {
                    LOGGER.info("Closing connection {} on {}:{} because it's been idle for too long", new Object[]{Client.this.clientConnectionName, Client.this.host, Client.this.port});
                    Client.this.closing.set(true);
                    Client.this.closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE);
                } else if (e.state() == IdleState.WRITER_IDLE) {
                    LOGGER.debug("Sending heartbeat frame");
                    ByteBuf bb = Client.this.allocate(ctx.alloc(), 8);
                    bb.writeInt(4).writeShort((int)Utils.encodeRequestCode((short)23)).writeShort(1);
                    ctx.writeAndFlush((Object)bb);
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            if (cause instanceof DecoderException && cause.getCause() instanceof SSLHandshakeException) {
                LOGGER.debug("Error during TLS handshake");
                if (Client.this.outstandingRequests.size() == 1) {
                    ArrayList requests = new ArrayList(Client.this.outstandingRequests.values());
                    if (requests.size() == 1) {
                        OutstandingRequest outstandingRequest = (OutstandingRequest)requests.get(0);
                        outstandingRequest.completeExceptionally(cause.getCause());
                    }
                } else {
                    LOGGER.debug("More than 1 outstanding request: {}", Client.this.outstandingRequests);
                }
            }
            LOGGER.warn("Error in stream handler", cause);
            ctx.close();
        }
    }

    public static class StreamParametersBuilder {
        private final Map<String, String> parameters = new HashMap<String, String>();

        public StreamParametersBuilder maxLengthBytes(long bytes) {
            this.parameters.put("max-length-bytes", String.valueOf(bytes));
            return this;
        }

        public StreamParametersBuilder maxLengthBytes(ByteCapacity bytes) {
            return this.maxLengthBytes(bytes.toBytes());
        }

        public StreamParametersBuilder maxLengthKb(long kiloBytes) {
            return this.maxLengthBytes(kiloBytes * 1000L);
        }

        public StreamParametersBuilder maxLengthMb(long megaBytes) {
            return this.maxLengthBytes(megaBytes * 1000L * 1000L);
        }

        public StreamParametersBuilder maxLengthGb(long gigaBytes) {
            return this.maxLengthBytes(gigaBytes * 1000L * 1000L * 1000L);
        }

        public StreamParametersBuilder maxLengthTb(long teraBytes) {
            return this.maxLengthBytes(teraBytes * 1000L * 1000L * 1000L * 1000L);
        }

        public StreamParametersBuilder maxSegmentSizeBytes(long bytes) {
            if (bytes <= 0L) {
                this.parameters.remove("stream-max-segment-size-bytes");
            } else {
                this.parameters.put("stream-max-segment-size-bytes", String.valueOf(bytes));
            }
            return this;
        }

        public StreamParametersBuilder maxSegmentSizeBytes(ByteCapacity bytes) {
            return this.maxSegmentSizeBytes(bytes == null ? 0L : bytes.toBytes());
        }

        public StreamParametersBuilder maxSegmentSizeKb(long kiloBytes) {
            return this.maxSegmentSizeBytes(kiloBytes * 1000L);
        }

        public StreamParametersBuilder maxSegmentSizeMb(long megaBytes) {
            return this.maxSegmentSizeBytes(megaBytes * 1000L * 1000L);
        }

        public StreamParametersBuilder maxSegmentSizeGb(long gigaBytes) {
            return this.maxSegmentSizeBytes(gigaBytes * 1000L * 1000L * 1000L);
        }

        public StreamParametersBuilder maxSegmentSizeTb(long teraBytes) {
            return this.maxSegmentSizeBytes(teraBytes * 1000L * 1000L * 1000L * 1000L);
        }

        public StreamParametersBuilder maxAge(Duration duration) {
            if (duration == null || duration.isZero() || duration.isNegative() || duration.getSeconds() < 0L) {
                throw new IllegalArgumentException("Max age must be a positive duration");
            }
            this.parameters.put("max-age", duration.getSeconds() + "s");
            return this;
        }

        public StreamParametersBuilder leaderLocator(StreamCreator.LeaderLocator leaderLocator) {
            this.parameters.put("queue-leader-locator", leaderLocator.value());
            return this;
        }

        public StreamParametersBuilder filterSize(int size) {
            if (size < 16 || size > 255) {
                throw new IllegalArgumentException("Stream filter size must be between 16 and 255");
            }
            this.parameters.put("stream-filter-size-bytes", String.valueOf(size));
            return this;
        }

        public StreamParametersBuilder put(String key, String value) {
            this.parameters.put(key, value);
            return this;
        }

        public Map<String, String> build() {
            return new HashMap<String, String>(this.parameters);
        }
    }

    static final class SubscriptionOffset {
        private final int subscriptionId;
        private final long offset;

        SubscriptionOffset(int subscriptionId, long offset) {
            this.subscriptionId = subscriptionId;
            this.offset = offset;
        }

        int subscriptionId() {
            return this.subscriptionId;
        }

        long offset() {
            return this.offset;
        }
    }

    static class OutstandingRequest<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Duration timeout;
        private final String node;
        private final AtomicReference<T> response = new AtomicReference();
        private final AtomicReference<Throwable> error = new AtomicReference();

        private OutstandingRequest(Duration timeout, String node) {
            this.timeout = timeout;
            this.node = node;
        }

        void block() {
            boolean completed;
            try {
                completed = this.latch.await(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamException("Interrupted while waiting for response");
            }
            if (!completed) {
                throw new TimeoutStreamException(String.format("Could not get response in %d ms from node %s", this.timeout.toMillis(), this.node));
            }
        }

        void completeExceptionally(Throwable cause) {
            this.error.set(cause);
            this.latch.countDown();
        }

        Throwable error() {
            return this.error.get();
        }

        AtomicReference<T> response() {
            return this.response;
        }

        void countDown() {
            this.latch.countDown();
        }
    }

    public static class ClientParameters {
        private final Map<String, String> clientProperties = new ConcurrentHashMap<String, String>();
        EventLoopGroup eventLoopGroup;
        private Codec codec;
        private String host = "localhost";
        private int port = 5552;
        CompressionCodecFactory compressionCodecFactory;
        private String virtualHost = "/";
        private Duration requestedHeartbeat = Duration.ofSeconds(60L);
        private int requestedMaxFrameSize = 0x100000;
        private PublishConfirmListener publishConfirmListener = Client.access$4200();
        private PublishErrorListener publishErrorListener = Client.access$4300();
        private ChunkListener chunkListener = (client, correlationId, offset, messageCount, dataSize) -> null;
        private MessageListener messageListener = (correlationId, offset, chunkTimestamp, committedOffset, chunkContext, message) -> {};
        private MessageIgnoredListener messageIgnoredListener = (subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext) -> {};
        private MetadataListener metadataListener = (stream, code) -> {};
        private CreditNotification creditNotification = (subscriptionId, responseCode) -> LOGGER.warn("Received notification for subscription {}: {}", (Object)subscriptionId, (Object)Utils.formatConstant(responseCode));
        private ConsumerUpdateListener consumerUpdateListener = (client, subscriptionId, active) -> null;
        private ShutdownListener shutdownListener = shutdownContext -> {};
        private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
        private CredentialsProvider credentialsProvider = new DefaultUsernamePasswordCredentialsProvider("guest", "guest");
        private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
        private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
        private SslContext sslContext;
        private boolean tlsHostnameVerification = true;
        private ByteBufAllocator byteBufAllocator;
        private Duration rpcTimeout;
        private Consumer<Channel> channelCustomizer = Utils.noOpConsumer();
        private Consumer<Bootstrap> bootstrapCustomizer = Utils.noOpConsumer();
        private ExecutorServiceFactory dispatchingExecutorServiceFactory;
        private ExecutorServiceFactory executorServiceFactory;

        public ClientParameters host(String host) {
            this.host = host;
            return this;
        }

        public ClientParameters port(int port) {
            this.port = port;
            return this;
        }

        public ClientParameters publishConfirmListener(PublishConfirmListener publishConfirmListener) {
            this.publishConfirmListener = publishConfirmListener;
            return this;
        }

        public ClientParameters publishErrorListener(PublishErrorListener publishErrorListener) {
            this.publishErrorListener = publishErrorListener;
            return this;
        }

        public ClientParameters chunkListener(ChunkListener chunkListener) {
            this.chunkListener = chunkListener;
            return this;
        }

        public ClientParameters messageListener(MessageListener messageListener) {
            this.messageListener = messageListener;
            return this;
        }

        public ClientParameters messageIgnoredListener(MessageIgnoredListener messageIgnoredListener) {
            this.messageIgnoredListener = messageIgnoredListener;
            return this;
        }

        public ClientParameters creditNotification(CreditNotification creditNotification) {
            this.creditNotification = creditNotification;
            return this;
        }

        public ClientParameters consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener) {
            this.consumerUpdateListener = consumerUpdateListener;
            return this;
        }

        public ClientParameters codec(Codec codec) {
            this.codec = codec;
            return this;
        }

        public ClientParameters eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        public ClientParameters byteBufAllocator(ByteBufAllocator byteBufAllocator) {
            this.byteBufAllocator = byteBufAllocator;
            return this;
        }

        public ClientParameters saslConfiguration(SaslConfiguration saslConfiguration) {
            this.saslConfiguration = saslConfiguration;
            return this;
        }

        public ClientParameters credentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
            return this;
        }

        public ClientParameters username(String username) {
            this.credentialsProvider = this.credentialsProvider instanceof UsernamePasswordCredentialsProvider ? new DefaultUsernamePasswordCredentialsProvider(username, ((UsernamePasswordCredentialsProvider)this.credentialsProvider).getPassword()) : new DefaultUsernamePasswordCredentialsProvider(username, null);
            return this;
        }

        public ClientParameters password(String password) {
            this.credentialsProvider = this.credentialsProvider instanceof UsernamePasswordCredentialsProvider ? new DefaultUsernamePasswordCredentialsProvider(((UsernamePasswordCredentialsProvider)this.credentialsProvider).getUsername(), password) : new DefaultUsernamePasswordCredentialsProvider(null, password);
            return this;
        }

        public ClientParameters virtualHost(String virtualHost) {
            this.virtualHost = virtualHost;
            return this;
        }

        public ClientParameters requestedHeartbeat(Duration requestedHeartbeat) {
            this.requestedHeartbeat = requestedHeartbeat;
            return this;
        }

        public ClientParameters requestedMaxFrameSize(int requestedMaxFrameSize) {
            this.requestedMaxFrameSize = requestedMaxFrameSize;
            return this;
        }

        public ClientParameters chunkChecksum(ChunkChecksum chunkChecksum) {
            this.chunkChecksum = chunkChecksum;
            return this;
        }

        public ClientParameters clientProperties(Map<String, String> clientProperties) {
            this.clientProperties.putAll(clientProperties);
            return this;
        }

        public ClientParameters clientProperty(String key, String value) {
            this.clientProperties.put(key, value);
            return this;
        }

        public ClientParameters metricsCollector(MetricsCollector metricsCollector) {
            this.metricsCollector = metricsCollector;
            return this;
        }

        public ClientParameters metadataListener(MetadataListener metadataListener) {
            this.metadataListener = metadataListener;
            return this;
        }

        public ClientParameters shutdownListener(ShutdownListener shutdownListener) {
            this.shutdownListener = shutdownListener;
            return this;
        }

        public ClientParameters sslContext(SslContext sslContext) {
            this.sslContext = sslContext;
            if (this.port == 5552 && sslContext != null) {
                this.port = 5551;
            }
            return this;
        }

        public ClientParameters tlsHostnameVerification(boolean tlsHostnameVerification) {
            this.tlsHostnameVerification = tlsHostnameVerification;
            return this;
        }

        public ClientParameters compressionCodecFactory(CompressionCodecFactory compressionCodecFactory) {
            this.compressionCodecFactory = compressionCodecFactory;
            return this;
        }

        public ClientParameters rpcTimeout(Duration rpcTimeout) {
            this.rpcTimeout = rpcTimeout;
            return this;
        }

        public ClientParameters dispatchingExecutorServiceFactory(ExecutorServiceFactory dispatchingExecutorServiceFactory) {
            this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory;
            return this;
        }

        public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) {
            this.executorServiceFactory = executorServiceFactory;
            return this;
        }

        String host() {
            return this.host;
        }

        int port() {
            return this.port;
        }

        Map<String, String> clientProperties() {
            return Collections.unmodifiableMap(this.clientProperties);
        }

        Codec codec() {
            return this.codec;
        }

        CredentialsProvider credentialsProvider() {
            return this.credentialsProvider;
        }

        public ClientParameters channelCustomizer(Consumer<Channel> channelCustomizer) {
            this.channelCustomizer = channelCustomizer;
            return this;
        }

        public ClientParameters bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomizer) {
            this.bootstrapCustomizer = bootstrapCustomizer;
            return this;
        }

        ClientParameters duplicate() {
            ClientParameters duplicate = new ClientParameters();
            for (Field field : ClientParameters.class.getDeclaredFields()) {
                field.setAccessible(true);
                try {
                    ConcurrentHashMap value = field.get(this);
                    if (value instanceof Map) {
                        value = new ConcurrentHashMap(value);
                    }
                    field.set(duplicate, value);
                }
                catch (IllegalAccessException e) {
                    throw new StreamException("Error while duplicating client parameters", e);
                }
            }
            return duplicate;
        }
    }

    public static class Broker {
        private final String host;
        private final int port;

        public Broker(String host, int port) {
            this.host = host;
            this.port = port;
        }

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        public String toString() {
            return "Broker{host='" + this.host + '\'' + ", port=" + this.port + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Broker broker = (Broker)o;
            return this.port == broker.port && this.host.equals(broker.host);
        }

        public int hashCode() {
            return Objects.hash(this.host, this.port);
        }

        String label() {
            return this.host + ":" + this.port;
        }
    }

    public static class StreamMetadata {
        private final String stream;
        private final short responseCode;
        private final Broker leader;
        private final List<Broker> replicas;

        public StreamMetadata(String stream, short responseCode, Broker leader, List<Broker> replicas) {
            this.stream = stream;
            this.responseCode = responseCode;
            this.leader = leader;
            this.replicas = replicas == null ? null : Collections.unmodifiableList(replicas);
        }

        public short getResponseCode() {
            return this.responseCode;
        }

        public boolean isResponseOk() {
            return this.responseCode == 1;
        }

        public Broker getLeader() {
            return this.leader;
        }

        public List<Broker> getReplicas() {
            return this.replicas;
        }

        public String getStream() {
            return this.stream;
        }

        public String toString() {
            return "StreamMetadata{stream='" + this.stream + '\'' + ", responseCode=" + this.responseCode + ", leader=" + this.leader + ", replicas=" + this.replicas + '}';
        }
    }

    static class StreamStatsResponse
    extends Response {
        private final Map<String, Long> info;

        StreamStatsResponse(short responseCode, Map<String, Long> info) {
            super(responseCode);
            this.info = Collections.unmodifiableMap(new HashMap<String, Long>(info));
        }

        public Map<String, Long> getInfo() {
            return this.info;
        }
    }

    static class QueryPublisherSequenceResponse
    extends Response {
        private final long sequence;

        QueryPublisherSequenceResponse(short responseCode, long sequence) {
            super(responseCode);
            this.sequence = sequence;
        }

        public long getSequence() {
            return this.sequence;
        }
    }

    public static class QueryOffsetResponse
    extends Response {
        private final long offset;

        public QueryOffsetResponse(short responseCode, long offset) {
            super(responseCode);
            this.offset = offset;
        }

        public long getOffset() {
            return this.offset;
        }
    }

    static class OpenResponse
    extends Response {
        private final Map<String, String> connectionProperties;

        OpenResponse(short responseCode, Map<String, String> connectionProperties) {
            super(responseCode);
            this.connectionProperties = connectionProperties;
        }
    }

    static class SaslAuthenticateResponse
    extends Response {
        private final byte[] challenge;

        public SaslAuthenticateResponse(short responseCode, byte[] challenge) {
            super(responseCode);
            this.challenge = challenge;
        }

        public boolean isChallenge() {
            return this.getResponseCode() == 10;
        }

        public boolean isAuthenticationFailure() {
            return this.getResponseCode() == 8 || this.getResponseCode() == 11;
        }
    }

    public static class Response {
        private static final Response OK = new Response(1);
        private final short responseCode;

        public Response(short responseCode) {
            this.responseCode = responseCode;
        }

        public boolean isOk() {
            return this.responseCode == 1;
        }

        public short getResponseCode() {
            return this.responseCode;
        }

        public String toString() {
            return Utils.formatConstant(this.responseCode);
        }
    }

    static class TuneState {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicInteger maxFrameSize = new AtomicInteger();
        private final AtomicInteger heartbeat = new AtomicInteger();
        private final int requestedMaxFrameSize;
        private final int requestedHeartbeat;

        public TuneState(int requestedMaxFrameSize, int requestedHeartbeat) {
            this.requestedMaxFrameSize = requestedMaxFrameSize;
            this.requestedHeartbeat = requestedHeartbeat;
        }

        void await(Duration duration) {
            try {
                boolean completed = this.latch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
                if (!completed) {
                    throw new StreamException("Waited for tune frame for " + duration.getSeconds() + " second(s)");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamException("Interrupted while waiting for tune frame");
            }
        }

        int getMaxFrameSize() {
            return this.maxFrameSize.get();
        }

        int getHeartbeat() {
            return this.heartbeat.get();
        }

        int requestedHeartbeat() {
            return this.requestedHeartbeat;
        }

        int requestedMaxFrameSize() {
            return this.requestedMaxFrameSize;
        }

        TuneState maxFrameSize(int maxFrameSize) {
            this.maxFrameSize.set(maxFrameSize);
            return this;
        }

        TuneState heartbeat(int heartbeat) {
            this.heartbeat.set(heartbeat);
            return this;
        }

        public void done() {
            this.latch.countDown();
        }
    }

    public static class ShutdownContext {
        private final ShutdownReason shutdownReason;

        ShutdownContext(ShutdownReason shutdownReason) {
            this.shutdownReason = shutdownReason;
        }

        public ShutdownReason getShutdownReason() {
            return this.shutdownReason;
        }

        boolean isShutdownUnexpected() {
            return this.getShutdownReason() == ShutdownReason.HEARTBEAT_FAILURE || this.getShutdownReason() == ShutdownReason.UNKNOWN;
        }

        public static enum ShutdownReason {
            CLIENT_CLOSE,
            SERVER_CLOSE,
            HEARTBEAT_FAILURE,
            UNKNOWN;

        }
    }

    private static class OutboundMessageBatchWriteCallback
    implements OutboundEntityWriteCallback {
        private OutboundMessageBatchWriteCallback() {
        }

        @Override
        public int write(ByteBuf bb, Object entity, long publishingId) {
            EncodedMessageBatch batchToPublish = (EncodedMessageBatch)entity;
            bb.writeByte(0x80 | batchToPublish.compression() << 4);
            bb.writeShort(batchToPublish.batchSize());
            bb.writeInt(batchToPublish.uncompressedSizeInBytes());
            bb.writeInt(batchToPublish.sizeInBytes());
            batchToPublish.write(bb);
            return batchToPublish.batchSize();
        }

        @Override
        public int fragmentLength(Object entity) {
            return 19 + ((EncodedMessageBatch)entity).sizeInBytes();
        }
    }

    private static class OutboundMessageWriteCallback
    implements OutboundEntityWriteCallback {
        private OutboundMessageWriteCallback() {
        }

        @Override
        public int write(ByteBuf bb, Object entity, long publishingId) {
            Codec.EncodedMessage messageToPublish = (Codec.EncodedMessage)entity;
            bb.writeInt(messageToPublish.getSize());
            bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize());
            return 1;
        }

        @Override
        public int fragmentLength(Object entity) {
            return 12 + ((Codec.EncodedMessage)entity).getSize();
        }
    }

    static class CompressedEncodedMessageBatch
    implements EncodedMessageBatch {
        private final ByteBufAllocator allocator;
        private final CompressionCodec codec;
        private final List<Codec.EncodedMessage> messages;
        private int uncompressedByteSize = 0;
        private ByteBuf buffer;

        CompressedEncodedMessageBatch(ByteBufAllocator allocator, CompressionCodec codec, List<Codec.EncodedMessage> messages, int batchSize) {
            this.allocator = allocator;
            this.codec = codec;
            this.messages = new ArrayList<Codec.EncodedMessage>(batchSize);
            for (int i = 0; i < messages.size(); ++i) {
                this.add(messages.get(i));
            }
        }

        CompressedEncodedMessageBatch(ByteBufAllocator allocator, CompressionCodec codec, int batchSize) {
            this(allocator, codec, Collections.emptyList(), batchSize);
        }

        @Override
        public void add(Codec.EncodedMessage encodedMessage) {
            this.messages.add(encodedMessage);
            this.uncompressedByteSize += 4 + encodedMessage.getSize();
        }

        @Override
        public void close() {
            int maxCompressedLength = this.codec.maxCompressedLength(this.uncompressedByteSize);
            this.buffer = this.allocator.buffer(maxCompressedLength);
            OutputStream outputStream = this.codec.compress((OutputStream)new ByteBufOutputStream(this.buffer));
            try {
                for (int i = 0; i < this.messages.size(); ++i) {
                    int size = this.messages.get(i).getSize();
                    outputStream.write(size >>> 24 & 0xFF);
                    outputStream.write(size >>> 16 & 0xFF);
                    outputStream.write(size >>> 8 & 0xFF);
                    outputStream.write(size >>> 0 & 0xFF);
                    outputStream.write(this.messages.get(i).getData(), 0, size);
                }
                outputStream.flush();
                outputStream.close();
            }
            catch (IOException e) {
                throw new StreamException("Error while closing compressing output stream", e);
            }
        }

        @Override
        public void write(ByteBuf bb) {
            bb.writeBytes(this.buffer, 0, this.buffer.writerIndex());
            this.buffer.release();
        }

        @Override
        public int batchSize() {
            return this.messages.size();
        }

        @Override
        public int sizeInBytes() {
            return this.buffer.writerIndex();
        }

        @Override
        public int uncompressedSizeInBytes() {
            return this.uncompressedByteSize;
        }

        @Override
        public byte compression() {
            return this.codec.code();
        }
    }

    private static class PlainEncodedMessageBatch
    implements EncodedMessageBatch {
        private final List<Codec.EncodedMessage> messages;
        private int size;

        PlainEncodedMessageBatch(List<Codec.EncodedMessage> messages) {
            this.messages = messages;
        }

        @Override
        public void add(Codec.EncodedMessage encodedMessage) {
            this.messages.add(encodedMessage);
            this.size += 4 + encodedMessage.getSize();
        }

        @Override
        public void close() {
        }

        @Override
        public void write(ByteBuf bb) {
            for (Codec.EncodedMessage message : this.messages) {
                bb.writeInt(message.getSize()).writeBytes(message.getData(), 0, message.getSize());
            }
        }

        @Override
        public int batchSize() {
            return this.messages.size();
        }

        @Override
        public int sizeInBytes() {
            return this.size;
        }

        @Override
        public int uncompressedSizeInBytes() {
            return this.size;
        }

        @Override
        public byte compression() {
            return Compression.NONE.code();
        }
    }

    private static final class OriginalEncodedEntityOutboundEntityWriteCallback
    implements OutboundEntityWriteCallback {
        private final OutboundEntityMappingCallback callback;
        private final OutboundEntityWriteCallback delegate;

        private OriginalEncodedEntityOutboundEntityWriteCallback(OutboundEntityMappingCallback callback, OutboundEntityWriteCallback delegate) {
            this.callback = callback;
            this.delegate = delegate;
        }

        @Override
        public int write(ByteBuf bb, Object entity, long publishingId) {
            OriginalAndEncodedOutboundEntity wrapper = (OriginalAndEncodedOutboundEntity)entity;
            this.callback.handle(publishingId, wrapper.original);
            return this.delegate.write(bb, wrapper.encoded, publishingId);
        }

        @Override
        public int fragmentLength(Object entity) {
            OriginalAndEncodedOutboundEntity wrapper = (OriginalAndEncodedOutboundEntity)entity;
            return this.delegate.fragmentLength(wrapper.encoded);
        }
    }

    private static final class OriginalAndEncodedOutboundEntity {
        private final Object original;
        private final Object encoded;

        private OriginalAndEncodedOutboundEntity(Object original, Object encoded) {
            this.original = original;
            this.encoded = encoded;
        }
    }

    static interface EncodedMessageBatch {
        public static EncodedMessageBatch create(ByteBufAllocator allocator, byte compression, CompressionCodec compressionCodec, int batchSize) {
            if (compression == Compression.NONE.code()) {
                return new PlainEncodedMessageBatch(new ArrayList<Codec.EncodedMessage>(batchSize));
            }
            return new CompressedEncodedMessageBatch(allocator, compressionCodec, batchSize);
        }

        public void add(Codec.EncodedMessage var1);

        public void close();

        public void write(ByteBuf var1);

        public int batchSize();

        public int sizeInBytes();

        public int uncompressedSizeInBytes();

        public byte compression();
    }

    public static interface ShutdownListener {
        public void handle(ShutdownContext var1);
    }

    public static interface ConsumerUpdateListener {
        public OffsetSpecification handle(Client var1, byte var2, boolean var3);
    }

    public static interface CreditNotification {
        public void handle(byte var1, short var2);
    }

    public static interface MessageIgnoredListener {
        public void ignored(byte var1, long var2, long var4, long var6, Object var8);
    }

    public static interface MessageListener {
        public void handle(byte var1, long var2, long var4, long var6, Object var8, Message var9);
    }

    public static interface ChunkListener {
        public Object handle(Client var1, byte var2, long var3, long var5, long var7);
    }

    public static interface MetadataListener {
        public void handle(String var1, short var2);
    }

    public static interface PublishErrorListener {
        public void handle(byte var1, long var2, short var4);
    }

    public static interface PublishConfirmListener {
        public void handle(byte var1, long var2);
    }

    static interface OutboundEntityWriteCallback {
        public int write(ByteBuf var1, Object var2, long var3);

        public int fragmentLength(Object var1);
    }

    public static interface OutboundEntityMappingCallback {
        public void handle(long var1, Object var3);
    }

    private static class MetricsHandler
    extends ChannelOutboundHandlerAdapter {
        private final MetricsCollector metricsCollector;

        private MetricsHandler(MetricsCollector metricsCollector) {
            this.metricsCollector = metricsCollector;
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            this.metricsCollector.writtenBytes(((ByteBuf)msg).capacity());
            super.write(ctx, msg, promise);
        }
    }
}

