/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub.consume;

import com.google.pubsub.v1.ReceivedMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;
import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Base implementation of {@link PubSubMessageConverter} that parses the payload of each
 * received Pub/Sub message with a {@link RecordReader} and writes the resulting records
 * into FlowFiles, one FlowFile per distinct write schema.
 *
 * <p>Subclasses decide what each outgoing record looks like ({@link #getRecord}) and which
 * schema is handed to the writer factory ({@link #getRecordSchema}).
 */
public abstract class AbstractPubSubMessageConverter implements PubSubMessageConverter {
    protected final RecordReaderFactory readerFactory;
    protected final RecordSetWriterFactory writerFactory;
    protected final ComponentLog logger;

    /**
     * @param readerFactory factory used to create a reader over each message payload
     * @param writerFactory factory used to resolve write schemas and create record writers
     * @param logger        logger passed to readers/writers and used for parse-failure reports
     */
    protected AbstractPubSubMessageConverter(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ComponentLog logger) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.logger = logger;
    }

    /**
     * Converts the given messages into FlowFiles.
     *
     * <p>Every record read from a message payload is written to the FlowFile associated with
     * its write schema; those FlowFiles are transferred to {@code REL_SUCCESS} once all
     * messages have been processed. A message whose payload cannot be read is written as-is
     * to its own FlowFile and transferred to {@code REL_PARSE_FAILURE}. In both cases the
     * message's ack ID is appended to {@code ackIds}; messages without a payload
     * ({@code hasMessage()} is false) only contribute their ack ID.
     *
     * @param session          session used to create, write and transfer FlowFiles
     * @param messages         messages received from the subscription
     * @param ackIds           output list that collects the ack IDs of handled messages
     * @param subscriptionName subscription name, used for attributes, provenance and counters
     * @throws ProcessException if anything other than a per-message parse failure goes wrong
     */
    @Override
    public void toFlowFiles(ProcessSession session, List<ReceivedMessage> messages, List<String> ackIds, String subscriptionName) {
        try {
            Map<RecordSchema, RecordGroup> recordGroups = new HashMap<>();
            // No per-message variables are supplied to the reader/writer factories.
            Map<String, String> attributes = new HashMap<>();
            for (ReceivedMessage message : messages) {
                if (message.hasMessage()) {
                    byte[] payload = message.getMessage().getData().toByteArray();
                    try (InputStream in = new ByteArrayInputStream(payload);
                         RecordReader valueRecordReader = this.readerFactory.createRecordReader(attributes, in, payload.length, this.logger)) {
                        Record record;
                        while ((record = valueRecordReader.nextRecord()) != null) {
                            RecordSchema writeSchema = this.writerFactory.getSchema(attributes, this.getRecordSchema(record.getSchema()));
                            // Lazily open one FlowFile + writer per distinct write schema.
                            RecordGroup recordGroup = recordGroups.computeIfAbsent(
                                    writeSchema, schema -> this.createRecordGroup(session, schema, attributes));
                            recordGroup.writer().write(this.getRecord(record, message));
                        }
                    } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
                        this.logger.error("Failed to parse the record. Transfer to a 'parse.failure' relationship", e);
                        // handleParseFailure records the ack ID itself, so skip the add below.
                        this.handleParseFailure(session, message, payload, ackIds, subscriptionName);
                        continue;
                    }
                }
                ackIds.add(message.getAckId());
            }
            this.finishRecordGroups(session, recordGroups, subscriptionName);
        } catch (Exception e) {
            throw new ProcessException("FlowFile Record conversion failed", e);
        }
    }

    /**
     * Builds the record that is written to the output FlowFile.
     *
     * @param var1 record read from the message payload
     * @param var2 message the record was read from
     * @return the record to hand to the record writer
     */
    protected abstract Record getRecord(Record var1, ReceivedMessage var2);

    /**
     * Derives the schema passed to the writer factory from the schema of a parsed record.
     *
     * @param var1 schema of the record read from the message payload
     * @return the schema used to resolve the write schema
     */
    protected abstract RecordSchema getRecordSchema(RecordSchema var1);

    /**
     * Creates a new FlowFile and opens a record writer on it for the given schema.
     *
     * <p>On failure the session output stream is closed before the FlowFile is removed.
     * Removing a FlowFile while its write stream is still open is expected to be rejected by
     * the session, which would hide the original failure behind a secondary exception.
     *
     * @param session     session that owns the new FlowFile
     * @param writeSchema schema the writer is created for
     * @param attributes  variables passed to the writer factory
     * @return the FlowFile together with its started writer
     * @throws ProcessException if the stream or writer cannot be created or started
     */
    private RecordGroup createRecordGroup(ProcessSession session, RecordSchema writeSchema, Map<String, String> attributes) {
        FlowFile flowFile = session.create();
        OutputStream out = null;
        try {
            out = session.write(flowFile);
            RecordSetWriter writer = this.writerFactory.createWriter(this.logger, writeSchema, out, attributes);
            writer.beginRecordSet();
            return new RecordGroup(flowFile, writer);
        } catch (Exception e) {
            if (out != null) {
                try {
                    out.close();
                } catch (Exception closeFailure) {
                    // Keep the original failure as the primary cause.
                    e.addSuppressed(closeFailure);
                }
            }
            session.remove(flowFile);
            throw new ProcessException("Failed to create RecordSetWriter", e);
        }
    }

    /**
     * Routes a message whose payload could not be parsed to {@code REL_PARSE_FAILURE}.
     *
     * <p>The FlowFile carries the message's own attributes plus the {@code gcp.pubsub.*}
     * attributes set below, and its content is the raw payload. The message is still
     * counted as received and its ack ID is added to {@code ackIds}.
     *
     * @param session          session used to create and transfer the FlowFile
     * @param message          message that failed to parse
     * @param payload          raw bytes of the message data
     * @param ackIds           output list that collects ack IDs
     * @param subscriptionName subscription name, used for an attribute and the counter name
     */
    private void handleParseFailure(ProcessSession session, ReceivedMessage message, byte[] payload, List<String> ackIds, String subscriptionName) {
        FlowFile flowFile = session.create();
        flowFile = session.putAllAttributes(flowFile, message.getMessage().getAttributesMap());
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.ackId", message.getAckId());
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.messageSize", String.valueOf(message.getSerializedSize()));
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.messageId", message.getMessage().getMessageId());
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.attributesCount", String.valueOf(message.getMessage().getAttributesCount()));
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.publishTime", String.valueOf(message.getMessage().getPublishTime().getSeconds()));
        flowFile = session.putAttribute(flowFile, "gcp.pubsub.subscription", subscriptionName);
        flowFile = session.write(flowFile, out -> out.write(payload));
        session.transfer(flowFile, ConsumeGCPubSub.REL_PARSE_FAILURE);
        session.adjustCounter("Records Received from " + subscriptionName, 1L, false);
        ackIds.add(message.getAckId());
    }

    /**
     * Finishes every open record set, closes its writer, and transfers the FlowFile to
     * {@code REL_SUCCESS} with the writer's attributes, the record count, the MIME type and
     * the subscription name attached.
     *
     * @param session          session used to update and transfer the FlowFiles
     * @param recordGroups     open FlowFile/writer pairs keyed by write schema
     * @param subscriptionName subscription name, used for an attribute, provenance and the counter
     * @throws ProcessException if finishing or closing a writer fails with an {@link IOException}
     */
    private void finishRecordGroups(ProcessSession session, Map<RecordSchema, RecordGroup> recordGroups, String subscriptionName) {
        for (RecordGroup recordGroup : recordGroups.values()) {
            int recordCount;
            Map<String, String> newAttributes;
            // The writer must be closed before the FlowFile is updated and transferred.
            try (RecordSetWriter writer = recordGroup.writer()) {
                WriteResult writeResult = writer.finishRecordSet();
                recordCount = writeResult.getRecordCount();
                newAttributes = new HashMap<>(writeResult.getAttributes());
                newAttributes.put("record.count", String.valueOf(recordCount));
                newAttributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                newAttributes.put("gcp.pubsub.subscription", subscriptionName);
            } catch (IOException e) {
                throw new ProcessException("Failed to finish writing records", e);
            }
            FlowFile flowFile = session.putAllAttributes(recordGroup.flowFile(), newAttributes);
            ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
            provenanceReporter.receive(flowFile, subscriptionName);
            session.transfer(flowFile, ConsumeGCPubSub.REL_SUCCESS);
            session.adjustCounter("Records Received from " + subscriptionName, recordCount, false);
        }
    }

    /** Pairs an output FlowFile with the record writer that is open on it. */
    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer) {
    }
}

