/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.util.Assert;

public class RecordRecoverableProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut> {
    private static final Log LOG = LogFactory.getLog(RecordRecoverableProcessor.class);
    private final Function<Record<KIn, VIn>, Record<KOut, VOut>> delegateFunction;
    private BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer;
    private ProcessorContext<KOut, VOut> context;

    public RecordRecoverableProcessor(Function<Record<KIn, VIn>, Record<KOut, VOut>> delegateFunction) {
        this.delegateFunction = delegateFunction;
    }

    public RecordRecoverableProcessor(Function<Record<KIn, VIn>, Record<KOut, VOut>> delegateFunction, BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer) {
        this.delegateFunction = delegateFunction;
        Assert.notNull(processorRecordRecoverer, (String)"You must provide a valid processor recoverer");
        this.processorRecordRecoverer = processorRecordRecoverer;
    }

    public void init(ProcessorContext<KOut, VOut> context) {
        super.init(context);
        this.context = context;
    }

    public void process(Record<KIn, VIn> record) {
        try {
            Record<KOut, VOut> downstreamRecord = this.delegateFunction.apply(record);
            this.context.forward(downstreamRecord);
        }
        catch (Exception exception) {
            if (this.processorRecordRecoverer == null) {
                this.processorRecordRecoverer = this.defaultProcessorRecordRecoverer();
            }
            this.processorRecordRecoverer.accept(record, exception);
        }
    }

    public void close() {
        super.close();
    }

    protected BiConsumer<Record<KIn, VIn>, Exception> defaultProcessorRecordRecoverer() {
        return (r, e) -> LOG.warn((Object)"Runtime Exceptions: ", (Throwable)e);
    }
}

