/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.iceberg;

import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.iceberg.TableIdentifierFlowFileFilter;
import org.apache.nifi.processors.iceberg.record.DelegatedRecord;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.services.iceberg.IcebergCatalog;
import org.apache.nifi.services.iceberg.IcebergRowWriter;
import org.apache.nifi.services.iceberg.IcebergWriter;

/**
 * Processor that reads Records from incoming FlowFiles and stores them in an Iceberg Table.
 *
 * <p>Each invocation pulls a batch of FlowFiles selected by a {@link TableIdentifierFlowFileFilter},
 * writes every Record through a single {@link IcebergRowWriter}, and appends the resulting Data Files
 * to the Table in one commit. The batch is handled as a unit: all FlowFiles are routed to
 * {@link #SUCCESS} when the append commits, otherwise all are routed to {@link #FAILURE}.
 */
@Tags(value={"iceberg", "analytics", "polaris", "s3"})
@CapabilityDescription(value="Store records in Iceberg Table using configurable Catalog for managing namespaces and tables.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
public class PutIcebergRecord
extends AbstractProcessor {
    static final PropertyDescriptor ICEBERG_CATALOG = new PropertyDescriptor.Builder().name("Iceberg Catalog").description("Provider Service for Iceberg Catalog").required(true).identifiesControllerService(IcebergCatalog.class).build();
    static final PropertyDescriptor ICEBERG_WRITER = new PropertyDescriptor.Builder().name("Iceberg Writer").description("Provider Service for Iceberg Row Writers responsible for producing formatted Iceberg Data Files").required(true).identifiesControllerService(IcebergWriter.class).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("Record Reader for incoming FlowFiles").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Namespace").description("Iceberg Namespace containing Tables").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("Iceberg Table Name").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    static final Relationship SUCCESS = new Relationship.Builder().name("success").description("FlowFiles transferred to Iceberg").build();
    static final Relationship FAILURE = new Relationship.Builder().name("failure").description("FlowFiles not transferred to Iceberg").build();
    static final String RECORDS_PROCESSED_COUNTER = "Records Processed";
    static final String DATA_FILES_PROCESSED_COUNTER = "Data Files Processed";
    private static final List<PropertyDescriptor> properties = List.of(ICEBERG_CATALOG, ICEBERG_WRITER, RECORD_READER, NAMESPACE, TABLE_NAME);
    private static final Set<Relationship> relationships = Set.of(SUCCESS, FAILURE);
    /** Byte limit handed to the FlowFile filter when selecting a batch: 0x20000000 = 2^29 bytes (512 MiB). */
    private static final long MAXIMUM_BYTES = 0x20000000L;
    private final Clock clock = Clock.systemDefaultZone();
    private volatile Catalog catalog;
    private volatile IcebergWriter icebergWriter;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Resolves the configured Catalog and Writer services once per scheduling so that
     * {@link #onTrigger(ProcessContext, ProcessSession)} can use them without property lookups.
     *
     * @param context Process Context providing the configured Controller Services
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        IcebergCatalog icebergCatalog = context.getProperty(ICEBERG_CATALOG).asControllerService(IcebergCatalog.class);
        this.catalog = icebergCatalog.getCatalog();
        this.icebergWriter = context.getProperty(ICEBERG_WRITER).asControllerService(IcebergWriter.class);
    }

    /**
     * Pulls a batch of FlowFiles through the Table Identifier filter and writes them to the Table
     * that the filter resolved. Returns without doing anything when no FlowFiles were selected.
     *
     * @param context Process Context providing property values
     * @param session Process Session providing FlowFiles
     * @throws ProcessException propagated from the framework contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        TableIdentifierFlowFileFilter flowFileFilter = new TableIdentifierFlowFileFilter(context, MAXIMUM_BYTES);
        List<FlowFile> flowFiles = session.get(flowFileFilter);
        if (flowFiles.isEmpty()) {
            return;
        }
        TableIdentifier tableIdentifier = flowFileFilter.getTableIdentifier();
        RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        processFlowFiles(session, flowFiles, tableIdentifier, recordReaderFactory);
    }

    /**
     * Writes all FlowFiles to the Table as one unit: write rows, append Data Files, close the writer,
     * report provenance on success, and transfer every FlowFile to the resulting Relationship.
     *
     * @param session Process Session owning the FlowFiles
     * @param flowFiles batch of FlowFiles destined for the same Table
     * @param tableIdentifier identifier of the destination Table
     * @param recordReaderFactory factory for reading Records from FlowFile content
     * @throws IllegalStateException when the Table is not found in the Catalog; this is raised before
     *         any FlowFile is transferred
     */
    private void processFlowFiles(ProcessSession session, List<FlowFile> flowFiles, TableIdentifier tableIdentifier, RecordReaderFactory recordReaderFactory) {
        long started = clock.millis();
        Table table = getTable(tableIdentifier);
        Types.StructType struct = table.schema().asStruct();
        IcebergRowWriter rowWriter = icebergWriter.getRowWriter(table);

        boolean succeeded = writeFlowFiles(session, flowFiles, tableIdentifier, recordReaderFactory, rowWriter, struct);
        if (succeeded) {
            succeeded = commitDataFiles(session, table, tableIdentifier, rowWriter);
        }
        closeWriter(rowWriter, tableIdentifier);

        if (succeeded) {
            long elapsed = clock.millis() - started;
            String transitUri = table.location();
            for (FlowFile flowFile : flowFiles) {
                session.getProvenanceReporter().send(flowFile, transitUri, elapsed);
            }
        }
        session.transfer(flowFiles, succeeded ? SUCCESS : FAILURE);
    }

    /**
     * Writes the Records of each FlowFile to the Row Writer, stopping at the first failure.
     *
     * <p>The whole batch is routed to failure as soon as one FlowFile fails, so continuing would only
     * push more rows into a writer that has already been aborted. The writer is aborted exactly once.
     *
     * @return {@code true} when every FlowFile was written and its resources closed without error
     */
    private boolean writeFlowFiles(ProcessSession session, List<FlowFile> flowFiles, TableIdentifier tableIdentifier, RecordReaderFactory recordReaderFactory, IcebergRowWriter rowWriter, Types.StructType struct) {
        for (FlowFile flowFile : flowFiles) {
            boolean written = false;
            // try-with-resources closes the reader and then the stream, keeping close failures as
            // suppressed exceptions instead of masking the primary failure
            try (InputStream inputStream = session.read(flowFile);
                 RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, inputStream, getLogger())) {
                AtomicLong recordsProcessed = new AtomicLong();
                try {
                    writeRecords(recordReader, rowWriter, struct, recordsProcessed);
                    session.adjustCounter(RECORDS_PROCESSED_COUNTER, recordsProcessed.get(), false);
                    written = true;
                } catch (Exception e) {
                    getLogger().error("Write Rows to Table [{}] failed {}", tableIdentifier, flowFile, e);
                }
            } catch (Exception e) {
                // Reached when opening the content, creating the reader, or closing either resource fails
                getLogger().error("Processing Records for Table [{}] failed {}", tableIdentifier, flowFile, e);
                written = false;
            }
            if (!written) {
                abortWriter(rowWriter, tableIdentifier);
                return false;
            }
        }
        return true;
    }

    /**
     * Collects the Data Files produced by the Row Writer and appends them to the Table.
     *
     * @return {@code true} when the append committed; {@code false} when any step failed and was logged
     */
    private boolean commitDataFiles(ProcessSession session, Table table, TableIdentifier tableIdentifier, IcebergRowWriter rowWriter) {
        try {
            DataFile[] dataFiles = rowWriter.dataFiles();
            appendDataFiles(table, dataFiles);
            session.adjustCounter(DATA_FILES_PROCESSED_COUNTER, (long) dataFiles.length, false);
            return true;
        } catch (Exception e) {
            getLogger().error("Appending Data Files to Table [{}] failed", tableIdentifier, e);
            return false;
        }
    }

    /** Closes the Row Writer, logging instead of propagating an I/O failure. */
    private void closeWriter(IcebergRowWriter rowWriter, TableIdentifier tableIdentifier) {
        try {
            rowWriter.close();
        } catch (IOException e) {
            getLogger().warn("Failed to close Row Writer for Table [{}]", tableIdentifier, e);
        }
    }

    /**
     * Loads the Table from the Catalog.
     *
     * @param tableIdentifier identifier of the Table to load
     * @return loaded Table
     * @throws IllegalStateException when the Catalog reports that the Table does not exist
     */
    private Table getTable(TableIdentifier tableIdentifier) {
        if (!catalog.tableExists(tableIdentifier)) {
            throw new IllegalStateException("Table [%s] not found in Catalog".formatted(tableIdentifier));
        }
        return catalog.loadTable(tableIdentifier);
    }

    /**
     * Drains the Record Reader into the Row Writer, wrapping each NiFi Record so it can be written
     * against the Table struct.
     *
     * @param recordsProcessed incremented once for each Record handed to the writer
     * @throws IOException on failure to read or write a Record
     * @throws MalformedRecordException when the Reader cannot parse a Record
     */
    private void writeRecords(RecordReader recordReader, IcebergRowWriter rowWriter, Types.StructType struct, AtomicLong recordsProcessed) throws IOException, MalformedRecordException {
        org.apache.nifi.serialization.record.Record inputRecord;
        while ((inputRecord = recordReader.nextRecord()) != null) {
            rowWriter.write(new DelegatedRecord(inputRecord, struct));
            recordsProcessed.incrementAndGet();
        }
    }

    /** Appends all Data Files to the Table in a single commit. */
    private void appendDataFiles(Table table, DataFile[] dataFiles) {
        AppendFiles appendFiles = table.newAppend();
        for (DataFile dataFile : dataFiles) {
            appendFiles.appendFile(dataFile);
        }
        appendFiles.commit();
    }

    /** Aborts the Row Writer, logging instead of propagating an I/O failure. */
    private void abortWriter(IcebergRowWriter rowWriter, TableIdentifier tableIdentifier) {
        try {
            rowWriter.abort();
        } catch (IOException e) {
            getLogger().warn("Abort Writing to Table [{}] failed", tableIdentifier, e);
        }
    }
}

