/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.record.sink.RetryableIOException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"record", "put", "sink"})
@CapabilityDescription(value="The PutRecord processor uses a specified RecordReader to input (possibly multiple) records from an incoming flow file, and sends them to a destination specified by a Record Destination Service (i.e. record sink).")
public class PutRecord
extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder().name("Record Destination Service").description("Specifies the Controller Service to use for writing out the query result records to some destination.").identifiesControllerService(RecordSinkService.class).required(true).build();
    public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder().name("Include Zero Record Results").description("If no records are read from the incoming FlowFile, this property specifies whether or not an empty record set will be transmitted. The original FlowFile will still be routed to success, but if no transmission occurs, no provenance SEND event will be generated.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RECORD_READER, RECORD_SINK, INCLUDE_ZERO_RECORD_RESULTS);
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The original FlowFile will be routed to this relationship if the records were transmitted successfully").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("The original FlowFile is routed to this relationship if the records could not be transmitted but attempting the operation again may succeed").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if the records could not be transmitted and retrying the operation will also fail").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY);
    private volatile RecordSinkService recordSinkService;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.recordSinkService = (RecordSinkService)context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
        this.recordSinkService.reset();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        try (InputStream in = session.read(flowFile);){
            RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, this.getLogger());
            RecordSet recordSet = recordParser.createRecordSet();
            boolean transmitZeroRecords = context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean();
            WriteResult writeResult = this.recordSinkService.sendData(recordSet, new HashMap(flowFile.getAttributes()), transmitZeroRecords);
            String recordSinkURL = (String)writeResult.getAttributes().get("record.sink.url");
            if (StringUtils.isEmpty((String)recordSinkURL)) {
                recordSinkURL = "unknown://";
            }
            long transmissionMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            if (writeResult.getRecordCount() > 0 || transmitZeroRecords) {
                session.getProvenanceReporter().send(flowFile, recordSinkURL, transmissionMillis);
            }
        }
        catch (RetryableIOException rioe) {
            this.getLogger().warn("Error during transmission of records due to {}, routing to retry", new Object[]{rioe.getMessage(), rioe});
            session.transfer(flowFile, REL_RETRY);
            return;
        }
        catch (SchemaNotFoundException snfe) {
            throw new ProcessException("Error determining schema of flowfile records: " + snfe.getMessage(), (Throwable)snfe);
        }
        catch (MalformedRecordException e) {
            this.getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, e.getMessage(), e});
            session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        catch (IOException ioe) {
            if (ioe.getCause() instanceof MalformedRecordException) {
                this.getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, ioe.getMessage(), ioe});
                session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            throw new ProcessException("Error reading from flowfile input stream: " + ioe.getMessage(), (Throwable)ioe);
        }
        catch (Exception e) {
            this.getLogger().error("Error during transmission of records due to {}, routing to failure", new Object[]{e.getMessage(), e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        session.transfer(flowFile, REL_SUCCESS);
    }

    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("put-record-reader", RECORD_READER.getName());
        config.renameProperty("put-record-sink", RECORD_SINK.getName());
        config.renameProperty("put-record-include-zero-record-results", INCLUDE_ZERO_RECORD_RESULTS.getName());
    }
}

