/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.session.Session;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.AbstractIoTDB;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;

@SupportsBatching
@Tags(value={"IoT", "Timeseries"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Query Apache IoTDB and write results as Records")
@WritesAttributes(value={@WritesAttribute(attribute="iotdb.error.message", description="Error message written on query failures"), @WritesAttribute(attribute="mime.type", description="Content Type based on configured Record Set Writer")})
public class QueryIoTDBRecord
extends AbstractIoTDB {
    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().name("Query").description("IoTDB query to be executed").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder().name("Fetch Size").description("Maximum number of results to return in a single chunk. Configuring 1 or more enables result set chunking").defaultValue(String.valueOf(10000)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createLongValidator((long)0L, (long)100000L, (boolean)true)).required(true).build();
    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("Record Writer").description("Service for writing IoTDB query results as records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    public static final String IOTDB_ERROR_MESSAGE = "iotdb.error.message";
    public static final String MIME_TYPE = "mime.type";

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> propertyDescriptors = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        propertyDescriptors.add(QUERY);
        propertyDescriptors.add(FETCH_SIZE);
        propertyDescriptors.add(RECORD_WRITER_FACTORY);
        return Collections.unmodifiableList(propertyDescriptors);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
        RecordSetWriterFactory recordSetWriterFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        try (SessionDataSet sessionDataSet = ((Session)this.session.get()).executeQueryStatement(query);
             OutputStream outputStream = session.write(flowFile);){
            sessionDataSet.setFetchSize(fetchSize);
            RecordSchema recordSchema = this.getRecordSchema(sessionDataSet);
            RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(this.getLogger(), recordSchema, outputStream, flowFile);
            while (sessionDataSet.hasNext()) {
                RowRecord rowRecord = sessionDataSet.next();
                Record record = this.getRecord(recordSchema, rowRecord);
                recordSetWriter.write(record);
            }
            recordSetWriter.close();
            flowFile = session.putAttribute(flowFile, MIME_TYPE, recordSetWriter.getMimeType());
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (Exception e) {
            flowFile = session.putAttribute(flowFile, IOTDB_ERROR_MESSAGE, e.getMessage());
            this.getLogger().error("IoTDB query failed {}", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private Record getRecord(RecordSchema schema, RowRecord rowRecord) {
        LinkedHashMap<String, Object> row = new LinkedHashMap<String, Object>();
        Iterator recordFieldNames = schema.getFieldNames().iterator();
        row.put((String)recordFieldNames.next(), rowRecord.getTimestamp());
        Iterator rowRecordFields = rowRecord.getFields().iterator();
        while (recordFieldNames.hasNext()) {
            String recordFieldName = (String)recordFieldNames.next();
            if (!rowRecordFields.hasNext()) continue;
            Field rowRecordField = (Field)rowRecordFields.next();
            TSDataType dataType = rowRecordField.getDataType();
            Object objectValue = rowRecordField.getObjectValue(dataType);
            row.put(recordFieldName, objectValue);
        }
        return new MapRecord(schema, row);
    }

    private RecordSchema getRecordSchema(SessionDataSet sessionDataSet) {
        Iterator columnTypes = sessionDataSet.getColumnTypes().iterator();
        Iterator columnNames = sessionDataSet.getColumnNames().iterator();
        ArrayList<RecordField> recordFields = new ArrayList<RecordField>();
        while (columnNames.hasNext()) {
            String recordFieldName = (String)columnNames.next();
            String columnType = (String)columnTypes.next();
            RecordFieldType recordFieldType = this.getType(columnType);
            DataType recordDataType = recordFieldType.getDataType();
            RecordField recordField = new RecordField(recordFieldName, recordDataType);
            recordFields.add(recordField);
        }
        return new SimpleRecordSchema(recordFields);
    }
}

