/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.KeyValueReader;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.processors.hadoop.ValueReader;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that reads Hadoop sequence files from HDFS and emits their
 * contents as FlowFiles.
 *
 * <p>Behaves like {@link GetHDFS} for file discovery, but each selected file
 * is parsed with a {@link SequenceFileReader}; depending on the
 * {@link #FLOWFILE_CONTENT} property, either only the values or the
 * key/value pairs are written to the resulting FlowFiles.
 */
@TriggerWhenEmpty
@Tags(value={"hadoop", "HCFS", "HDFS", "get", "fetch", "ingest", "source", "sequence file"})
@CapabilityDescription(value="Fetch sequence files from Hadoop Distributed File System (HDFS) into FlowFiles")
@SeeAlso(value={PutHDFS.class})
public class GetHDFSSequenceFile extends GetHDFS {
    /** Allowable value selecting FlowFile content made of the sequence file values only. */
    static final String VALUE_ONLY = "VALUE ONLY";

    /** Selects whether FlowFile content holds only the value or both key and value. */
    static final PropertyDescriptor FLOWFILE_CONTENT = new PropertyDescriptor.Builder()
            .name("FlowFile Content")
            .description("Indicate if the content is to be both the key and value of the Sequence File, or just the value.")
            .allowableValues(new String[]{VALUE_ONLY, "KEY VALUE PAIR"})
            .defaultValue(VALUE_ONLY)
            .required(true)
            .build();

    /**
     * Returns the parent's supported properties plus {@link #FLOWFILE_CONTENT}.
     *
     * @return an unmodifiable list of supported property descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> someProps = new ArrayList<>(super.getSupportedPropertyDescriptors());
        someProps.add(FLOWFILE_CONTENT);
        return Collections.unmodifiableList(someProps);
    }

    /**
     * Reads each sequence file in {@code files}, turns its records into
     * FlowFiles and transfers them to {@code REL_SUCCESS}.
     *
     * <p>Processing stops as soon as the processor is no longer scheduled.
     * Files that no longer exist are skipped. Unless the keep-source-file
     * property is set, each file is deleted after it has been read. Any
     * failure while handling a file is logged; unless it is handled as an
     * authentication error, the session is rolled back and the processor
     * yields.
     *
     * @param files   the HDFS paths to ingest
     * @param context the process context supplying property values
     * @param session the session used to create and transfer FlowFiles
     */
    @Override
    protected void processBatchOfFiles(List<Path> files, ProcessContext context, ProcessSession session) {
        final Configuration conf = this.getConfiguration();
        final FileSystem hdfs = this.getFileSystem();
        final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
        final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        if (bufferSizeProp != null) {
            conf.setInt("io.file.buffer.size", bufferSizeProp.intValue());
        }
        final ComponentLog logger = this.getLogger();
        final SequenceFileReader<Set<FlowFile>> reader = flowFileContentValue.equalsIgnoreCase(VALUE_ONLY)
                ? new ValueReader(session)
                : new KeyValueReader(session);

        for (final Path file : files) {
            if (!this.isScheduled()) {
                break;
            }

            // Scoped per file: a skipped or failed file must never re-report or
            // re-transfer the FlowFiles produced by a previously processed file.
            Set<FlowFile> flowFiles = Collections.emptySet();
            final StopWatch stopWatch = new StopWatch(false);
            stopWatch.start();
            try {
                if (!hdfs.exists(file)) {
                    // The file disappeared since it was listed; move on to the next one.
                    continue;
                }
                logger.debug("Reading file");
                flowFiles = this.getFlowFiles(conf, hdfs, reader, file);
                if (!keepSourceFiles && !hdfs.delete(file, false)) {
                    logger.warn("Unable to delete path {} from HDFS.  Will likely be picked up over and over...", new Object[]{file});
                }
            } catch (Throwable t) {
                final String errorString = "Error retrieving file {} from HDFS due to {}";
                if (!this.handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                    logger.error(errorString, new Object[]{file, t});
                    session.rollback();
                    context.yield();
                    // The rollback discards the FlowFiles created in this session, so
                    // nothing read from this file may be reported or transferred.
                    flowFiles = Collections.emptySet();
                } else {
                    logger.warn(errorString, new Object[]{file, t});
                }
            } finally {
                stopWatch.stop();
                this.reportAndTransfer(flowFiles, file, stopWatch, session, logger);
            }
        }
    }

    /**
     * Emits a provenance RECEIVE event for every FlowFile read from {@code file}
     * and, when any content was ingested, logs the data rate and transfers the
     * FlowFiles to {@code REL_SUCCESS}.
     *
     * @param flowFiles the FlowFiles created from {@code file}; may be empty
     * @param file      the sequence file the FlowFiles were read from
     * @param stopWatch the already-stopped stop watch that timed the ingest
     * @param session   the session owning the FlowFiles
     * @param logger    the component logger
     */
    private void reportAndTransfer(Set<FlowFile> flowFiles, Path file, StopWatch stopWatch, ProcessSession session, ComponentLog logger) {
        long totalSize = 0L;
        for (final FlowFile flowFile : flowFiles) {
            totalSize += flowFile.getSize();
            session.getProvenanceReporter().receive(flowFile, file.toString());
        }
        if (totalSize <= 0L) {
            return;
        }
        final String dataRate = stopWatch.calculateDataRate(totalSize);
        final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
        logger.info("Created {} flowFiles from SequenceFile {}. Ingested in {} milliseconds at a rate of {}", new Object[]{flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate});
        logger.info("Transferred flowFiles {}  to success", new Object[]{flowFiles});
        session.transfer(flowFiles, REL_SUCCESS);
    }

    /**
     * Reads {@code file} with {@code reader}, running as the configured Hadoop
     * user when one is available.
     *
     * @param conf   the Hadoop configuration passed to the reader
     * @param hdfs   the file system passed to the reader
     * @param reader the reader that converts the sequence file into FlowFiles
     * @param file   the sequence file to read
     * @return the FlowFiles produced by the reader
     * @throws Exception if reading the file or the privileged call fails
     */
    protected Set<FlowFile> getFlowFiles(Configuration conf, FileSystem hdfs, SequenceFileReader<Set<FlowFile>> reader, Path file) throws Exception {
        final PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = () -> reader.readSequenceFile(file, conf, hdfs);
        final UserGroupInformation userGroupInformation = this.getUserGroupInformation();
        if (userGroupInformation == null) {
            // No Hadoop user is configured: run the read directly.
            return privilegedExceptionAction.run();
        }
        return userGroupInformation.doAs(privilegedExceptionAction);
    }
}

