/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractOffsetCommitter;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerOffsetCommitter<K, V>
extends AbstractOffsetCommitter<K, V>
implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetCommitter.class);
    private static final int ARBITRARY_RETRY_LIMIT = 50;
    private final ParallelConsumerOptions.CommitMode commitMode;
    private final Duration commitTimeout;
    private Optional<Thread> owningThread = Optional.empty();
    private final Queue<CommitRequest> commitRequestQueue = new ConcurrentLinkedQueue<CommitRequest>();
    private final BlockingQueue<CommitResponse> commitResponseQueue = new LinkedBlockingQueue<CommitResponse>();

    public ConsumerOffsetCommitter(ConsumerManager<K, V> newConsumer, WorkManager<K, V> newWorkManager, ParallelConsumerOptions options) {
        super(newConsumer, newWorkManager);
        this.commitMode = options.getCommitMode();
        this.commitTimeout = options.getOffsetCommitTimeout();
        if (this.commitMode.equals((Object)ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)) {
            throw new IllegalArgumentException("Cannot use " + (Object)((Object)this.commitMode) + " when using " + this.getClass().getSimpleName());
        }
    }

    void commit() throws TimeoutException, InterruptedException {
        if (this.isOwner()) {
            this.retrieveOffsetsAndCommit();
        } else if (this.isSync()) {
            log.debug("Sync commit");
            this.commitAndWait();
            log.debug("Finished waiting");
        } else {
            log.debug("Async commit to be requested");
            this.requestCommitInternal();
        }
    }

    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, ConsumerGroupMetadata groupMetadata) {
        if (offsetsToSend.isEmpty()) {
            log.trace("Nothing to commit");
            return;
        }
        switch (this.commitMode) {
            case PERIODIC_CONSUMER_SYNC: {
                log.debug("Committing offsets Sync");
                this.consumerMgr.commitSync(offsetsToSend);
                break;
            }
            case PERIODIC_CONSUMER_ASYNCHRONOUS: {
                log.debug("Committing offsets Async");
                this.consumerMgr.commitAsync(offsetsToSend, (offsets, exception) -> {
                    if (exception != null) {
                        log.error("Error committing offsets", (Throwable)exception);
                    }
                });
                break;
            }
            default: {
                throw new IllegalArgumentException("Cannot use " + (Object)((Object)this.commitMode) + " when using " + this.getClass().getSimpleName());
            }
        }
    }

    @Override
    protected void postCommit() {
    }

    private boolean isOwner() {
        return Thread.currentThread().equals(this.owningThread.orElse(null));
    }

    private void commitAndWait() {
        CommitRequest commitRequest = this.requestCommitInternal();
        boolean waitingOnCommitResponse = true;
        int attempts = 0;
        while (waitingOnCommitResponse) {
            if (attempts > 50) {
                throw new InternalRuntimeException("Too many attempts taking commit responses");
            }
            try {
                log.debug("Waiting on a commit response");
                Duration timeout = AbstractParallelEoSStreamProcessor.DEFAULT_TIMEOUT;
                CommitResponse take = this.commitResponseQueue.poll(this.commitTimeout.toMillis(), TimeUnit.MILLISECONDS);
                if (take == null) {
                    throw InternalRuntimeException.msg("Timeout waiting for commit response {} to request {}", timeout, commitRequest);
                }
                waitingOnCommitResponse = take.getRequest().getId() != commitRequest.getId();
            }
            catch (InterruptedException e) {
                log.debug("Interrupted waiting for commit response", (Throwable)e);
            }
            ++attempts;
        }
    }

    private CommitRequest requestCommitInternal() {
        CommitRequest request = new CommitRequest();
        this.commitRequestQueue.add(request);
        this.consumerMgr.wakeup();
        return request;
    }

    void maybeDoCommit() throws TimeoutException, InterruptedException {
        CommitRequest poll = this.commitRequestQueue.poll();
        if (poll != null) {
            log.debug("Commit requested, performing...");
            this.retrieveOffsetsAndCommit();
            if (this.isSync()) {
                log.debug("Adding commit response to queue...");
                this.commitResponseQueue.add(new CommitResponse(poll));
            }
        }
    }

    public boolean isSync() {
        return this.commitMode.equals((Object)ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC);
    }

    public void claim() {
        this.owningThread = Optional.of(Thread.currentThread());
    }

    public static final class CommitRequest {
        private final UUID id = UUID.randomUUID();
        private final long requestedAtMs = System.currentTimeMillis();

        public UUID getId() {
            return this.id;
        }

        public long getRequestedAtMs() {
            return this.requestedAtMs;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof CommitRequest)) {
                return false;
            }
            CommitRequest other = (CommitRequest)o;
            if (this.getRequestedAtMs() != other.getRequestedAtMs()) {
                return false;
            }
            UUID this$id = this.getId();
            UUID other$id = other.getId();
            return !(this$id == null ? other$id != null : !((Object)this$id).equals(other$id));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $requestedAtMs = this.getRequestedAtMs();
            result = result * 59 + (int)($requestedAtMs >>> 32 ^ $requestedAtMs);
            UUID $id = this.getId();
            result = result * 59 + ($id == null ? 43 : ((Object)$id).hashCode());
            return result;
        }

        public String toString() {
            return "ConsumerOffsetCommitter.CommitRequest(id=" + this.getId() + ", requestedAtMs=" + this.getRequestedAtMs() + ")";
        }
    }

    public static final class CommitResponse {
        private final CommitRequest request;

        public CommitResponse(CommitRequest request) {
            this.request = request;
        }

        public CommitRequest getRequest() {
            return this.request;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof CommitResponse)) {
                return false;
            }
            CommitResponse other = (CommitResponse)o;
            CommitRequest this$request = this.getRequest();
            CommitRequest other$request = other.getRequest();
            return !(this$request == null ? other$request != null : !((Object)this$request).equals(other$request));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            CommitRequest $request = this.getRequest();
            result = result * 59 + ($request == null ? 43 : ((Object)$request).hashCode());
            return result;
        }

        public String toString() {
            return "ConsumerOffsetCommitter.CommitResponse(request=" + this.getRequest() + ")";
        }
    }
}

