/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractOffsetCommitter;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.state.WorkManager;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerManager<K, V>
extends AbstractOffsetCommitter<K, V>
implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
    protected final Producer<K, V> producer;
    private final ParallelConsumerOptions options;
    private final boolean producerIsConfiguredForTransactions;
    private ReentrantReadWriteLock producerTransactionLock;
    private Field txManagerField;
    private Method txManagerMethodIsCompleting;
    private Method txManagerMethodIsReady;
    private final long sendTimeoutSeconds = 5L;

    public ProducerManager(Producer<K, V> newProducer, ConsumerManager<K, V> newConsumer, WorkManager<K, V> wm, ParallelConsumerOptions options) {
        super(newConsumer, wm);
        this.producer = newProducer;
        this.options = options;
        this.producerIsConfiguredForTransactions = this.setupReflection();
        this.initProducer();
    }

    private void initProducer() {
        this.producerTransactionLock = new ReentrantReadWriteLock(true);
        if (this.options.isUsingTransactionalProducer()) {
            if (!this.producerIsConfiguredForTransactions) {
                throw new IllegalArgumentException("Using transactional option, yet Producer doesn't have a transaction ID - Producer needs a transaction id");
            }
            try {
                log.debug("Initialising producer transaction session...");
                this.producer.initTransactions();
                this.producer.beginTransaction();
            }
            catch (KafkaException e) {
                log.error("Make sure your producer is setup for transactions - specifically make sure it's {} is set.", (Object)"transactional.id", (Object)e);
                throw e;
            }
        } else if (this.producerIsConfiguredForTransactions) {
            throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - the Producer must not have a transaction ID for this option. This is because having such an ID forces the Producer into transactional mode - i.e. you cannot use it without using transactions.");
        }
    }

    private boolean getProducerIsTransactional() {
        if (this.producer instanceof MockProducer) {
            return this.options.isUsingTransactionalProducer();
        }
        TransactionManager transactionManager = this.getTransactionManager();
        if (transactionManager == null) {
            return false;
        }
        return transactionManager.isTransactional();
    }

    private TransactionManager getTransactionManager() {
        if (this.txManagerField == null) {
            return null;
        }
        TransactionManager transactionManager = (TransactionManager)this.txManagerField.get(this.producer);
        return transactionManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RecordMetadata produceMessage(ProducerRecord<K, V> outMsg) {
        Future send;
        Callback callback = (metadata, exception) -> {
            if (exception != null) {
                log.error("Error producing result message", (Throwable)exception);
                throw new RuntimeException("Error producing result message", exception);
            }
        };
        ReentrantReadWriteLock.ReadLock readLock = this.producerTransactionLock.readLock();
        readLock.lock();
        try {
            send = this.producer.send(outMsg, callback);
        }
        finally {
            readLock.unlock();
        }
        try {
            log.trace("Blocking on produce result");
            RecordMetadata recordMetadata = TimeUtils.time(() -> (RecordMetadata)send.get(5L, TimeUnit.SECONDS));
            log.trace("Produce result received");
            return recordMetadata;
        }
        catch (Exception e) {
            throw new InternalRuntimeError(e);
        }
    }

    @Override
    protected void preAcquireWork() {
        this.acquireCommitLock();
    }

    @Override
    protected void postCommit() {
        if (this.producerTransactionLock.getWriteHoldCount() > 1) {
            throw new ConcurrentModificationException("Lock held too many times, won't be released problem and will cause deadlock");
        }
        this.releaseCommitLock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, ConsumerGroupMetadata groupMetadata) {
        log.debug("Transactional offset commit starting");
        if (!this.options.isUsingTransactionalProducer()) {
            throw new IllegalStateException("Bug: cannot use if not using transactional producer");
        }
        this.producer.sendOffsetsToTransaction(offsetsToSend, groupMetadata);
        boolean committed = false;
        int retryCount = 0;
        int arbitrarilyChosenLimitForArbitraryErrorSituation = 200;
        Exception lastErrorSavedForRethrow = null;
        while (!committed) {
            if (retryCount > arbitrarilyChosenLimitForArbitraryErrorSituation) {
                String msg = StringUtils.msg("Retired too many times ({} > limit of {}), giving up. See error above.", retryCount, arbitrarilyChosenLimitForArbitraryErrorSituation);
                log.error(msg, (Throwable)lastErrorSavedForRethrow);
                throw new RuntimeException(msg, lastErrorSavedForRethrow);
            }
            try {
                if (this.producer instanceof MockProducer) {
                    Producer<K, V> msg = this.producer;
                    synchronized (msg) {
                        this.producer.commitTransaction();
                        this.producer.beginTransaction();
                    }
                } else {
                    boolean retrying;
                    this.ensureLockHeld();
                    boolean bl = retrying = retryCount > 0;
                    if (retrying) {
                        boolean ready;
                        if (this.isTransactionCompleting()) {
                            this.producer.commitTransaction();
                        }
                        if (this.isTransactionReady()) {
                            this.producer.beginTransaction();
                        }
                        boolean bl2 = ready = lastErrorSavedForRethrow == null || !lastErrorSavedForRethrow.getMessage().contains("Invalid transition attempted from state READY to state COMMITTING_TRANSACTION");
                        if (ready) {
                            log.error("Transaction was already in READY state - tx completed between interrupt and retry");
                        }
                    } else {
                        this.producer.commitTransaction();
                        this.producer.beginTransaction();
                    }
                }
                committed = true;
                if (retryCount <= 0) continue;
                log.warn("Commit success, but took {} tries.", (Object)retryCount);
            }
            catch (Exception e) {
                log.warn("Commit exception, will retry, have tried {} times (see KafkaProducer#commit)", (Object)retryCount, (Object)e);
                lastErrorSavedForRethrow = e;
                ++retryCount;
            }
        }
    }

    private boolean setupReflection() {
        if (this.producer instanceof KafkaProducer) {
            this.txManagerField = this.producer.getClass().getDeclaredField("transactionManager");
            this.txManagerField.setAccessible(true);
            boolean producerIsConfiguredForTransactions = this.getProducerIsTransactional();
            if (producerIsConfiguredForTransactions) {
                TransactionManager transactionManager = this.getTransactionManager();
                this.txManagerMethodIsCompleting = transactionManager.getClass().getDeclaredMethod("isCompleting", new Class[0]);
                this.txManagerMethodIsCompleting.setAccessible(true);
                this.txManagerMethodIsReady = transactionManager.getClass().getDeclaredMethod("isReady", new Class[0]);
                this.txManagerMethodIsReady.setAccessible(true);
            }
            return producerIsConfiguredForTransactions;
        }
        if (this.producer instanceof MockProducer) {
            return this.options.isUsingTransactionalProducer();
        }
        return false;
    }

    private boolean isTransactionCompleting() {
        if (this.producer instanceof MockProducer) {
            return false;
        }
        return (Boolean)this.txManagerMethodIsCompleting.invoke((Object)this.getTransactionManager(), new Object[0]);
    }

    private boolean isTransactionReady() {
        if (this.producer instanceof MockProducer) {
            return true;
        }
        return (Boolean)this.txManagerMethodIsReady.invoke((Object)this.getTransactionManager(), new Object[0]);
    }

    public void close(Duration timeout) {
        log.debug("Closing producer, assuming no more in flight...");
        if (this.options.isUsingTransactionalProducer() && !this.isTransactionReady()) {
            this.acquireCommitLock();
            try {
                this.producer.abortTransaction();
            }
            finally {
                this.releaseCommitLock();
            }
        }
        this.producer.close(timeout);
    }

    private void acquireCommitLock() {
        if (this.producerTransactionLock.getWriteHoldCount() > 0) {
            throw new ConcurrentModificationException("Lock already held");
        }
        ReentrantReadWriteLock.WriteLock writeLock = this.producerTransactionLock.writeLock();
        if (this.producerTransactionLock.isWriteLocked() && !this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new ConcurrentModificationException(this.getClass().getSimpleName() + " is not safe for multi-threaded access");
        }
        writeLock.lock();
    }

    private void releaseCommitLock() {
        log.trace("Release commit lock");
        ReentrantReadWriteLock.WriteLock writeLock = this.producerTransactionLock.writeLock();
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Not held be me");
        }
        writeLock.unlock();
    }

    private void ensureLockHeld() {
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Expected commit lock to be held");
        }
    }

    public boolean isTransactionInProgress() {
        return this.producerTransactionLock.isWriteLocked();
    }
}

