/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors.ioconcept.reader;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReader;
import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReaderCallback;
import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.StopWatch;

public class StateTrackingFlowFileReader
implements FlowFileReader {
    public static final String ATTR_READ_FAILED_INDEX_SUFFIX = ".read.failed.index";
    private final String identifier;
    private final RecordSupplier recordSupplier;
    private final ComponentLog logger;

    public StateTrackingFlowFileReader(String identifier, RecordSupplier recordSupplier, ComponentLog logger) {
        this.identifier = identifier;
        this.recordSupplier = recordSupplier;
        this.logger = logger;
    }

    @Override
    public void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
        StopWatch stopWatch = new StopWatch(true);
        AtomicInteger processedRecords = new AtomicInteger();
        String publishFailedIndexAttributeName = this.identifier + ATTR_READ_FAILED_INDEX_SUFFIX;
        try {
            boolean isRecover;
            Long previousProcessFailedAt = Optional.ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
            session.read(flowFile, in -> this.recordSupplier.process(flowFile, in, processedRecords, previousProcessFailedAt, this.logger, messageHandler));
            FlowFile successFlowFile = flowFile;
            boolean bl = isRecover = previousProcessFailedAt != null;
            if (isRecover) {
                successFlowFile = session.removeAttribute(flowFile, publishFailedIndexAttributeName);
            }
            flowFileReaderCallback.onSuccess(successFlowFile, processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        }
        catch (Exception e) {
            this.logger.error("An error happened while processing records. Routing to failure.", (Throwable)e);
            FlowFile failedFlowFile = session.putAttribute(flowFile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
            flowFileReaderCallback.onFailure(failedFlowFile, processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
        }
    }
}

