/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ProducerImpl} whose send-receipt handling can match receipts against the source
 * position (ledger id and entry id) carried by the pending message.
 *
 * <p>The position-based handling is used only when the connection reports
 * {@code isBrokerSupportsReplDedupByLidAndEid()} and the topic is persistent; otherwise receipts
 * are handled by {@link ProducerImpl}. In the position-based mode the {@code seq} and
 * {@code highSeq} fields of a receipt are interpreted as the source ledger id and source entry id,
 * except when {@code highSeq} equals {@link Long#MIN_VALUE}, which identifies a receipt for a
 * replication marker.
 *
 * <p>NOTE(review): the raw types in the constructor signature are kept exactly as found so that
 * existing callers are unaffected.
 */
public class GeoReplicationProducerImpl extends ProducerImpl {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GeoReplicationProducerImpl.class);

    /** Key of the message property whose value is parsed here as {@code "<ledgerId>:<entryId>"}. */
    public static final String MSG_PROP_REPL_SOURCE_POSITION = "__MSG_PROP_REPL_SOURCE_POSITION";

    /** Property key constant; it is not read anywhere in this class. */
    public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER";

    /** Source ledger id recorded by the most recent "expected" receipt. */
    private long lastPersistedSourceLedgerId;

    /** Source entry id recorded by the most recent "expected" receipt. */
    private long lastPersistedSourceEntryId;

    /** Whether {@code topic} was reported as persistent by {@link TopicName} at construction time. */
    private final boolean isPersistentTopic;

    /**
     * Creates the producer by delegating every argument to the {@link ProducerImpl} constructor and
     * records whether the topic is persistent.
     */
    public GeoReplicationProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture producerCreatedFuture, int partitionIndex, Schema schema, ProducerInterceptors interceptors, Optional overrideProducerName) {
        super(client, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName);
        this.isPersistentTopic = TopicName.get(topic).isPersistent();
    }

    /**
     * Returns {@code true} when position-based receipt handling applies: the connection reports
     * support for it and the topic is persistent.
     */
    private boolean isBrokerSupportsReplDedupByLidAndEid(ClientCnx cnx) {
        return cnx.isBrokerSupportsReplDedupByLidAndEid() && this.isPersistentTopic;
    }

    /**
     * Handles a send receipt.
     *
     * <p>Without position-based support the receipt is passed to
     * {@link ProducerImpl#ackReceived} unchanged. Otherwise, while holding this instance's monitor,
     * the head of {@code pendingMessages} is peeked and the receipt is dispatched to the marker
     * handler (when {@code highSeq == Long.MIN_VALUE}) or to the replicated-message handler.
     *
     * @param cnx      connection the receipt arrived on
     * @param seq      sequence id of the receipt; treated as the source ledger id for non-marker
     *                 receipts in position-based mode
     * @param highSeq  highest sequence id of the receipt; treated as the source entry id for
     *                 non-marker receipts, or {@link Long#MIN_VALUE} for a marker receipt
     * @param ledgerId ledger id assigned to the message by the receiving broker
     * @param entryId  entry id assigned to the message by the receiving broker
     */
    @Override
    protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId, long entryId) {
        if (!this.isBrokerSupportsReplDedupByLidAndEid(cnx)) {
            super.ackReceived(cnx, seq, highSeq, ledgerId, entryId);
            return;
        }
        synchronized (this) {
            ProducerImpl.OpSendMsg op = this.pendingMessages.peek();
            if (op == null) {
                // Nothing is pending, so there is no operation this receipt could complete.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {}:{}", this.topic, this.producerName, seq, highSeq);
                }
                return;
            }
            if (this.isReplicationMarker(highSeq)) {
                this.ackReceivedReplMarker(cnx, op, seq, highSeq, ledgerId, entryId);
                return;
            }
            this.ackReceivedReplicatedMsg(cnx, op, seq, highSeq, ledgerId, entryId);
        }
    }

    /**
     * Handles a receipt for a replicated (non-marker) message by comparing the receipt's source
     * position with the source position stored on the pending operation {@code op}.
     *
     * <ol>
     *   <li>Pending position is at or before the last persisted one: the pending operation is
     *       completed and removed, then the same receipt is dispatched again against the new head
     *       of the queue.</li>
     *   <li>Receipt position is at or before the last persisted one: the receipt is ignored.</li>
     *   <li>Receipt position equals the pending position: the last persisted position is advanced
     *       and the pending operation is completed.</li>
     *   <li>Anything else: an error is logged and the connection's channel is closed.</li>
     * </ol>
     *
     * <p>A missing or malformed position property (including a digits-only value that does not fit
     * in a {@code long}) leaves the pending position unknown, so only cases 2 and 4 can apply.
     */
    private void ackReceivedReplicatedMsg(ClientCnx cnx, ProducerImpl.OpSendMsg op, long sourceLId, long sourceEId, long targetLId, long targetEid) {
        Long pendingLId = null;
        Long pendingEId = null;
        List<KeyValue> kvPairList = op.msg.getMessageBuilder().getPropertiesList();
        for (KeyValue kvPair : kvPairList) {
            if (!kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
                continue;
            }
            String value = kvPair.getValue();
            if (value.contains(":")) {
                String[] ledgerIdAndEntryId = value.split(":");
                if (ledgerIdAndEntryId.length == 2 && StringUtils.isNumeric(ledgerIdAndEntryId[0]) && StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
                    try {
                        // Parse both parts before assigning either, so a failure leaves both unset.
                        Long parsedLId = Long.valueOf(ledgerIdAndEntryId[0]);
                        Long parsedEId = Long.valueOf(ledgerIdAndEntryId[1]);
                        pendingLId = parsedLId;
                        pendingEId = parsedEId;
                    } catch (NumberFormatException e) {
                        // isNumeric only verifies digits; a value beyond the long range still fails
                        // to parse. Treat it like any other malformed position instead of throwing.
                        log.warn("[{}] [{}] Ignoring out-of-range replication source position property: {}", this.topic, this.producerName, value, e);
                    }
                }
            }
            // Only the first matching property is considered.
            break;
        }
        boolean pendingKnown = pendingLId != null && pendingEId != null;

        if (pendingKnown && this.isAtOrBeforeLastPersisted(pendingLId.longValue(), pendingEId.longValue())) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an msg send receipt[pending send is repeated due to repl cursor rewind]: source entry {}:{}, pending send: {}:{}, latest persisted: {}:{}", this.topic, this.producerName, sourceLId, sourceEId, pendingLId, pendingEId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId);
            }
            this.removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false);
            // The head of the queue changed; evaluate the same receipt against the next pending op.
            this.ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid);
            return;
        }
        if (this.isAtOrBeforeLastPersisted(sourceLId, sourceEId)) {
            // The receipt refers to a position already recorded as persisted; leave the queue alone.
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted: {}:{}", this.topic, this.producerName, sourceLId, sourceEId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId);
            }
            return;
        }
        if (pendingKnown && sourceLId == pendingLId.longValue() && sourceEId == pendingEId.longValue()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry: {}:{}", this.topic, this.producerName, sourceLId, sourceEId, targetLId, targetEid);
            }
            this.lastPersistedSourceLedgerId = sourceLId;
            this.lastPersistedSourceEntryId = sourceEId;
            this.removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false);
            return;
        }
        log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{}, pending send: {}:{}, latest persisted: {}:{}, queue-size: {}", this.topic, this.producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId, this.pendingMessages.messagesCount());
        // NOTE(review): closing the channel presumably forces a reconnect and resend — confirm.
        cnx.channel().close();
    }

    /**
     * Returns {@code true} when the given source position is not after the last persisted source
     * position, comparing ledger id first and entry id second.
     */
    private boolean isAtOrBeforeLastPersisted(long sourceLedgerId, long sourceEntryId) {
        return sourceLedgerId < this.lastPersistedSourceLedgerId
                || (sourceLedgerId == this.lastPersistedSourceLedgerId && sourceEntryId <= this.lastPersistedSourceEntryId);
    }

    /**
     * Handles a receipt for a replication marker, matching by sequence id.
     *
     * <ul>
     *   <li>{@code seq} not greater than a non-zero last published sequence id: ignored.</li>
     *   <li>Pending operation is a replication marker with the same sequence id: the last published
     *       sequence id is raised to at least the operation's highest sequence id and the
     *       operation is completed.</li>
     *   <li>Otherwise: the mismatch is logged (warn when {@code seq} is below the last pushed
     *       sequence id, error otherwise) and the connection's channel is closed.</li>
     * </ul>
     *
     * @param isSourceMarker the receipt's highest-sequence-id field as passed by
     *                       {@link #ackReceived}; it is only used for logging here
     */
    protected void ackReceivedReplMarker(ClientCnx cnx, ProducerImpl.OpSendMsg op, long seq, long isSourceMarker, long ledgerId, long entryId) {
        long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
        if (lastSeqPersisted != 0L && seq <= lastSeqPersisted) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {}, isSourceMarker: {}, target entry: {}:{}", this.topic, this.producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
            }
            return;
        }
        boolean pendingMsgIsReplMarker = this.isReplicationMarker(op);
        if (pendingMsgIsReplMarker && seq == op.sequenceId) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {}, isReplMarker: {}, target entry: {}:{}", this.topic, this.producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
            }
            long calculatedSeq = this.getHighestSequenceId(op);
            // Never move the published sequence id backwards.
            LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq));
            this.removeAndApplyCallback(op, seq, isSourceMarker, ledgerId, entryId, true);
            return;
        }
        long lastInProgressSend = LAST_SEQ_ID_PUSHED_UPDATER.get(this);
        String logText = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s. sequenceIdPersisted: %s, lastInProgressSend: %s, isSourceMarker: %s, target entry: %s:%s, queue-size: %s", this.topic, this.producerName, seq, pendingMsgIsReplMarker ? Long.valueOf(op.sequenceId) : "unknown", lastSeqPersisted, lastInProgressSend, isSourceMarker, ledgerId, entryId, this.pendingMessages.messagesCount());
        if (seq < lastInProgressSend) {
            log.warn(logText);
        } else {
            log.error(logText);
        }
        cnx.channel().close();
    }

    /**
     * Removes the head of {@code pendingMessages}, releases the send semaphore for {@code op},
     * stamps the target message id on it and invokes its completion callback with no error.
     * Anything thrown by the callback is logged and suppressed so that the command buffer is always
     * released and the operation recycled.
     *
     * @param lIdSent  first identifier of the receipt, used for logging only
     * @param eIdSent  second identifier of the receipt, used for logging only
     * @param ledgerId target ledger id set on the operation's message id
     * @param entryId  target entry id set on the operation's message id
     * @param isMarker whether the operation is a replication marker, used for logging only
     */
    private void removeAndApplyCallback(ProducerImpl.OpSendMsg op, long lIdSent, long eIdSent, long ledgerId, long entryId, boolean isMarker) {
        this.pendingMessages.remove();
        this.releaseSemaphoreForSendOp(op);
        op.setMessageId(ledgerId, entryId, this.partitionIndex);
        try {
            op.sendComplete(null);
        } catch (Throwable t) {
            // The trailing Throwable argument is logged by SLF4J as the exception, not a placeholder.
            log.warn("[{}] [{}] Got exception while completing the callback for -- source-message: {}:{} -- target-msg: {}:{} -- isMarker: {}", this.topic, this.producerName, lIdSent, eIdSent, ledgerId, entryId, isMarker, t);
        }
        ReferenceCountUtil.safeRelease(op.cmd);
        op.recycle();
    }

    /**
     * Returns {@code true} when the pending operation carries a message whose metadata has a marker
     * type that {@link Markers#isReplicationMarker} accepts.
     */
    private boolean isReplicationMarker(ProducerImpl.OpSendMsg op) {
        return op.msg != null && op.msg.getMessageBuilder().hasMarkerType() && Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType());
    }

    /**
     * Returns {@code true} when a receipt's highest sequence id is {@link Long#MIN_VALUE}, the
     * value this class treats as identifying a replication-marker receipt.
     */
    private boolean isReplicationMarker(long highestSeq) {
        return Long.MIN_VALUE == highestSeq;
    }

    /**
     * Updates the last pushed sequence id only for replication markers; for every other operation
     * this override intentionally does nothing.
     */
    @Override
    protected void updateLastSeqPushed(ProducerImpl.OpSendMsg op) {
        if (this.isReplicationMarker(op)) {
            super.updateLastSeqPushed(op);
        }
    }
}

