/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.AbstractOffsetCommitter;
import io.confluent.parallelconsumer.ConsumerManager;
import io.confluent.parallelconsumer.OffsetCommitter;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.WorkManager;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Offset committer that commits through the {@link ConsumerManager}, either synchronously or
 * asynchronously depending on the configured {@link ParallelConsumerOptions.CommitMode}.
 *
 * <p>One thread may register itself as the owner via {@link #claim()}. A {@link #commit()} issued
 * by that thread is performed immediately on the calling thread. A {@link #commit()} issued by any
 * other thread only flags a request (and wakes the consumer); the commit itself happens the next
 * time {@link #maybeDoCommit()} runs. In {@code CONSUMER_SYNC} mode the requesting thread blocks
 * until that commit has been signalled.
 *
 * <p>NOTE(review): {@link #maybeDoCommit()} is presumably called by the owning thread from its
 * poll loop; that caller is not visible in this file - confirm.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ConsumerOffsetCommitter<K, V>
        extends AbstractOffsetCommitter<K, V>
        implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetCommitter.class);

    private final ParallelConsumerOptions.CommitMode commitMode;

    /** Fair lock guarding the request flag, the commit counter and the condition below. */
    private final ReentrantLock commitLock = new ReentrantLock(true);

    /**
     * Signalled each time a requested commit has been performed. A single shared condition is used
     * so that every concurrently waiting thread is woken by the same {@code signalAll()}.
     */
    private final Condition commitPerformed = commitLock.newCondition();

    /** Number of commits signalled so far; waiters compare against the value they saw on entry. */
    private final AtomicLong commitCount = new AtomicLong(0L);

    /** Set when a non-owning thread has asked for a commit that has not been performed yet. */
    private final AtomicBoolean commitRequested = new AtomicBoolean(false);

    /** Thread recorded by {@link #claim()}, if any. */
    private Optional<Thread> owningThread = Optional.empty();

    /** True only while the owning thread is committing directly from {@link #commit()}. */
    private boolean direct = false;

    /**
     * @param newConsumer    consumer manager used to perform the commits
     * @param newWorkManager work manager handed to the superclass
     * @param options        supplies the commit mode
     * @throws IllegalArgumentException if the commit mode is {@code TRANSACTIONAL_PRODUCER}
     */
    public ConsumerOffsetCommitter(ConsumerManager<K, V> newConsumer, WorkManager<K, V> newWorkManager, ParallelConsumerOptions options) {
        super(newConsumer, newWorkManager);
        commitMode = options.getCommitMode();
        if (commitMode.equals(ParallelConsumerOptions.CommitMode.TRANSACTIONAL_PRODUCER)) {
            throw new IllegalArgumentException("Cannot use " + commitMode + " when using " + getClass().getSimpleName());
        }
    }

    /**
     * Commits offsets, choosing the strategy from the calling thread and the commit mode:
     * the owning thread commits directly; other threads either request a commit and wait for it
     * (sync mode) or just request it (async mode).
     */
    void commit() {
        if (isOwner()) {
            // Mark the commit as direct only for its duration. Leaving the flag set would stop
            // postCommit() from ever signalling again and strand later sync waiters.
            final boolean wasDirect = direct;
            direct = true;
            try {
                retrieveOffsetsAndCommit();
            } finally {
                direct = wasDirect;
            }
        } else if (isSync()) {
            log.debug("Sync commit");
            commitAndWaitForCondition();
            log.debug("Finished waiting");
        } else {
            log.debug("Async commit to be requested");
            requestCommitInternal();
        }
    }

    /**
     * Sends the given offsets through the consumer manager using the configured commit mode.
     * Does nothing when there are no offsets to send.
     *
     * @param offsetsToSend offsets to commit, keyed by partition
     * @param groupMetadata not used by this implementation
     * @throws IllegalArgumentException if the commit mode is not one of the consumer modes
     */
    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, ConsumerGroupMetadata groupMetadata) {
        if (offsetsToSend.isEmpty()) {
            log.trace("Nothing to commit");
            return;
        }
        switch (commitMode) {
            case CONSUMER_SYNC: {
                log.debug("Committing offsets Sync");
                consumerMgr.commitSync(offsetsToSend);
                break;
            }
            case CONSUMER_ASYNCHRONOUS: {
                log.debug("Committing offsets Async");
                consumerMgr.commitAsync(offsetsToSend, (offsets, exception) -> {
                    if (exception != null) {
                        log.error("Error committing offsets", (Throwable) exception);
                    }
                });
                break;
            }
            default: {
                throw new IllegalArgumentException("Cannot use " + commitMode + " when using " + getClass().getSimpleName());
            }
        }
    }

    /**
     * Wakes threads blocked in a sync commit, unless the commit was performed directly by the
     * owning thread (in which case nobody is waiting on it and the lock is not held).
     *
     * <p>NOTE(review): presumably invoked by the superclass at the end of
     * {@code retrieveOffsetsAndCommit()} - confirm against {@code AbstractOffsetCommitter}.
     */
    @Override
    protected void postCommit() {
        if (!direct && commitMode.equals(ParallelConsumerOptions.CommitMode.CONSUMER_SYNC)) {
            signalCommitPerformed();
        }
    }

    /** @return true if the calling thread is the one recorded by {@link #claim()} */
    private boolean isOwner() {
        return Thread.currentThread().equals(owningThread.orElse(null));
    }

    /**
     * Bumps the commit counter and wakes every thread waiting for a commit.
     *
     * @throws IllegalStateException if the calling thread does not hold {@code commitLock};
     *                               signalling a condition requires its lock to be held
     */
    private void signalCommitPerformed() {
        log.debug("Starting Signaling commit finished");
        if (!commitLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("Lock not held by current thread");
        }
        // The lock is already held (checked above), so no additional acquire is needed here.
        commitCount.incrementAndGet();
        log.debug("Signaling commit finished");
        commitPerformed.signalAll();
        log.debug("Finished Signaling commit finished");
    }

    /**
     * Requests a commit and blocks until the commit counter has advanced past the value observed
     * on entry. The wait is not abandoned on interruption; instead the interrupt status is
     * restored before returning so the caller can still observe it.
     */
    private void commitAndWaitForCondition() {
        boolean interrupted = false;
        commitLock.lock();
        try {
            final long countAtRequest = commitCount.get();
            requestCommitInternal();
            consumerMgr.wakeup();
            // The counter only changes in signalCommitPerformed(), which requires this lock, so it
            // cannot move between the loop test and the await() below.
            while (countAtRequest == commitCount.get()) {
                log.debug("Requesting commit again");
                requestCommitInternal();
                try {
                    log.debug("Waiting on commit");
                    commitPerformed.await();
                } catch (InterruptedException e) {
                    log.debug("Interrupted waiting for commit condition", e);
                    interrupted = true;
                }
            }
            log.debug("Signaled");
        } finally {
            commitLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Flags that a commit is wanted and wakes the consumer so the request gets noticed. */
    private void requestCommitInternal() {
        commitLock.lock();
        try {
            commitRequested.set(true);
            consumerMgr.wakeup();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Performs a commit if one has been requested since the last successful one.
     *
     * <p>The request flag is cleared only after the commit returns normally, so a commit that
     * throws leaves the request pending for the next call. Requests are set under the same lock,
     * so none can be lost between the commit and the reset.
     */
    void maybeDoCommit() {
        commitLock.lock();
        try {
            if (commitRequested.get()) {
                retrieveOffsetsAndCommit();
                commitRequested.set(false);
            }
        } finally {
            commitLock.unlock();
        }
    }

    /** @return true if the configured commit mode is {@code CONSUMER_SYNC} */
    public boolean isSync() {
        return commitMode.equals(ParallelConsumerOptions.CommitMode.CONSUMER_SYNC);
    }

    /** Records the calling thread as the owner, i.e. the thread whose commits run directly. */
    public void claim() {
        owningThread = Optional.of(Thread.currentThread());
    }
}

