/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractWorkerSourceTask;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSourceTaskContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source task runner that tracks every record handed to the producer in a
 * {@link SubmittedRecords} instance and commits the offsets of the records
 * reported as committable by it.
 *
 * <p>The pending snapshot of committable offsets is held in a {@code volatile}
 * field and is only replaced while holding this object's monitor, so that
 * {@link #commitOffsets()} and the internal refresh of committable offsets
 * never lose an update to each other.
 */
class WorkerSourceTask extends AbstractWorkerSourceTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

    /** Worker config key holding the maximum time, in milliseconds, to wait for an offset flush. */
    private static final String OFFSET_FLUSH_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";

    /** Offsets that are ready to be committed but have not yet been handed to the offset writer. */
    private volatile SubmittedRecords.CommittableOffsets committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
    private final SubmittedRecords submittedRecords = new SubmittedRecords();
    /** First failure reported through {@link #producerSendFailed} that was not tolerated; rethrown later. */
    private final AtomicReference<Exception> producerSendException = new AtomicReference<>();

    public WorkerSourceTask(ConnectorTaskId id, SourceTask task, TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, Producer<byte[], byte[]> producer, TopicAdmin admin, Map<String, TopicCreationGroup> topicGroups, CloseableOffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, ConnectorOffsetBackingStore offsetStore, WorkerConfig workerConfig, ClusterConfigState configState, ConnectMetrics connectMetrics, ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor closeExecutor) {
        super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, new WorkerSourceTaskContext(offsetReader, id, configState, null), producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
    }

    /** No preparation is required before the task is initialized. */
    @Override
    protected void prepareToInitializeTask() {
    }

    /** No preparation is required before entering the send loop. */
    @Override
    protected void prepareToEnterSendLoop() {
    }

    /** Folds the newly committable offsets into the pending snapshot at the start of each iteration. */
    @Override
    protected void beginSendIteration() {
        this.updateCommittableOffsets();
    }

    /**
     * Surfaces any stored producer failure before the task is polled again.
     *
     * @throws ConnectException if a non-tolerated producer send failure has been recorded
     */
    @Override
    protected void prepareToPollTask() {
        this.maybeThrowProducerSendException();
    }

    /** Reports a dropped record to {@code commitTaskRecord} with {@code null} metadata. */
    @Override
    protected void recordDropped(SourceRecord record) {
        this.commitTaskRecord(record, null);
    }

    /**
     * Registers the record with {@link SubmittedRecords} before it is sent.
     *
     * @return the tracking handle for the submitted record; never empty
     * @throws ConnectException if a non-tolerated producer send failure has been recorded
     */
    @Override
    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord) {
        this.maybeThrowProducerSendException();
        return Optional.of(this.submittedRecords.submit(sourceRecord));
    }

    /** No per-record action is required after dispatch. */
    @Override
    protected void recordDispatched(SourceRecord record) {
    }

    /** No per-batch action is required after dispatch. */
    @Override
    protected void batchDispatched() {
    }

    /** Reports a successfully sent record, together with its metadata, to {@code commitTaskRecord}. */
    @Override
    protected void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
        this.commitTaskRecord(sourceRecord, recordMetadata);
    }

    /**
     * Handles a failed producer send.
     *
     * <ul>
     *   <li>Synchronous failures are rethrown immediately as a {@link ConnectException}.</li>
     *   <li>Asynchronous failures under {@link ToleranceType#ALL} are reported to the
     *       {@code retryWithToleranceOperator} and the record is passed to
     *       {@code commitTaskRecord} with {@code null} metadata.</li>
     *   <li>Any other asynchronous failure is stored (first one wins) and rethrown by the next
     *       call to {@link #prepareToPollTask()} or {@link #prepareToSendRecord}.</li>
     * </ul>
     *
     * @throws ConnectException if {@code synchronous} is {@code true}
     */
    @Override
    protected void producerSendFailed(boolean synchronous, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord preTransformRecord, Exception e) {
        if (synchronous) {
            throw new ConnectException("Unrecoverable exception trying to send", e);
        }
        String topic = producerRecord.topic();
        if (this.retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
            log.trace("Ignoring failed record send: {} failed to send record to {}: ", this, topic, e);
            this.retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, preTransformRecord, e);
            this.commitTaskRecord(preTransformRecord, null);
        } else {
            // Only the first failure is kept; later ones must not overwrite it.
            this.producerSendException.compareAndSet(null, e);
        }
    }

    /**
     * Waits (bounded by the offset flush timeout) for in-flight records, then performs one last
     * offset commit. The {@code failed} flag is not consulted.
     */
    @Override
    protected void finalOffsetCommit(boolean failed) {
        this.submittedRecords.awaitAllMessages(this.workerConfig.getLong(OFFSET_FLUSH_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS);
        this.updateCommittableOffsets();
        this.commitOffsets();
    }

    /**
     * Takes the pending committable offsets, writes them to the offset writer and flushes them,
     * waiting at most the configured offset flush timeout (measured from the start of this call).
     *
     * @return {@code true} if there was nothing to flush or the flush completed in time;
     *         {@code false} if the flush could not be started, failed, timed out or was interrupted
     */
    public boolean commitOffsets() {
        long commitTimeoutMs = this.workerConfig.getLong(OFFSET_FLUSH_TIMEOUT_MS_CONFIG);
        log.debug("{} Committing offsets", this);
        long started = this.time.milliseconds();
        long deadline = started + commitTimeoutMs;

        // Atomically take ownership of the pending offsets and reset the field so that
        // offsets accumulated from now on belong to the next commit.
        SubmittedRecords.CommittableOffsets offsetsToCommit;
        synchronized (this) {
            offsetsToCommit = this.committableOffsets;
            this.committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
        }

        // Log from the snapshot that is actually being committed, not from the field that was
        // just reset (and may already have been updated again by another thread).
        this.logOffsetsToCommit(offsetsToCommit);

        offsetsToCommit.offsets().forEach(this.offsetWriter::offset);
        if (!this.offsetWriter.beginFlush()) {
            // Nothing to flush: still count it as a successful commit and notify the task.
            long durationMillis = this.time.milliseconds() - started;
            this.recordCommitSuccess(durationMillis);
            log.debug("{} Finished offset commitOffsets successfully in {} ms", this, durationMillis);
            this.commitSourceTask();
            return true;
        }

        Future<Void> flushFuture = this.offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", this, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", this);
            }
        });
        if (flushFuture == null) {
            return this.failCommit(started, null);
        }

        try {
            // Never pass a negative wait: the deadline may already have elapsed.
            flushFuture.get(Math.max(deadline - this.time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("{} Flush of offsets interrupted, cancelling", this);
            return this.failCommit(started, e);
        } catch (ExecutionException e) {
            log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
            return this.failCommit(started, e);
        } catch (TimeoutException e) {
            log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
            return this.failCommit(started, null);
        }

        long durationMillis = this.time.milliseconds() - started;
        this.recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
        this.commitSourceTask();
        return true;
    }

    /** Logs a summary of the offsets snapshot that is about to be committed. */
    private void logOffsetsToCommit(SubmittedRecords.CommittableOffsets offsetsToCommit) {
        if (offsetsToCommit.isEmpty()) {
            log.debug("{} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors.", this);
            return;
        }
        log.info("{} Committing offsets for {} acknowledged messages", this, offsetsToCommit.numCommittableMessages());
        if (offsetsToCommit.hasPending()) {
            log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. The source partition with the most pending messages is {}, with {} pending messages", this, offsetsToCommit.numUncommittableMessages(), offsetsToCommit.numDeques(), offsetsToCommit.largestDequePartition(), offsetsToCommit.largestDequeSize());
        } else {
            log.debug("{} There are currently no pending messages for this offset commit; all messages dispatched to the task's producer since the last commit have been acknowledged", this);
        }
    }

    /** Cancels the in-progress flush, records the commit failure and returns {@code false}. */
    private boolean failCommit(long started, Throwable cause) {
        this.offsetWriter.cancelFlush();
        this.recordCommitFailure(this.time.milliseconds() - started, cause);
        return false;
    }

    /** Merges the offsets currently reported as committable into the pending snapshot. */
    private void updateCommittableOffsets() {
        SubmittedRecords.CommittableOffsets newOffsets = this.submittedRecords.committableOffsets();
        synchronized (this) {
            this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
        }
    }

    /**
     * Rethrows a stored producer send failure, if any.
     *
     * @throws ConnectException wrapping the stored failure
     */
    private void maybeThrowProducerSendException() {
        Exception sendFailure = this.producerSendException.get();
        if (sendFailure != null) {
            throw new ConnectException("Unrecoverable exception from producer send callback", sendFailure);
        }
    }

    @Override
    public String toString() {
        return "WorkerSourceTask{id=" + this.id + '}';
    }
}

