/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.stream;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAwsSyncProcessor;
import org.apache.nifi.processors.aws.kinesis.KinesisProcessorUtils;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.region.RegionUtil;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"amazon", "aws", "kinesis", "put", "stream"})
@CapabilityDescription(value="Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified.")
@WritesAttributes(value={@WritesAttribute(attribute="aws.kinesis.error.message", description="Error message on posting message to AWS Kinesis"), @WritesAttribute(attribute="aws.kinesis.error.code", description="Error code for the message when posting to AWS Kinesis"), @WritesAttribute(attribute="aws.kinesis.sequence.number", description="Sequence number for the message when posting to AWS Kinesis"), @WritesAttribute(attribute="aws.kinesis.shard.id", description="Shard id of the message posted to AWS Kinesis")})
@SeeAlso(value={ConsumeKinesisStream.class})
public class PutKinesisStream
extends AbstractAwsSyncProcessor<KinesisClient, KinesisClientBuilder> {
    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder().name("Stream Partition Key").description("The partition key attribute.  If it is not set, a random value is used").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${kinesis.partition.key}").required(false).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Message Batch Size").description("Batch size for messages (1-500).").defaultValue("250").required(false).addValidator(StandardValidators.createLongValidator((long)1L, (long)500L, (boolean)true)).sensitive(false).build();
    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder().name("Max Message Buffer Size").description("Max message buffer size defined with standard data size units").defaultValue("1 MB").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).sensitive(false).build();
    static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder().name("Stream Name").description("The name of Kinesis Stream").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KINESIS_STREAM_NAME, RegionUtil.REGION, RegionUtil.CUSTOM_REGION, AWS_CREDENTIALS_PROVIDER_SERVICE, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, TIMEOUT, PROXY_CONFIGURATION_SERVICE, ENDPOINT_OVERRIDE);
    protected Random randomPartitionKeyGenerator = new Random();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
        List flowFiles = KinesisProcessorUtils.filterMessagesByMaxSize((ProcessSession)session, (int)batchSize, (long)maxBufferSizeBytes, (String)AWS_KINESIS_ERROR_MESSAGE, (ComponentLog)this.getLogger());
        HashMap<String, List> hashFlowFiles = new HashMap<String, List>();
        HashMap<String, List> recordHash = new HashMap<String, List>();
        KinesisClient client = (KinesisClient)this.getClient(context);
        try {
            String streamName;
            ArrayList<FlowFile> failedFlowFiles = new ArrayList<FlowFile>();
            ArrayList<FlowFile> successfulFlowFiles = new ArrayList<FlowFile>();
            for (FlowFile flowFile : flowFiles) {
                streamName = context.getProperty(KINESIS_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                session.exportTo(flowFile, (OutputStream)baos);
                PutRecordsRequestEntry.Builder recordBuilder = PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray((byte[])baos.toByteArray()));
                String partitionKey = context.getProperty(KINESIS_PARTITION_KEY).evaluateAttributeExpressions(flowFile).getValue();
                recordBuilder.partitionKey(StringUtils.isBlank((CharSequence)partitionKey) ? Integer.toString(this.randomPartitionKeyGenerator.nextInt()) : partitionKey);
                hashFlowFiles.computeIfAbsent(streamName, key -> new ArrayList()).add(flowFile);
                recordHash.computeIfAbsent(streamName, key -> new ArrayList()).add((PutRecordsRequestEntry)recordBuilder.build());
            }
            for (Map.Entry entry : recordHash.entrySet()) {
                streamName = (String)entry.getKey();
                List records = (List)entry.getValue();
                if (!records.isEmpty()) {
                    PutRecordsRequest putRecordRequest = (PutRecordsRequest)PutRecordsRequest.builder().streamName(streamName).records((Collection)records).build();
                    PutRecordsResponse response = client.putRecords(putRecordRequest);
                    List responseEntries = response.records();
                    for (int i = 0; i < responseEntries.size(); ++i) {
                        PutRecordsResultEntry entry2 = (PutRecordsResultEntry)responseEntries.get(i);
                        FlowFile flowFile = (FlowFile)((List)hashFlowFiles.get(streamName)).get(i);
                        HashMap<String, String> attributes = new HashMap<String, String>();
                        attributes.put(AWS_KINESIS_SHARD_ID, entry2.shardId());
                        attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry2.sequenceNumber());
                        if (StringUtils.isNotBlank((CharSequence)entry2.errorCode())) {
                            attributes.put(AWS_KINESIS_ERROR_CODE, entry2.errorCode());
                            attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry2.errorMessage());
                            flowFile = session.putAllAttributes(flowFile, attributes);
                            failedFlowFiles.add(flowFile);
                            continue;
                        }
                        flowFile = session.putAllAttributes(flowFile, attributes);
                        successfulFlowFiles.add(flowFile);
                    }
                }
                ((List)recordHash.get(streamName)).clear();
                records.clear();
            }
            if (!failedFlowFiles.isEmpty()) {
                session.transfer(failedFlowFiles, REL_FAILURE);
                this.getLogger().error("Failed to publish to kinesis records {}", new Object[]{failedFlowFiles});
            }
            if (!successfulFlowFiles.isEmpty()) {
                session.transfer(successfulFlowFiles, REL_SUCCESS);
                this.getLogger().debug("Successfully published to kinesis records {}", new Object[]{successfulFlowFiles});
            }
        }
        catch (Exception exception) {
            this.getLogger().error("Failed to publish due to exception {} flowfiles {} ", new Object[]{exception, flowFiles});
            session.transfer((Collection)flowFiles, REL_FAILURE);
            context.yield();
        }
    }

    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("amazon-kinesis-stream-partition-key", KINESIS_PARTITION_KEY.getName());
        config.renameProperty("message-batch-size", BATCH_SIZE.getName());
        config.renameProperty("max-message-buffer-size", MAX_MESSAGE_BUFFER_SIZE_MB.getName());
        config.renameProperty("kinesis-stream-name", KINESIS_STREAM_NAME.getName());
    }

    protected KinesisClientBuilder createClientBuilder(ProcessContext context) {
        return KinesisClient.builder();
    }
}

