/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetaStoreHandler
extends HandlerState
implements ConnectionHandler.Connection,
Closeable,
TimerTask {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetaStoreHandler.class);
    private final long transactionCoordinatorId;
    private final ConnectionHandler connectionHandler;
    private final ConcurrentLongHashMap<OpBase<?>> pendingRequests = ConcurrentLongHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
    private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
    protected final Timer timer;
    private final ExecutorService internalPinnedExecutor;
    private final boolean blockIfReachMaxPendingOps;
    private final Semaphore semaphore;
    private Timeout requestTimeout;
    private final CompletableFuture<Void> connectFuture;
    private final long lookupDeadline;
    private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();

    public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic, CompletableFuture<Void> connectFuture) {
        super(pulsarClient, topic);
        this.transactionCoordinatorId = transactionCoordinatorId;
        this.timeoutQueue = new ConcurrentLinkedQueue();
        this.blockIfReachMaxPendingOps = true;
        this.semaphore = new Semaphore(1000);
        this.requestTimeout = pulsarClient.timer().newTimeout(this, pulsarClient.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS);
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(pulsarClient.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(100L, TimeUnit.MILLISECONDS).create(), this);
        this.connectFuture = connectFuture;
        this.internalPinnedExecutor = pulsarClient.getInternalExecutorService();
        this.timer = pulsarClient.timer();
        this.lookupDeadline = System.currentTimeMillis() + this.client.getConfiguration().getLookupTimeoutMs();
    }

    public void start() {
        this.connectionHandler.grabCnx();
    }

    @Override
    public void connectionFailed(PulsarClientException exception) {
        boolean timeout;
        boolean nonRetriableError = !PulsarClientException.isRetriableError((Throwable)exception);
        boolean bl = timeout = System.currentTimeMillis() > this.lookupDeadline;
        if (nonRetriableError || timeout) {
            exception.setPreviousExceptions(this.previousExceptions);
            if (this.connectFuture.completeExceptionally(exception)) {
                if (nonRetriableError) {
                    LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", (Object)this.transactionCoordinatorId, (Object)exception);
                } else {
                    LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after timeout", (Object)this.transactionCoordinatorId, (Object)exception);
                }
                this.setState(HandlerState.State.Failed);
            }
        } else {
            this.previousExceptions.add(exception);
        }
    }

    @Override
    public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.internalPinnedExecutor.execute(() -> {
            LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", (Object)this.transactionCoordinatorId);
            HandlerState.State state = this.getState();
            if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
                this.setState(HandlerState.State.Closed);
                this.failPendingRequest();
                future.complete(null);
                return;
            }
            if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) {
                long requestId = this.client.newRequestId();
                ByteBuf request = Commands.newTcClientConnectRequest(this.transactionCoordinatorId, requestId);
                ((CompletableFuture)cnx.sendRequestWithId(request, requestId).thenRun(() -> this.internalPinnedExecutor.execute(() -> {
                    LOG.info("Transaction coordinator client connect success! tcId : {}", (Object)this.transactionCoordinatorId);
                    if (this.registerToConnection(cnx)) {
                        this.connectionHandler.resetBackoff();
                        this.pendingRequests.forEach((requestID, opBase) -> this.checkStateAndSendRequest((OpBase<?>)opBase));
                    }
                    future.complete(null);
                }))).exceptionally(e -> {
                    this.internalPinnedExecutor.execute(() -> {
                        LOG.error("Transaction coordinator client connect fail! tcId : {}", (Object)this.transactionCoordinatorId, (Object)e.getCause());
                        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed || e.getCause() instanceof PulsarClientException.NotAllowedException) {
                            this.setState(HandlerState.State.Closed);
                            cnx.channel().close();
                            future.complete(null);
                        } else {
                            future.completeExceptionally(e.getCause());
                        }
                    });
                    return null;
                });
            } else {
                this.registerToConnection(cnx);
                future.complete(null);
            }
        });
        return future;
    }

    private boolean registerToConnection(ClientCnx cnx) {
        if (this.changeToReadyState()) {
            this.connectionHandler.setClientCnx(cnx);
            cnx.registerTransactionMetaStoreHandler(this.transactionCoordinatorId, this);
            this.connectFuture.complete(null);
            return true;
        }
        HandlerState.State state = this.getState();
        cnx.channel().close();
        this.connectFuture.completeExceptionally(new IllegalStateException("Failed to change the state from " + (Object)((Object)state) + " to Ready"));
        return false;
    }

    private void failPendingRequest() {
        this.pendingRequests.forEach((k, op) -> {
            if (op != null && !op.callback.isDone()) {
                op.callback.completeExceptionally((Throwable)new PulsarClientException.AlreadyClosedException("Could not get response from transaction meta store when the transaction meta store has already close."));
                this.onResponse((OpBase<?>)op);
            }
        });
        this.pendingRequests.clear();
    }

    public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
        CompletableFuture<TxnID> callback;
        if (LOG.isDebugEnabled()) {
            LOG.debug("New transaction with timeout in ms {}", (Object)unit.toMillis(timeout));
        }
        if (!this.canSendRequest(callback = new CompletableFuture<TxnID>())) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd = Commands.newTxn(this.transactionCoordinatorId, requestId, unit.toMillis(timeout));
        String description = String.format("Create new transaction %s", this.transactionCoordinatorId);
        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, this.client, description, this.cnx());
        this.internalPinnedExecutor.execute(() -> {
            this.pendingRequests.put(requestId, op);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
            if (!this.checkStateAndSendRequest(op)) {
                this.pendingRequests.remove(requestId);
            }
        });
        return callback;
    }

    void handleNewTxnResponse(CommandNewTxnResponse response) {
        String message;
        ServerError error;
        boolean hasError = response.hasError();
        if (hasError) {
            error = response.getError();
            message = response.getMessage();
        } else {
            error = null;
            message = null;
        }
        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
        long requestId = response.getRequestId();
        this.internalPinnedExecutor.execute(() -> {
            OpForTxnIdCallBack op = (OpForTxnIdCallBack)this.pendingRequests.remove(requestId);
            if (op == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got new txn response for transaction {}", (Object)txnID);
                }
                return;
            }
            if (!hasError) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got new txn response {} for request {}", (Object)txnID, (Object)requestId);
                }
                op.callback.complete(txnID);
            } else {
                if (this.checkIfNeedRetryByError(error, message, op)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Get a response for the {}  request {} error TransactionCoordinatorNotFound and try it again", (Object)BaseCommand.Type.NEW_TXN.name(), (Object)requestId);
                    }
                    this.pendingRequests.put(requestId, op);
                    this.timer.newTimeout(timeout -> this.internalPinnedExecutor.execute(() -> {
                        if (!this.pendingRequests.containsKey(requestId)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("The request {} already timeout", (Object)requestId);
                            }
                            return;
                        }
                        if (!this.checkStateAndSendRequest(op)) {
                            this.pendingRequests.remove(requestId);
                        }
                    }), op.backoff.next(), TimeUnit.MILLISECONDS);
                    return;
                }
                LOG.error("Got {} for request {} error {}", new Object[]{BaseCommand.Type.NEW_TXN.name(), requestId, error});
            }
            this.onResponse(op);
        });
    }

    public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions) {
        CompletableFuture<Void> callback;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add publish partition {} to txn {}", partitions, (Object)txnID);
        }
        if (!this.canSendRequest(callback = new CompletableFuture<Void>())) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd = Commands.newAddPartitionToTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
        String description = String.format("Add partition %s to TXN %s", String.valueOf(partitions), String.valueOf(txnID));
        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, this.client, description, this.cnx());
        this.internalPinnedExecutor.execute(() -> {
            this.pendingRequests.put(requestId, op);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
            if (!this.checkStateAndSendRequest(op)) {
                this.pendingRequests.remove(requestId);
            }
        });
        return callback;
    }

    void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
        String message;
        ServerError error;
        boolean hasError = response.hasError();
        if (hasError) {
            error = response.getError();
            message = response.getMessage();
        } else {
            error = null;
            message = null;
        }
        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
        long requestId = response.getRequestId();
        this.internalPinnedExecutor.execute(() -> {
            OpForVoidCallBack op = (OpForVoidCallBack)this.pendingRequests.remove(requestId);
            if (op == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got add publish partition to txn response for transaction {}", (Object)txnID);
                }
                return;
            }
            if (!hasError) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Add publish partition for request {} success.", (Object)requestId);
                }
                op.callback.complete(null);
            } else {
                if (this.checkIfNeedRetryByError(error, message, op)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Get a response for the {} request {}  error TransactionCoordinatorNotFound and try it again", (Object)BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), (Object)requestId);
                    }
                    this.pendingRequests.put(requestId, op);
                    this.timer.newTimeout(timeout -> this.internalPinnedExecutor.execute(() -> {
                        if (!this.pendingRequests.containsKey(requestId)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("The request {} already timeout", (Object)requestId);
                            }
                            return;
                        }
                        if (!this.checkStateAndSendRequest(op)) {
                            this.pendingRequests.remove(requestId);
                        }
                    }), op.backoff.next(), TimeUnit.MILLISECONDS);
                    return;
                }
                LOG.error("{} for request {}, transaction {}, error: {}", new Object[]{BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId, txnID, error});
            }
            this.onResponse(op);
        });
    }

    public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscription> subscriptionList) {
        CompletableFuture<Void> callback;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add subscription {} to txn {}.", subscriptionList, (Object)txnID);
        }
        if (!this.canSendRequest(callback = new CompletableFuture<Void>())) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd = Commands.newAddSubscriptionToTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
        String description = String.format("Add subscription %s to TXN %s", this.toStringSubscriptionList(subscriptionList), String.valueOf(txnID));
        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, this.client, description, this.cnx());
        this.internalPinnedExecutor.execute(() -> {
            this.pendingRequests.put(requestId, op);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
            if (!this.checkStateAndSendRequest(op)) {
                this.pendingRequests.remove(requestId);
            }
        });
        return callback;
    }

    private String toStringSubscriptionList(List<Subscription> list) {
        if (list == null || list.isEmpty()) {
            return "[]";
        }
        StringBuilder builder = new StringBuilder("[");
        for (Subscription subscription : list) {
            builder.append(String.format("%s %s", subscription.getTopic(), subscription.getSubscription()));
        }
        return builder.append("]").toString();
    }

    public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
        String message;
        ServerError error;
        boolean hasError = response.hasError();
        if (hasError) {
            error = response.getError();
            message = response.getMessage();
        } else {
            error = null;
            message = null;
        }
        long requestId = response.getRequestId();
        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
        this.internalPinnedExecutor.execute(() -> {
            OpForVoidCallBack op = (OpForVoidCallBack)this.pendingRequests.remove(requestId);
            if (op == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Add subscription to txn timeout for request {}.", (Object)requestId);
                }
                return;
            }
            if (!hasError) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Add subscription to txn success for request {}.", (Object)requestId);
                }
                op.callback.complete(null);
            } else {
                LOG.error("Add subscription to txn failed for request {}, transaction {}, error: {}", new Object[]{requestId, txnID, error});
                if (this.checkIfNeedRetryByError(error, message, op)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it again", (Object)BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), (Object)requestId);
                    }
                    this.pendingRequests.put(requestId, op);
                    this.timer.newTimeout(timeout -> this.internalPinnedExecutor.execute(() -> {
                        if (!this.pendingRequests.containsKey(requestId)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("The request {} already timeout", (Object)requestId);
                            }
                            return;
                        }
                        if (!this.checkStateAndSendRequest(op)) {
                            this.pendingRequests.remove(requestId);
                        }
                    }), op.backoff.next(), TimeUnit.MILLISECONDS);
                    return;
                }
                LOG.error("{} failed for request {} error {}.", new Object[]{BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId, error});
            }
            this.onResponse(op);
        });
    }

    public CompletableFuture<Void> endTxnAsync(TxnID txnID, TxnAction action) {
        CompletableFuture<Void> callback;
        if (LOG.isDebugEnabled()) {
            LOG.debug("End txn {}, action {}", (Object)txnID, (Object)action);
        }
        if (!this.canSendRequest(callback = new CompletableFuture<Void>())) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
        ByteBuf buf = Commands.serializeWithSize(cmd);
        String description = String.format("End [%s] TXN %s", String.valueOf((Object)action), String.valueOf(txnID));
        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, this.client, description, this.cnx());
        this.internalPinnedExecutor.execute(() -> {
            this.pendingRequests.put(requestId, op);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
            if (!this.checkStateAndSendRequest(op)) {
                this.pendingRequests.remove(requestId);
            }
        });
        return callback;
    }

    void handleEndTxnResponse(CommandEndTxnResponse response) {
        String message;
        ServerError error;
        boolean hasError = response.hasError();
        if (hasError) {
            error = response.getError();
            message = response.getMessage();
        } else {
            error = null;
            message = null;
        }
        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
        long requestId = response.getRequestId();
        this.internalPinnedExecutor.execute(() -> {
            OpForVoidCallBack op = (OpForVoidCallBack)this.pendingRequests.remove(requestId);
            if (op == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got end txn response for transaction but no requests pending for txn {}", (Object)txnID);
                }
                return;
            }
            if (!hasError) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got end txn response success for request {}, txn {}", (Object)requestId, (Object)txnID);
                }
                op.callback.complete(null);
            } else {
                if (this.checkIfNeedRetryByError(error, message, op)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Get a response for the {} request {} error TransactionCoordinatorNotFound and try it again", (Object)BaseCommand.Type.END_TXN.name(), (Object)requestId);
                    }
                    this.pendingRequests.put(requestId, op);
                    this.timer.newTimeout(timeout -> this.internalPinnedExecutor.execute(() -> {
                        if (!this.pendingRequests.containsKey(requestId)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("The request {} already timeout", (Object)requestId);
                            }
                            return;
                        }
                        if (!this.checkStateAndSendRequest(op)) {
                            this.pendingRequests.remove(requestId);
                        }
                    }), op.backoff.next(), TimeUnit.MILLISECONDS);
                    return;
                }
                LOG.error("Got {} response for request {}, transaction {}, error: {}", new Object[]{BaseCommand.Type.END_TXN.name(), requestId, txnID, error});
            }
            this.onResponse(op);
        });
    }

    private boolean checkIfNeedRetryByError(ServerError error, String message, OpBase<?> op) {
        if (error == ServerError.TransactionCoordinatorNotFound) {
            if (this.getState() != HandlerState.State.Connecting) {
                this.connectionHandler.reconnectLater((Throwable)new TransactionCoordinatorClientException.CoordinatorNotFoundException(message));
            }
            return true;
        }
        if (op != null) {
            op.callback.completeExceptionally((Throwable)TransactionMetaStoreHandler.getExceptionByServerError(error, message));
        }
        return false;
    }

    public static TransactionCoordinatorClientException getExceptionByServerError(ServerError serverError, String msg) {
        switch (serverError) {
            case TransactionCoordinatorNotFound: {
                return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
            }
            case InvalidTxnStatus: {
                return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg);
            }
            case TransactionNotFound: {
                return new TransactionCoordinatorClientException.TransactionNotFoundException(msg);
            }
        }
        return new TransactionCoordinatorClientException(msg);
    }

    private void onResponse(OpBase<?> op) {
        ReferenceCountUtil.safeRelease(op.cmd);
        op.recycle();
        this.semaphore.release();
    }

    private boolean canSendRequest(CompletableFuture<?> callback) {
        try {
            if (this.blockIfReachMaxPendingOps) {
                this.semaphore.acquire();
            } else if (!this.semaphore.tryAcquire()) {
                callback.completeExceptionally((Throwable)new TransactionCoordinatorClientException("Reach max pending ops."));
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            callback.completeExceptionally((Throwable)TransactionCoordinatorClientException.unwrap((Throwable)e));
            return false;
        }
        return true;
    }

    private boolean checkStateAndSendRequest(OpBase<?> op) {
        switch (this.getState()) {
            case Ready: {
                ClientCnx cnx = this.cnx();
                if (cnx != null) {
                    op.cmd.retain();
                    cnx.ctx().writeAndFlush(op.cmd, this.cnx().ctx().voidPromise());
                } else {
                    LOG.error("The cnx was null when the TC handler was ready", (Throwable)new NullPointerException());
                }
                return true;
            }
            case Connecting: {
                return true;
            }
            case Closing: 
            case Closed: {
                op.callback.completeExceptionally((Throwable)new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException("Transaction meta store handler for tcId " + this.transactionCoordinatorId + " is closing or closed."));
                this.onResponse(op);
                return false;
            }
            case Failed: 
            case Uninitialized: {
                op.callback.completeExceptionally((Throwable)new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException("Transaction meta store handler for tcId " + this.transactionCoordinatorId + " not connected."));
                this.onResponse(op);
                return false;
            }
        }
        op.callback.completeExceptionally((Throwable)new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(this.transactionCoordinatorId));
        this.onResponse(op);
        return false;
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        this.internalPinnedExecutor.execute(() -> {
            long diff;
            RequestTime lastPolled;
            if (timeout.isCancelled()) {
                return;
            }
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                return;
            }
            RequestTime peeked = this.timeoutQueue.peek();
            while (peeked != null && peeked.creationTimeMs + this.client.getConfiguration().getOperationTimeoutMs() - System.currentTimeMillis() <= 0L && (lastPolled = this.timeoutQueue.poll()) != null) {
                OpBase<?> op = this.pendingRequests.remove(lastPolled.requestId);
                if (op != null && !op.callback.isDone()) {
                    op.callback.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(String.format("%s failed due to timeout. connection: %s. pending-queue: %s", op.description, op.clientCnx, this.pendingRequests.size())));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Transaction coordinator request {} is timeout.", (Object)lastPolled.requestId);
                    }
                    this.onResponse(op);
                }
                peeked = this.timeoutQueue.peek();
            }
            long timeToWaitMs = peeked == null ? this.client.getConfiguration().getOperationTimeoutMs() : ((diff = peeked.creationTimeMs + this.client.getConfiguration().getOperationTimeoutMs() - System.currentTimeMillis()) <= 0L ? this.client.getConfiguration().getOperationTimeoutMs() : diff);
            this.requestTimeout = this.client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
        });
    }

    private ClientCnx cnx() {
        return this.connectionHandler.cnx();
    }

    void connectionClosed(ClientCnx cnx) {
        this.connectionHandler.connectionClosed(cnx);
    }

    @Override
    public void close() throws IOException {
        this.requestTimeout.cancel();
        this.setState(HandlerState.State.Closed);
    }

    @VisibleForTesting
    public HandlerState.State getConnectHandleState() {
        return this.getState();
    }

    @Override
    public String getHandlerName() {
        return "Transaction meta store handler [" + this.transactionCoordinatorId + "]";
    }

    private static class OpForTxnIdCallBack
    extends OpBase<TxnID> {
        private final Recycler.Handle<OpForTxnIdCallBack> recyclerHandle;
        private static final Recycler<OpForTxnIdCallBack> RECYCLER = new Recycler<OpForTxnIdCallBack>(){

            @Override
            protected OpForTxnIdCallBack newObject(Recycler.Handle<OpForTxnIdCallBack> handle) {
                return new OpForTxnIdCallBack(handle);
            }
        };

        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client, String description, ClientCnx clientCnx) {
            OpForTxnIdCallBack op = RECYCLER.get();
            op.callback = callback;
            op.cmd = cmd;
            op.backoff = new BackoffBuilder().setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10L, TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create();
            op.description = description;
            op.clientCnx = clientCnx;
            return op;
        }

        private OpForTxnIdCallBack(Recycler.Handle<OpForTxnIdCallBack> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        @Override
        void recycle() {
            this.backoff = null;
            this.cmd = null;
            this.callback = null;
            this.description = null;
            this.clientCnx = null;
            this.recyclerHandle.recycle(this);
        }
    }

    private static class OpForVoidCallBack
    extends OpBase<Void> {
        private final Recycler.Handle<OpForVoidCallBack> recyclerHandle;
        private static final Recycler<OpForVoidCallBack> RECYCLER = new Recycler<OpForVoidCallBack>(){

            @Override
            protected OpForVoidCallBack newObject(Recycler.Handle<OpForVoidCallBack> handle) {
                return new OpForVoidCallBack(handle);
            }
        };

        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client, String description, ClientCnx clientCnx) {
            OpForVoidCallBack op = RECYCLER.get();
            op.callback = callback;
            op.cmd = cmd;
            op.backoff = new BackoffBuilder().setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10L, TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create();
            op.description = description;
            op.clientCnx = clientCnx;
            return op;
        }

        private OpForVoidCallBack(Recycler.Handle<OpForVoidCallBack> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        @Override
        void recycle() {
            this.backoff = null;
            this.cmd = null;
            this.callback = null;
            this.description = null;
            this.clientCnx = null;
            this.recyclerHandle.recycle(this);
        }
    }

    private static abstract class OpBase<T> {
        protected ByteBuf cmd;
        protected CompletableFuture<T> callback;
        protected Backoff backoff;
        protected String description;
        protected ClientCnx clientCnx;

        private OpBase() {
        }

        abstract void recycle();
    }

    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;

        public RequestTime(long creationTime, long requestId) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
        }
    }
}

