/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.errors;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.Operation;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes operations on behalf of a task, retrying them when they throw a
 * {@link RetriableException} and deciding, based on the configured {@link ToleranceType},
 * whether a failure may be skipped or must be escalated as a {@link ConnectException}.
 *
 * <p>State about the operation currently being executed (stage, executing class, attempt
 * count, error) is kept in the shared {@link ProcessingContext}. The public entry points are
 * {@code synchronized} on this instance.
 *
 * <p>An {@link ErrorHandlingMetrics} instance must be supplied through
 * {@link #metrics(ErrorHandlingMetrics)} before any operation is executed; the field is
 * dereferenced without a null check.
 */
public class RetryWithToleranceOperator implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);

    /** Base delay, in milliseconds, of the exponential backoff applied between retries. */
    public static final long RETRIES_DELAY_MIN_MS = 300L;

    /**
     * Largest left shift that can be applied to {@link #RETRIES_DELAY_MIN_MS} without the
     * result overflowing into the sign bit of a {@code long}.
     */
    private static final int MAX_BACKOFF_SHIFT = Long.numberOfLeadingZeros(RETRIES_DELAY_MIN_MS) - 1;

    /**
     * Per-stage exception type that may be tolerated. Stages absent from this map fall back to
     * {@link RetriableException} (see {@link #execute(Operation, Stage, Class)}).
     */
    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();

    static {
        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
    }

    private final long errorRetryTimeout;
    private final long errorMaxDelayInMillis;
    private final ToleranceType errorToleranceType;
    private long totalFailures = 0L;
    private final Time time;
    private ErrorHandlingMetrics errorHandlingMetrics;
    protected final ProcessingContext context;

    /**
     * Creates an operator backed by a fresh {@link ProcessingContext}.
     *
     * @param errorRetryTimeout     time budget, in milliseconds, within which a failing operation is retried
     * @param errorMaxDelayInMillis upper bound, in milliseconds, for the delay between two retries
     * @param toleranceType         policy deciding whether failures may be skipped
     * @param time                  clock used for timestamps and sleeping
     */
    public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time) {
        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new ProcessingContext());
    }

    /**
     * Same as the public constructor, but with a caller-supplied {@link ProcessingContext}.
     */
    RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ProcessingContext context) {
        this.errorRetryTimeout = errorRetryTimeout;
        this.errorMaxDelayInMillis = errorMaxDelayInMillis;
        this.errorToleranceType = toleranceType;
        this.time = time;
        this.context = context;
    }

    /**
     * Records a failure that happened outside of {@link #execute(Operation, Stage, Class)} and
     * reports it through the processing context.
     *
     * @param stage          stage in which the failure occurred
     * @param executingClass class that was executing when the failure occurred
     * @param consumerRecord record that was being processed
     * @param error          the failure itself
     * @return the future returned by the context's {@code report()} call
     * @throws ConnectException if the failure is not within the tolerance limits; the report
     *                          has already been triggered at that point
     */
    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, ConsumerRecord<byte[], byte[]> consumerRecord, Throwable error) {
        markAsFailed();
        context.consumerRecord(consumerRecord);
        context.currentContext(stage, executingClass);
        context.error(error);
        errorHandlingMetrics.recordFailure();
        Future<Void> errantRecordFuture = context.report();
        if (!withinToleranceLimits()) {
            errorHandlingMetrics.recordError();
            throw new ConnectException("Tolerance exceeded in error handler", error);
        }
        return errantRecordFuture;
    }

    /**
     * Runs {@code operation} with retries and tolerance handling.
     *
     * <p>If the processing context is already marked as failed, the operation is not invoked
     * and {@code null} is returned. Whenever the context is in a failed state after the
     * operation was attempted, whether this method returns or throws, an error is recorded in
     * the metrics and the context is asked to report it; the future returned by that report is
     * not awaited here.
     *
     * @param operation      the operation to run
     * @param stage          stage the operation belongs to; selects the tolerated exception type
     * @param executingClass class performing the operation
     * @param <V>            result type of the operation
     * @return the operation's result, or {@code null} if it was skipped or failed tolerably
     * @throws ConnectException if the failure is of an untolerated type or exceeds the tolerance limits
     */
    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
        context.currentContext(stage, executingClass);
        if (context.failed()) {
            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
            return null;
        }
        try {
            Class<? extends Exception> tolerated = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
            return execAndHandleError(operation, tolerated);
        } finally {
            if (context.failed()) {
                errorHandlingMetrics.recordError();
                context.report();
            }
        }
    }

    /**
     * Invokes {@code operation}, retrying with backoff for as long as it throws
     * {@link RetriableException} and {@link #checkRetry(long)} allows another attempt.
     *
     * <p>The number of attempts made so far is written to the context after every attempt.
     * When retries are exhausted, or the thread is found interrupted after backing off, the
     * last {@link RetriableException} is stored in the context and {@code null} is returned.
     *
     * @param operation the operation to run
     * @param <V>       result type of the operation
     * @return the operation's result, or {@code null} if it kept failing with retriable exceptions
     * @throws Exception any non-retriable exception thrown by the operation
     */
    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
        int attempt = 0;
        long startTime = time.milliseconds();
        long deadline = startTime + errorRetryTimeout;
        while (true) {
            try {
                attempt++;
                return operation.call();
            } catch (RetriableException e) {
                log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                errorHandlingMetrics.recordFailure();
                if (!checkRetry(startTime)) {
                    log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
                    context.error(e);
                    return null;
                }
                backoff(attempt, deadline);
                if (Thread.currentThread().isInterrupted()) {
                    log.trace("Thread was interrupted. Marking operation as failed.");
                    context.error(e);
                    return null;
                }
                errorHandlingMetrics.recordRetry();
            } finally {
                // Must not alter control flow: a jump here would discard the return value or
                // exception of the try/catch above.
                context.attempt(attempt);
            }
        }
    }

    /**
     * Runs {@code operation} via {@link #execAndRetry(Operation)} and applies the tolerance
     * policy to the outcome.
     *
     * @param operation the operation to run
     * @param tolerated exception type (including subtypes) that may be skipped
     * @param <V>       result type of the operation
     * @return the operation's result, or {@code null} if a tolerated failure was skipped
     * @throws ConnectException if the thrown exception is not an instance of {@code tolerated},
     *                          or if the tolerance limits are exceeded
     */
    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
        try {
            V result = execAndRetry(operation);
            if (context.failed()) {
                // Retries were exhausted (or interrupted) on a retriable exception.
                markAsFailed();
                errorHandlingMetrics.recordSkipped();
            }
            return result;
        } catch (Exception e) {
            errorHandlingMetrics.recordFailure();
            markAsFailed();
            context.error(e);
            if (!tolerated.isAssignableFrom(e.getClass())) {
                throw new ConnectException("Unhandled exception in error handler", e);
            }
            if (!withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", e);
            }
            errorHandlingMetrics.recordSkipped();
            return null;
        }
    }

    /** Records the error timestamp in the metrics and increments the failure counter. */
    void markAsFailed() {
        errorHandlingMetrics.recordErrorTimestamp();
        totalFailures++;
    }

    /**
     * Tells whether the failures seen so far are acceptable under the configured tolerance.
     *
     * @return for {@code NONE}, {@code true} only while no failure has been counted;
     *         for {@code ALL}, always {@code true}
     * @throws ConfigException if the tolerance type is neither {@code NONE} nor {@code ALL}
     */
    public synchronized boolean withinToleranceLimits() {
        switch (errorToleranceType) {
            case NONE:
                return totalFailures <= 0L;
            case ALL:
                return true;
            default:
                throw new ConfigException("Unknown tolerance type: " + errorToleranceType);
        }
    }

    /**
     * @param startTime timestamp, in milliseconds, at which the first attempt started
     * @return {@code true} if the time elapsed since {@code startTime} is still below the retry timeout
     */
    boolean checkRetry(long startTime) {
        return time.milliseconds() - startTime < errorRetryTimeout;
    }

    /**
     * Sleeps before the next retry. The delay doubles with every attempt starting from
     * {@link #RETRIES_DELAY_MIN_MS}; once it exceeds the configured maximum, a random delay
     * below that maximum is used instead. The delay never extends past {@code deadline} and is
     * never negative.
     *
     * @param attempt  number of attempts made so far (1 for the first attempt)
     * @param deadline timestamp, in milliseconds, after which no more time should be spent waiting
     */
    void backoff(int attempt, long deadline) {
        int numRetry = attempt - 1;
        // Saturate instead of shifting bits into (or past) the sign bit.
        long delay = numRetry > MAX_BACKOFF_SHIFT ? Long.MAX_VALUE : RETRIES_DELAY_MIN_MS << numRetry;
        if (delay > errorMaxDelayInMillis) {
            delay = ThreadLocalRandom.current().nextLong(errorMaxDelayInMillis);
        }
        // Read the clock once so the comparison and the clamp agree with each other.
        long remaining = deadline - time.milliseconds();
        if (delay > remaining) {
            delay = Math.max(0L, remaining);
        }
        log.debug("Sleeping for {} millis", delay);
        time.sleep(delay);
    }

    /**
     * Sets the metrics sink used by this operator.
     *
     * @param errorHandlingMetrics metrics to record failures, retries, skips and errors on
     */
    public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
        this.errorHandlingMetrics = errorHandlingMetrics;
    }

    @Override
    public String toString() {
        return "RetryWithToleranceOperator{errorRetryTimeout=" + errorRetryTimeout
                + ", errorMaxDelayInMillis=" + errorMaxDelayInMillis
                + ", errorToleranceType=" + errorToleranceType
                + ", totalFailures=" + totalFailures
                + ", time=" + time
                + ", context=" + context
                + '}';
    }

    /**
     * Passes the given error reporters on to the processing context.
     *
     * @param reporters reporters to hand to the context
     */
    public synchronized void reporters(List<ErrorReporter> reporters) {
        context.reporters(reporters);
    }

    /**
     * Stores the given source record in the processing context.
     *
     * @param preTransformRecord the source record to store
     */
    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
        context.sourceRecord(preTransformRecord);
    }

    /**
     * Stores the given consumer record in the processing context.
     *
     * @param consumedMessage the consumer record to store
     */
    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
        context.consumerRecord(consumedMessage);
    }

    /**
     * @return the failed flag of the processing context
     */
    public synchronized boolean failed() {
        return context.failed();
    }

    /**
     * @return the error currently held by the processing context
     */
    public synchronized Throwable error() {
        return context.error();
    }

    /** Closes the underlying processing context. */
    @Override
    public synchronized void close() {
        context.close();
    }
}

