/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.KeyLongValue;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterPolicies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    private final Subscription subscription;
    private final CommandSubscribe.SubType subType;
    private final TransportCnx cnx;
    private final String appId;
    private final String topicName;
    private final int partitionIdx;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;
    private final Rate msgOut;
    private final Rate msgRedeliver;
    private final LongAdder msgOutCounter;
    private final LongAdder msgRedeliverCounter;
    private final LongAdder bytesOutCounter;
    private final LongAdder messageAckCounter;
    private final Rate messageAckRate;
    private volatile long lastConsumedTimestamp;
    private volatile long lastAckedTimestamp;
    private volatile long lastConsumedFlowTimestamp;
    private Rate chunkedMessageRate;
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private volatile int messagePermits = 0;
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private volatile int permitsReceivedWhileConsumerBlocked = 0;
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStatsImpl stats;
    private final boolean isDurable;
    private final boolean isPersistentTopic;
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    private volatile int unackedMessages = 0;
    private volatile boolean blockedConsumerOnUnackedMsgs = false;
    private final Map<String, String> metadata;
    private final KeySharedMeta keySharedMeta;
    private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0.0);
    private static final long[] EMPTY_ACK_SET = new long[0];
    private static final double avgPercent = 0.9;
    private boolean preciseDispatcherFlowControl;
    private PositionImpl readPositionWhenJoining;
    private final String clientAddress;
    private final MessageId startMessageId;
    private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
    private volatile long consumerEpoch;
    private long negativeUnackedMsgsTimestamp;
    private final SchemaType schemaType;
    private final Instant connectedSince = Instant.now();
    private volatile Attributes openTelemetryAttributes;
    private static final AtomicReferenceFieldUpdater<Consumer, Attributes> OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Consumer.class, Attributes.class, "openTelemetryAttributes");
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public Consumer(Subscription subscription, CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {
        this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId, metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null);
    }

    public Consumer(Subscription subscription, CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch, SchemaType schemaType) {
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = topicName;
        this.partitionIdx = TopicName.getPartitionIndex((String)topicName);
        this.consumerId = consumerId;
        this.priorityLevel = priorityLevel;
        this.readCompacted = readCompacted;
        this.consumerName = consumerName;
        this.isDurable = isDurable;
        this.isPersistentTopic = subscription.getTopic() instanceof PersistentTopic;
        this.keySharedMeta = keySharedMeta;
        this.cnx = cnx;
        this.msgOut = new Rate();
        this.chunkedMessageRate = new Rate();
        this.msgRedeliver = new Rate();
        this.msgRedeliverCounter = new LongAdder();
        this.bytesOutCounter = new LongAdder();
        this.msgOutCounter = new LongAdder();
        this.messageAckCounter = new LongAdder();
        this.messageAckRate = new Rate();
        this.appId = appId;
        this.startMessageId = readCompacted && startMessageId == null ? MessageId.earliest : startMessageId;
        this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);
        this.metadata = metadata != null ? metadata : Collections.emptyMap();
        this.stats = new ConsumerStatsImpl();
        this.stats.setAddress(cnx.clientSourceAddressAndPort());
        this.stats.consumerName = consumerName;
        this.stats.setConnectedSince(DateFormatter.format((Instant)this.connectedSince));
        this.stats.setClientVersion(cnx.getClientVersion());
        this.stats.metadata = this.metadata;
        this.pendingAcks = Subscription.isIndividualAckMode(subType) ? ConcurrentLongLongPairHashMap.newBuilder().autoShrink(subscription.getTopic().getBrokerService().getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap()).expectedItems(256).concurrencyLevel(1).build() : null;
        this.clientAddress = cnx.clientSourceAddress();
        this.consumerEpoch = consumerEpoch;
        this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService().getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
        this.schemaType = schemaType;
        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
    }

    @VisibleForTesting
    Consumer(String consumerName, int availablePermits) {
        this.subscription = null;
        this.subType = null;
        this.cnx = null;
        this.appId = null;
        this.topicName = null;
        this.partitionIdx = 0;
        this.consumerId = 0L;
        this.priorityLevel = 0;
        this.readCompacted = false;
        this.consumerName = consumerName;
        this.msgOut = null;
        this.msgRedeliver = null;
        this.msgRedeliverCounter = null;
        this.msgOutCounter = null;
        this.bytesOutCounter = null;
        this.messageAckCounter = null;
        this.messageAckRate = null;
        this.pendingAcks = null;
        this.stats = null;
        this.isDurable = false;
        this.isPersistentTopic = false;
        this.metadata = null;
        this.keySharedMeta = null;
        this.clientAddress = null;
        this.startMessageId = null;
        this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
        this.schemaType = null;
        MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
        OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
    }

    public CommandSubscribe.SubType subType() {
        return this.subType;
    }

    public long consumerId() {
        return this.consumerId;
    }

    public String consumerName() {
        return this.consumerName;
    }

    void notifyActiveConsumerChange(Consumer activeConsumer) {
        if (log.isDebugEnabled()) {
            log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", new Object[]{this.consumerId, this.topicName, this.subscription.getName(), activeConsumer});
        }
        this.cnx.getCommandSender().sendActiveConsumerChange(this.consumerId, this == activeConsumer);
    }

    public boolean readCompacted() {
        return this.readCompacted;
    }

    public Future<Void> sendMessages(List<? extends Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
        return this.sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes, totalChunkedMessages, redeliveryTracker, -1L);
    }

    public Future<Void> sendMessages(List<? extends Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker, long epoch) {
        return this.sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes, totalChunkedMessages, redeliveryTracker, epoch);
    }

    public Future<Void> sendMessages(List<? extends Entry> entries, List<Integer> stickyKeyHashes, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker, long epoch) {
        this.lastConsumedTimestamp = System.currentTimeMillis();
        if (entries.isEmpty() || totalMessages == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", new Object[]{this.topicName, this.subscription, this.consumerId});
            }
            batchSizes.recyle();
            if (batchIndexesAcks != null) {
                batchIndexesAcks.recycle();
            }
            Promise<Void> writePromise = this.cnx.newPromise();
            writePromise.setSuccess(null);
            return writePromise;
        }
        int unackedMessages = totalMessages;
        int totalEntries = 0;
        for (int i = 0; i < entries.size(); ++i) {
            long[] ackSet;
            Entry entry = entries.get(i);
            if (entry == null) continue;
            ++totalEntries;
            if (this.pendingAcks == null) continue;
            int batchSize = batchSizes.getBatchSize(i);
            int stickyKeyHash = stickyKeyHashes == null ? this.getStickyKeyHash(entry) : stickyKeyHashes.get(i).intValue();
            long[] lArray = ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
            if (ackSet != null) {
                unackedMessages -= batchSize - BitSet.valueOf(ackSet).cardinality();
            }
            this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), (long)batchSize, (long)stickyKeyHash);
            if (!log.isDebugEnabled()) continue;
            log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in broker.service.Consumer for consumerId: {}", new Object[]{this.topicName, this.subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, this.consumerId});
        }
        if (this.avgMessagesPerEntry.get() < 1.0) {
            this.avgMessagesPerEntry.set(1.0 * (double)totalMessages / (double)totalEntries);
        } else {
            this.avgMessagesPerEntry.set(this.avgMessagesPerEntry.get() * 0.9 + 0.09999999999999998 * (double)totalMessages / (double)totalEntries);
        }
        int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
        MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer for consumerId: {}; avgMessagesPerEntry is {}", new Object[]{this.topicName, this.subscription, ackedCount, totalMessages, this.consumerId, this.avgMessagesPerEntry.get()});
        }
        this.incrementUnackedMessages(unackedMessages);
        Future<Void> writeAndFlushPromise = this.cnx.getCommandSender().sendMessagesToConsumer(this.consumerId, this.topicName, this.subscription, this.partitionIdx, entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
        writeAndFlushPromise.addListener(status -> {
            if (status.isSuccess()) {
                this.msgOut.recordMultipleEvents((long)totalMessages, totalBytes);
                this.msgOutCounter.add(totalMessages);
                this.bytesOutCounter.add(totalBytes);
                this.chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0L);
            } else if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Sent messages to client fail by IO exception[{}], close the connection immediately. Consumer: {}", new Object[]{this.topicName, this.subscription, status.cause() == null ? "" : status.cause().getMessage(), this.toString()});
            }
        });
        return writeAndFlushPromise;
    }

    private void incrementUnackedMessages(int unackedMessages) {
        if (Subscription.isIndividualAckMode(this.subType) && this.addAndGetUnAckedMsgs(this, unackedMessages) >= this.getMaxUnackedMessages() && this.getMaxUnackedMessages() > 0) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
    }

    public boolean isWritable() {
        return this.cnx.isWritable();
    }

    public void close() throws BrokerServiceException {
        this.close(false);
    }

    public void close(boolean isResetCursor) throws BrokerServiceException {
        this.subscription.removeConsumer(this, isResetCursor);
        this.cnx.removedConsumer(this);
    }

    public void disconnect() {
        this.disconnect(false);
    }

    public void disconnect(boolean isResetCursor) {
        this.disconnect(isResetCursor, Optional.empty());
    }

    public void disconnect(boolean isResetCursor, Optional<BrokerLookupData> assignedBrokerLookupData) {
        log.info("Disconnecting consumer: {}", (Object)this);
        this.cnx.closeConsumer(this, assignedBrokerLookupData);
        try {
            this.close(isResetCursor);
        }
        catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", new Object[]{this, e.getMessage(), e});
        }
    }

    public void doUnsubscribe(long requestId, boolean force) {
        ((CompletableFuture)this.subscription.doUnsubscribe(this, force).thenAccept(v -> {
            log.info("Unsubscribed successfully from {}", (Object)this.subscription);
            this.cnx.removedConsumer(this);
            this.cnx.getCommandSender().sendSuccessResponse(requestId);
        })).exceptionally(exception -> {
            log.warn("Unsubscribe failed for {}", (Object)this.subscription, exception);
            this.cnx.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage());
            return null;
        });
    }

    public CompletableFuture<Void> messageAcked(CommandAck ack) {
        CompletionStage<Long> future;
        this.lastAckedTimestamp = System.currentTimeMillis();
        Map<String, Long> properties = Collections.emptyMap();
        if (ack.getPropertiesCount() > 0) {
            properties = ack.getPropertiesList().stream().collect(Collectors.toMap(KeyLongValue::getKey, KeyLongValue::getValue));
        }
        if (ack.getAckType() == CommandAck.AckType.Cumulative) {
            PositionImpl position;
            if (ack.getMessageIdsCount() != 1) {
                log.warn("[{}] [{}] Received multi-message ack", (Object)this.subscription, (Object)this.consumerId);
                return CompletableFuture.completedFuture(null);
            }
            if (Subscription.isIndividualAckMode(this.subType)) {
                log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", (Object)this.subscription, (Object)this.consumerId);
                return CompletableFuture.completedFuture(null);
            }
            MessageIdData msgId = ack.getMessageIdAt(0);
            if (msgId.getAckSetsCount() > 0) {
                long[] ackSets = new long[msgId.getAckSetsCount()];
                for (int j = 0; j < msgId.getAckSetsCount(); ++j) {
                    ackSets[j] = msgId.getAckSetAt(j);
                }
                position = PositionImpl.get((long)msgId.getLedgerId(), (long)msgId.getEntryId(), (long[])ackSets);
            } else {
                position = PositionImpl.get((long)msgId.getLedgerId(), (long)msgId.getEntryId());
            }
            if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                positionsAcked = Collections.singletonList(position);
                future = this.transactionCumulativeAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked).thenApply(unused -> 1L);
            } else {
                positionsAcked = Collections.singletonList(position);
                this.subscription.acknowledgeMessage(positionsAcked, CommandAck.AckType.Cumulative, properties);
                future = CompletableFuture.completedFuture(1L);
            }
        } else {
            future = ack.hasTxnidLeastBits() && ack.hasTxnidMostBits() ? this.individualAckWithTransaction(ack) : this.individualAckNormal(ack, properties);
        }
        return future.thenApply(v -> {
            this.messageAckRate.recordEvent(v.longValue());
            this.messageAckCounter.add((long)v);
            return null;
        });
    }

    private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
        ArrayList<Pair> positionsAcked = new ArrayList<Pair>();
        long totalAckCount = 0L;
        for (int i = 0; i < ack.getMessageIdsCount(); ++i) {
            long ackedCount;
            PositionImpl position;
            MessageIdData msgId = ack.getMessageIdAt(i);
            Pair<Consumer, Long> ackOwnerConsumerAndBatchSize = this.getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId());
            Consumer ackOwnerConsumer = (Consumer)ackOwnerConsumerAndBatchSize.getLeft();
            long batchSize = (Long)ackOwnerConsumerAndBatchSize.getRight();
            if (msgId.getAckSetsCount() > 0) {
                long[] ackSets = new long[msgId.getAckSetsCount()];
                for (int j = 0; j < msgId.getAckSetsCount(); ++j) {
                    ackSets[j] = msgId.getAckSetAt(j);
                }
                position = PositionImpl.get((long)msgId.getLedgerId(), (long)msgId.getEntryId(), (long[])ackSets);
                ackedCount = this.getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer);
                if (this.isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
                    ((PersistentSubscription)this.subscription).syncBatchPositionBitSetForPendingAck(position);
                }
                this.addAndGetUnAckedMsgs(ackOwnerConsumer, -((int)ackedCount));
            } else {
                position = PositionImpl.get((long)msgId.getLedgerId(), (long)msgId.getEntryId());
                ackedCount = this.getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
                if (this.checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
                    this.addAndGetUnAckedMsgs(ackOwnerConsumer, -((int)ackedCount));
                    this.updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
                }
            }
            positionsAcked.add(Pair.of((Object)ackOwnerConsumer, (Object)position));
            this.checkAckValidationError(ack, position);
            totalAckCount += ackedCount;
        }
        this.subscription.acknowledgeMessage(positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()), CommandAck.AckType.Individual, properties);
        CompletableFuture<Long> completableFuture = new CompletableFuture<Long>();
        completableFuture.complete(totalAckCount);
        if (this.isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
            completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> {
                Consumer ackOwnerConsumer = (Consumer)positionPair.getLeft();
                Position position = (Position)positionPair.getRight();
                if (((PositionImpl)position).getAckSet() != null && ((PersistentSubscription)this.subscription).checkIsCanDeleteConsumerPendingAck((PositionImpl)position)) {
                    this.removePendingAcks(ackOwnerConsumer, (PositionImpl)position);
                }
            }));
        }
        return completableFuture;
    }

    private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
        ArrayList<Pair> positionsAcked = new ArrayList<Pair>();
        if (!this.isTransactionEnabled()) {
            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        LongAdder totalAckCount = new LongAdder();
        for (int i = 0; i < ack.getMessageIdsCount(); ++i) {
            long ackedCount;
            long batchSize;
            MessageIdData msgId = ack.getMessageIdAt(i);
            PositionImpl position = PositionImpl.get((long)msgId.getLedgerId(), (long)msgId.getEntryId());
            Consumer ackOwnerConsumer = (Consumer)this.getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()).getLeft();
            if (msgId.hasBatchSize()) {
                batchSize = msgId.getBatchSize();
                ackedCount = msgId.getBatchSize();
                positionsAcked.add(Pair.of((Object)ackOwnerConsumer, (Object)new MutablePair((Object)position, (Object)msgId.getBatchSize())));
            } else {
                batchSize = 0L;
                ackedCount = 1L;
                positionsAcked.add(Pair.of((Object)ackOwnerConsumer, (Object)new MutablePair((Object)position, (Object)((int)batchSize))));
            }
            if (msgId.getAckSetsCount() > 0) {
                long[] ackSets = new long[msgId.getAckSetsCount()];
                for (int j = 0; j < msgId.getAckSetsCount(); ++j) {
                    ackSets[j] = msgId.getAckSetAt(j);
                }
                position.setAckSet(ackSets);
                ackedCount = this.getAckedCountForTransactionAck(batchSize, ackSets);
            }
            this.addAndGetUnAckedMsgs(ackOwnerConsumer, -((int)ackedCount));
            this.checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId);
            this.checkAckValidationError(ack, position);
            totalAckCount.add(ackedCount);
        }
        CompletableFuture<Void> completableFuture = this.transactionIndividualAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
        if (Subscription.isIndividualAckMode(this.subType)) {
            completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> {
                Consumer ackOwnerConsumer = (Consumer)positionPair.getLeft();
                MutablePair positionLongMutablePair = (MutablePair)positionPair.getRight();
                if (((PositionImpl)positionLongMutablePair.getLeft()).getAckSet() != null && ((PersistentSubscription)this.subscription).checkIsCanDeleteConsumerPendingAck((PositionImpl)positionLongMutablePair.left)) {
                    this.removePendingAcks(ackOwnerConsumer, (PositionImpl)positionLongMutablePair.left);
                }
            }));
        }
        return completableFuture.thenApply(__ -> totalAckCount.sum());
    }

    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
        long[] cursorAckSet;
        if (this.isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(this.subType) && (cursorAckSet = this.getCursorAckSet(position)) != null) {
            return this.getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer);
        }
        return batchSize;
    }

    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets, Consumer consumer) {
        long ackedCount = 0L;
        if (this.isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(this.subType) && consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) {
            long[] cursorAckSet = this.getCursorAckSet(position);
            if (cursorAckSet != null) {
                BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
                int lastCardinality = cursorBitSet.cardinality();
                BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets);
                cursorBitSet.and(givenBitSet);
                givenBitSet.recycle();
                int currentCardinality = cursorBitSet.cardinality();
                ackedCount = lastCardinality - currentCardinality;
                cursorBitSet.recycle();
            } else {
                ackedCount = batchSize - (long)BitSet.valueOf(ackSets).cardinality();
            }
        }
        return ackedCount;
    }

    private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) {
        BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets);
        long ackedCount = batchSize - (long)bitset.cardinality();
        bitset.recycle();
        return ackedCount;
    }

    private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) {
        long[] cursorAckSet;
        long unAckedCount = batchSize;
        if (this.isAcknowledgmentAtBatchIndexLevelEnabled && (cursorAckSet = this.getCursorAckSet(position)) != null) {
            BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
            unAckedCount = cursorBitSet.cardinality();
            cursorBitSet.recycle();
        }
        return unAckedCount;
    }

    private void checkAckValidationError(CommandAck ack, PositionImpl position) {
        if (ack.hasValidationError()) {
            log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", new Object[]{this.subscription, this.consumerId, position, ack.getValidationError()});
        }
    }

    private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer, PositionImpl position, MessageIdData msgId) {
        if (Subscription.isIndividualAckMode(this.subType) && msgId.getAckSetsCount() == 0) {
            return this.removePendingAcks(ackOwnedConsumer, position);
        }
        return false;
    }

    private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long ledgerId, long entryId) {
        if (Subscription.isIndividualAckMode(this.subType)) {
            ConcurrentLongLongPairHashMap.LongPair longPair = this.getPendingAcks().get(ledgerId, entryId);
            if (longPair != null) {
                return Pair.of((Object)this, (Object)longPair.first);
            }
            for (Consumer consumer : this.subscription.getConsumers()) {
                if (consumer == this || (longPair = consumer.getPendingAcks().get(ledgerId, entryId)) == null) continue;
                return Pair.of((Object)consumer, (Object)longPair.first);
            }
        }
        return Pair.of((Object)this, (Object)1L);
    }

    private long[] getCursorAckSet(PositionImpl position) {
        if (!(this.subscription instanceof PersistentSubscription)) {
            return null;
        }
        return ((PersistentSubscription)this.subscription).getCursor().getDeletedBatchIndexesAsLongArray(position);
    }

    private boolean isTransactionEnabled() {
        return this.subscription instanceof PersistentSubscription && ((PersistentTopic)this.subscription.getTopic()).getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
    }

    private CompletableFuture<Void> transactionIndividualAcknowledge(long txnidMostBits, long txnidLeastBits, List<MutablePair<PositionImpl, Integer>> positionList) {
        if (this.subscription instanceof PersistentSubscription) {
            TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
            return ((PersistentSubscription)this.subscription).transactionIndividualAcknowledge(txnID, positionList);
        }
        String error = "Transaction acknowledge only support the `PersistentSubscription`.";
        log.error(error);
        return FutureUtil.failedFuture((Throwable)new TransactionConflictException(error));
    }

    private CompletableFuture<Void> transactionCumulativeAcknowledge(long txnidMostBits, long txnidLeastBits, List<PositionImpl> positionList) {
        if (!this.isTransactionEnabled()) {
            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        if (this.subscription instanceof PersistentSubscription) {
            TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
            return ((PersistentSubscription)this.subscription).transactionCumulativeAcknowledge(txnID, positionList);
        }
        String error = "Transaction acknowledge only support the `PersistentSubscription`.";
        log.error(error);
        return FutureUtil.failedFuture((Throwable)new TransactionConflictException(error));
    }

    public void flowPermits(int additionalNumberOfMessages) {
        int oldPermits;
        Preconditions.checkArgument((additionalNumberOfMessages > 0 ? 1 : 0) != 0);
        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
        if (this.shouldBlockConsumerOnUnackMsgs() && this.unackedMessages >= this.getMaxUnackedMessages()) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
        if (!this.blockedConsumerOnUnackedMsgs) {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher for consumer {}", new Object[]{this.topicName, this.subscription, additionalNumberOfMessages, this.consumerId});
            }
            this.subscription.consumerFlow(this, additionalNumberOfMessages);
        } else {
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ", new Object[]{this.topicName, this.subscription, additionalNumberOfMessages, oldPermits, this.blockedConsumerOnUnackedMsgs});
        }
    }

    void flowConsumerBlockedPermits(Consumer consumer) {
        int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer {}", new Object[]{this.topicName, this.subscription, additionalNumberOfPermits, this.consumerId});
        }
        this.subscription.consumerFlow(consumer, additionalNumberOfPermits);
    }

    public int getAvailablePermits() {
        return MESSAGE_PERMITS_UPDATER.get(this);
    }

    public int getAvgMessagesPerEntry() {
        return (int)Math.round(this.avgMessagesPerEntry.get());
    }

    public boolean isBlocked() {
        return this.blockedConsumerOnUnackedMsgs;
    }

    public void reachedEndOfTopic() {
        this.cnx.getCommandSender().sendReachedEndOfTopic(this.consumerId);
    }

    public void topicMigrated(Optional<ClusterPolicies.ClusterUrl> clusterUrl) {
        if (clusterUrl.isPresent()) {
            ClusterPolicies.ClusterUrl url = clusterUrl.get();
            this.cnx.getCommandSender().sendTopicMigrated(CommandTopicMigrated.ResourceType.Consumer, this.consumerId, url.getBrokerServiceUrl(), url.getBrokerServiceUrlTls());
            this.disconnect();
        }
    }

    public boolean checkAndApplyTopicMigration() {
        Optional<ClusterPolicies.ClusterUrl> clusterUrl;
        if (this.subscription.isSubscriptionMigrated() && (clusterUrl = AbstractTopic.getMigratedClusterUrl(this.cnx.getBrokerService().getPulsar(), this.topicName)).isPresent()) {
            ClusterPolicies.ClusterUrl url = clusterUrl.get();
            this.cnx.getCommandSender().sendTopicMigrated(CommandTopicMigrated.ResourceType.Consumer, this.consumerId, url.getBrokerServiceUrl(), url.getBrokerServiceUrlTls());
            this.disconnect();
            return true;
        }
        return false;
    }

    private boolean shouldBlockConsumerOnUnackMsgs() {
        return Subscription.isIndividualAckMode(this.subType) && this.getMaxUnackedMessages() > 0;
    }

    public void updateRates() {
        this.msgOut.calculateRate();
        this.chunkedMessageRate.calculateRate();
        this.msgRedeliver.calculateRate();
        this.messageAckRate.calculateRate();
        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateRedeliver = this.msgRedeliver.getRate();
        this.stats.messageAckRate = this.messageAckRate.getValueRate();
        this.stats.chunkedMessageRate = this.chunkedMessageRate.getRate();
    }

    public void updateStats(ConsumerStatsImpl consumerStats) {
        this.msgOutCounter.add(consumerStats.msgOutCounter);
        this.bytesOutCounter.add(consumerStats.bytesOutCounter);
        this.msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter);
        this.lastAckedTimestamp = consumerStats.lastAckedTimestamp;
        this.lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
        this.lastConsumedFlowTimestamp = consumerStats.lastConsumedFlowTimestamp;
        MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", new Object[]{this.topicName, this.subscription, consumerStats.availablePermits, this.consumerId});
        }
        this.unackedMessages = consumerStats.unackedMessages;
        this.blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
        this.avgMessagesPerEntry.set((double)consumerStats.avgMessagesPerEntry);
    }

    public ConsumerStatsImpl getStats() {
        this.stats.msgOutCounter = this.msgOutCounter.longValue();
        this.stats.bytesOutCounter = this.bytesOutCounter.longValue();
        this.stats.lastAckedTimestamp = this.lastAckedTimestamp;
        this.stats.lastConsumedTimestamp = this.lastConsumedTimestamp;
        this.stats.lastConsumedFlowTimestamp = this.lastConsumedFlowTimestamp;
        this.stats.availablePermits = this.getAvailablePermits();
        this.stats.unackedMessages = this.unackedMessages;
        this.stats.blockedConsumerOnUnackedMsgs = this.blockedConsumerOnUnackedMsgs;
        this.stats.avgMessagesPerEntry = this.getAvgMessagesPerEntry();
        this.stats.consumerName = this.consumerName;
        if (this.readPositionWhenJoining != null) {
            this.stats.readPositionWhenJoining = this.readPositionWhenJoining.toString();
        }
        return this.stats;
    }

    public long getMsgOutCounter() {
        return this.msgOutCounter.longValue();
    }

    public long getBytesOutCounter() {
        return this.bytesOutCounter.longValue();
    }

    public long getMessageAckCounter() {
        return this.messageAckCounter.sum();
    }

    public long getMessageRedeliverCounter() {
        return this.msgRedeliverCounter.sum();
    }

    public int getUnackedMessages() {
        return this.unackedMessages;
    }

    public KeySharedMeta getKeySharedMeta() {
        return this.keySharedMeta;
    }

    public String toString() {
        if (this.subscription != null && this.cnx != null) {
            return MoreObjects.toStringHelper((Object)this).add("subscription", (Object)this.subscription).add("consumerId", this.consumerId).add("consumerName", (Object)this.consumerName).add("address", (Object)this.cnx.toString()).toString();
        }
        return MoreObjects.toStringHelper((Object)this).add("consumerId", this.consumerId).add("consumerName", (Object)this.consumerName).toString();
    }

    public CompletableFuture<Void> checkPermissionsAsync() {
        TopicName topicName = TopicName.get((String)this.subscription.getTopicName());
        if (this.cnx.getBrokerService().getAuthorizationService() != null) {
            AuthenticationDataSubscription authData = new AuthenticationDataSubscription(this.cnx.getAuthenticationData(), this.subscription.getName());
            return this.cnx.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, TopicOperation.CONSUME, this.appId, (AuthenticationDataSource)authData).handle((ok, e) -> {
                if (e != null) {
                    log.warn("[{}] Get unexpected error while authorizing [{}]  {}", new Object[]{this.appId, this.subscription.getTopicName(), e.getMessage(), e});
                }
                if (ok == null || !ok.booleanValue()) {
                    log.info("[{}] is not allowed to consume from topic [{}] anymore", (Object)this.appId, (Object)this.subscription.getTopicName());
                    this.disconnect();
                }
                return null;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj instanceof Consumer) {
            Consumer other = (Consumer)obj;
            return this.consumerId == other.consumerId && Objects.equals(this.cnx.clientAddress(), other.cnx.clientAddress());
        }
        return false;
    }

    public int hashCode() {
        return this.consumerName.hashCode() + 31 * this.cnx.hashCode();
    }

    private boolean removePendingAcks(Consumer ackOwnedConsumer, PositionImpl position) {
        if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received ack {}", new Object[]{this.topicName, this.subscription, this.consumerId, position});
        }
        this.updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
        return true;
    }

    public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
        int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
        if (unAckedMsgs <= this.getMaxUnackedMessages() / 2 && ackOwnedConsumer.blockedConsumerOnUnackedMsgs && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs() || !this.shouldBlockConsumerOnUnackMsgs()) {
            ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
            this.flowConsumerBlockedPermits(ackOwnedConsumer);
        }
    }

    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return this.pendingAcks;
    }

    public int getPriorityLevel() {
        return this.priorityLevel;
    }

    public void redeliverUnacknowledgedMessages(long consumerEpoch) {
        this.clearUnAckedMsgs();
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", new Object[]{this.topicName, this.subscription, this.consumerId});
        }
        if (this.pendingAcks != null) {
            ArrayList<PositionImpl> pendingPositions = new ArrayList<PositionImpl>((int)this.pendingAcks.size());
            MutableInt totalRedeliveryMessages = new MutableInt(0);
            this.pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
                int unAckedCount = (int)this.getUnAckedCountForBatchIndexLevelEnabled(PositionImpl.get((long)ledgerId, (long)entryId), batchSize);
                totalRedeliveryMessages.add(unAckedCount);
                pendingPositions.add(new PositionImpl(ledgerId, entryId));
            });
            for (PositionImpl p : pendingPositions) {
                this.pendingAcks.remove(p.getLedgerId(), p.getEntryId());
            }
            this.msgRedeliver.recordMultipleEvents((long)totalRedeliveryMessages.intValue(), (long)totalRedeliveryMessages.intValue());
            this.msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
            this.subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        } else {
            this.subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
        }
        this.flowConsumerBlockedPermits(this);
    }

    public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
        int totalRedeliveryMessages = 0;
        ArrayList<PositionImpl> pendingPositions = new ArrayList<PositionImpl>();
        for (MessageIdData msg : messageIds) {
            PositionImpl position = PositionImpl.get((long)msg.getLedgerId(), (long)msg.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair longPair = this.pendingAcks.get(position.getLedgerId(), position.getEntryId());
            if (longPair == null) continue;
            int unAckedCount = (int)this.getUnAckedCountForBatchIndexLevelEnabled(position, longPair.first);
            this.pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            totalRedeliveryMessages += unAckedCount;
            pendingPositions.add(position);
        }
        this.addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", new Object[]{this.topicName, this.subscription, this.consumerId, totalRedeliveryMessages, pendingPositions.size()});
        }
        this.subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        this.msgRedeliver.recordMultipleEvents((long)totalRedeliveryMessages, (long)totalRedeliveryMessages);
        this.msgRedeliverCounter.add(totalRedeliveryMessages);
        int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
        if (numberOfBlockedPermits > 0) {
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}", new Object[]{this.topicName, this.subscription, numberOfBlockedPermits, this.consumerId});
            }
            this.subscription.consumerFlow(this, numberOfBlockedPermits);
        }
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
        int unackedMsgs = 0;
        if (this.isPersistentTopic && Subscription.isIndividualAckMode(this.subType)) {
            this.subscription.addUnAckedMessages(ackedMessages);
            unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
        }
        if (unackedMsgs < 0 && System.currentTimeMillis() - this.negativeUnackedMsgsTimestamp >= 10000L) {
            this.negativeUnackedMsgsTimestamp = System.currentTimeMillis();
            log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", new Object[]{unackedMsgs, ackedMessages, consumer});
        }
        return unackedMsgs;
    }

    private void clearUnAckedMsgs() {
        int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
        this.subscription.addUnAckedMessages(-unaAckedMsgs);
    }

    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
        this.readPositionWhenJoining = readPositionWhenJoining;
    }

    public int getMaxUnackedMessages() {
        if (this.isDurable && this.subscription != null) {
            return (Integer)this.subscription.getTopic().getHierarchyTopicPolicies().getMaxUnackedMessagesOnConsumer().get();
        }
        return 0;
    }

    public TransportCnx cnx() {
        return this.cnx;
    }

    public String getClientAddress() {
        return this.clientAddress;
    }

    public String getClientAddressAndPort() {
        return this.cnx.clientSourceAddressAndPort();
    }

    public String getClientVersion() {
        return this.cnx.getClientVersion();
    }

    public MessageId getStartMessageId() {
        return this.startMessageId;
    }

    public Map<String, String> getMetadata() {
        return this.metadata;
    }

    private int getStickyKeyHash(Entry entry) {
        byte[] stickyKey = entry instanceof EntryAndMetadata ? ((EntryAndMetadata)entry).getStickyKey() : Commands.peekStickyKey((ByteBuf)entry.getDataBuffer(), (String)this.topicName, (String)this.subscription.getName());
        return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
    }

    public Attributes getOpenTelemetryAttributes() {
        if (this.openTelemetryAttributes != null) {
            return this.openTelemetryAttributes;
        }
        return OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, oldValue -> {
            if (oldValue != null) {
                return oldValue;
            }
            TopicName topicName = TopicName.get((String)this.subscription.getTopic().getName());
            AttributesBuilder builder = Attributes.builder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, (Object)this.consumerName).put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, (Object)this.consumerId).put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, (Object)this.subscription.getName()).put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, (Object)this.subType.toString()).put(OpenTelemetryAttributes.PULSAR_DOMAIN, (Object)topicName.getDomain().toString()).put(OpenTelemetryAttributes.PULSAR_TENANT, (Object)topicName.getTenant()).put(OpenTelemetryAttributes.PULSAR_NAMESPACE, (Object)topicName.getNamespace()).put(OpenTelemetryAttributes.PULSAR_TOPIC, (Object)topicName.getPartitionedTopicName());
            if (topicName.isPartitioned()) {
                builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
            }
            return builder.build();
        });
    }

    @Generated
    public long getConsumerEpoch() {
        return this.consumerEpoch;
    }

    @Generated
    public void setConsumerEpoch(long consumerEpoch) {
        this.consumerEpoch = consumerEpoch;
    }

    @Generated
    public SchemaType getSchemaType() {
        return this.schemaType;
    }

    @Generated
    public Instant getConnectedSince() {
        return this.connectedSince;
    }
}

