/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.tests.system;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

public class ConcatenateFlowFiles
extends AbstractSessionFactoryProcessor {
    static final PropertyDescriptor FLOWFILE_COUNT = new PropertyDescriptor.Builder().name("FlowFile Count").displayName("FlowFile Count").description("Number of FlowFiles to concatenate together").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    static final Relationship ORIGINAL = new Relationship.Builder().name("original").build();
    static final Relationship MERGED = new Relationship.Builder().name("merged").build();
    private int flowFileCount;
    private List<FlowFile> flowFiles;
    private ProcessSession mergeSession;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(FLOWFILE_COUNT);
    }

    public Set<Relationship> getRelationships() {
        return new HashSet<Relationship>(Arrays.asList(ORIGINAL, MERGED));
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        this.flowFileCount = context.getProperty(FLOWFILE_COUNT).asInteger();
        this.flowFiles = new ArrayList<FlowFile>();
    }

    @OnStopped
    public void reset() {
        this.flowFiles.clear();
        this.mergeSession.rollback();
        this.mergeSession = null;
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession pollSession = sessionFactory.createSession();
        FlowFile flowFile = pollSession.get();
        if (flowFile == null) {
            return;
        }
        if (this.mergeSession == null) {
            this.mergeSession = sessionFactory.createSession();
        }
        pollSession.migrate(this.mergeSession, Collections.singleton(flowFile));
        this.flowFiles.add(flowFile);
        if (this.flowFiles.size() == this.flowFileCount) {
            FlowFile merged = this.mergeSession.create(this.flowFiles);
            try (OutputStream out = this.mergeSession.write(merged);){
                for (FlowFile input : this.flowFiles) {
                    InputStream in = this.mergeSession.read(input);
                    try {
                        StreamUtils.copy((InputStream)in, (OutputStream)out);
                    }
                    finally {
                        if (in == null) continue;
                        in.close();
                    }
                }
            }
            catch (Exception e) {
                this.mergeSession.rollback();
                throw new ProcessException("Failed to merge", (Throwable)e);
            }
            this.mergeSession.transfer(merged, MERGED);
            this.mergeSession.transfer(this.flowFiles, ORIGINAL);
            this.flowFiles.clear();
            this.mergeSession.commitAsync();
        } else {
            context.yield();
        }
    }
}

