/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.common;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.OffsetReader;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext>
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
    private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5L));
    private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1L));
    private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();
    private volatile ElapsedTimeStrategy restartDelay;
    private volatile Map<String, String> props;
    private ChangeEventSourceCoordinator<P, O> coordinator;
    private final Map<Map<String, ?>, Map<String, ?>> lastOffsets = new HashMap();
    private Duration retriableRestartWait;
    private final ElapsedTimeStrategy pollOutputDelay;
    private final Clock clock = Clock.system();
    private Instant previousOutputInstant;
    private int previousOutputBatchSize;

    protected BaseSourceTask() {
        this.pollOutputDelay = ElapsedTimeStrategy.exponential(this.clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
        this.pollOutputDelay.hasElapsed();
        this.previousOutputInstant = this.clock.currentTimeAsInstant();
    }

    public final void start(Map<String, String> props) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            this.props = props;
            Configuration config = Configuration.from(props);
            this.retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
            this.restartDelay = null;
            if (!config.validateAndRecord(this.getAllConfigurationFields(), arg_0 -> ((Logger)LOGGER).error(arg_0))) {
                throw new ConnectException("Error configuring an instance of " + ((Object)((Object)this)).getClass().getSimpleName() + "; check the logs for details");
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Starting {} with configuration:", (Object)((Object)((Object)this)).getClass().getSimpleName());
                config.withMaskedPasswords().forEach((propName, propValue) -> LOGGER.info("   {} = {}", propName, propValue));
            }
            this.coordinator = this.start(config);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration var1);

    public final List<SourceRecord> poll() throws InterruptedException {
        boolean started = this.startIfNeededAndPossible();
        if (!started) {
            Metronome.parker(Duration.of(2L, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
            return Collections.emptyList();
        }
        try {
            List<SourceRecord> records = this.doPoll();
            this.logStatistics(records);
            return records;
        }
        catch (RetriableException e) {
            this.stop(true);
            throw e;
        }
    }

    protected void logStatistics(List<SourceRecord> records) {
        if (records == null || !LOGGER.isInfoEnabled()) {
            return;
        }
        int batchSize = records.size();
        if (batchSize > 0) {
            if (LOGGER.isDebugEnabled()) {
                LinkedHashMap topicCounts = new LinkedHashMap();
                records.forEach(r -> topicCounts.merge(r.topic(), 1, Integer::sum));
                for (Map.Entry topicCount : topicCounts.entrySet()) {
                    LOGGER.debug("Sending {} records to topic {}", topicCount.getValue(), topicCount.getKey());
                }
            }
            SourceRecord lastRecord = records.get(batchSize - 1);
            this.updateLastOffset(lastRecord.sourcePartition(), lastRecord.sourceOffset());
            this.previousOutputBatchSize += batchSize;
            if (this.pollOutputDelay.hasElapsed()) {
                Instant currentTime = this.clock.currentTime();
                LOGGER.info("{} records sent during previous {}, last recorded offset of {} partition is {}", new Object[]{this.previousOutputBatchSize, Strings.duration(Duration.between(this.previousOutputInstant, currentTime).toMillis()), lastRecord.sourcePartition(), lastRecord.sourceOffset()});
                this.previousOutputInstant = currentTime;
                this.previousOutputBatchSize = 0;
            }
        }
    }

    private void updateLastOffset(Map<String, ?> partition, Map<String, ?> lastOffset) {
        this.stateLock.lock();
        this.lastOffsets.put(partition, lastOffset);
        this.stateLock.unlock();
    }

    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    private boolean startIfNeededAndPossible() {
        this.stateLock.lock();
        try {
            if (this.state.get() == State.RUNNING) {
                boolean bl = true;
                return bl;
            }
            if (this.restartDelay != null && this.restartDelay.hasElapsed()) {
                this.start(this.props);
                boolean bl = true;
                return bl;
            }
            LOGGER.info("Awaiting end of restart backoff period after a retriable error");
            boolean bl = false;
            return bl;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    public final void stop() {
        this.stop(false);
    }

    private void stop(boolean restart) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOGGER.info("Connector has already been stopped");
                return;
            }
            if (restart) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", (Object)this.retriableRestartWait.getSeconds());
            } else {
                LOGGER.info("Stopping down connector");
            }
            try {
                if (this.coordinator != null) {
                    this.coordinator.stop();
                }
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                LOGGER.error("Interrupted while stopping coordinator", (Throwable)e);
                throw new ConnectException("Interrupted while stopping coordinator, failing the task");
            }
            this.doStop();
            if (restart && this.restartDelay == null) {
                this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait.toMillis());
                this.restartDelay.hasElapsed();
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    protected abstract void doStop();

    public void commitRecord(SourceRecord record) throws InterruptedException {
        Map currentOffset = record.sourceOffset();
        if (currentOffset != null) {
            this.updateLastOffset(record.sourcePartition(), currentOffset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void commit() throws InterruptedException {
        boolean locked = this.stateLock.tryLock();
        if (locked) {
            try {
                if (this.coordinator == null) return;
                Iterator<Map<String, ?>> iterator = this.lastOffsets.keySet().iterator();
                while (iterator.hasNext()) {
                    Map<String, ?> partition = iterator.next();
                    Map<String, ?> lastOffset = this.lastOffsets.get(partition);
                    this.coordinator.commitOffset(partition, lastOffset);
                    iterator.remove();
                }
                return;
            }
            finally {
                this.stateLock.unlock();
            }
        } else {
            LOGGER.warn("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
        }
    }

    protected abstract Iterable<Field> getAllConfigurationFields();

    protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
        Set<P> partitions = provider.getPartitions();
        OffsetReader reader = new OffsetReader(this.context.offsetStorageReader(), loader);
        Map offsets = reader.offsets(partitions);
        boolean found = false;
        for (Partition partition : partitions) {
            OffsetContext offset = (OffsetContext)offsets.get(partition);
            if (offset == null) continue;
            found = true;
            LOGGER.info("Found previous partition offset {}: {}", (Object)partition, offset.getOffset());
        }
        if (!found) {
            LOGGER.info("No previous offsets found");
        }
        return Offsets.of(offsets);
    }

    protected static enum State {
        RUNNING,
        STOPPED;

    }
}

