/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.dynamodb;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.aws.dynamodb.AbstractDynamoDBProcessor;
import org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB;
import org.apache.nifi.processors.aws.dynamodb.GetDynamoDB;
import org.apache.nifi.processors.aws.dynamodb.ItemKeys;
import org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

@SupportsBatching
@SeeAlso(value={DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDBRecord.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
@CapabilityDescription(value="Puts a document from DynamoDB based on hash and range key.  The table can have either hash and range or hash key alone. Currently the keys supported are string and number and value can be json document. In case of hash and range keys both key are required for the operation. The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.")
@WritesAttributes(value={@WritesAttribute(attribute="dynamodb.key.error.unprocessed", description="DynamoDB unprocessed keys"), @WritesAttribute(attribute="dynmodb.range.key.value.error", description="DynamoDB range key error"), @WritesAttribute(attribute="dynamodb.key.error.not.found", description="DynamoDB key not found"), @WritesAttribute(attribute="dynamodb.error.exception.message", description="DynamoDB exception message"), @WritesAttribute(attribute="dynamodb.error.code", description="DynamoDB error code"), @WritesAttribute(attribute="dynamodb.error.message", description="DynamoDB error message"), @WritesAttribute(attribute="dynamodb.error.service", description="DynamoDB error service"), @WritesAttribute(attribute="dynamodb.error.retryable", description="DynamoDB error is retryable"), @WritesAttribute(attribute="dynamodb.error.request.id", description="DynamoDB error request id"), @WritesAttribute(attribute="dynamodb.error.status.code", description="DynamoDB error status code"), @WritesAttribute(attribute="dynamodb.item.io.error", description="IO exception message on creating item")})
@ReadsAttributes(value={@ReadsAttribute(attribute="  dynamodb.item.hash.key.value", description="Items hash key value"), @ReadsAttribute(attribute="  dynamodb.item.range.key.value", description="Items range key value")})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutDynamoDB
extends AbstractDynamoDBProcessor {
    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(TABLE, REGION, AWS_CREDENTIALS_PROVIDER_SERVICE, JSON_DOCUMENT, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, DOCUMENT_CHARSET, BATCH_SIZE, TIMEOUT, ENDPOINT_OVERRIDE, SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE);
    public static final int DYNAMODB_MAX_ITEM_SIZE = 409600;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        List failedFlowFiles;
        List flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger().intValue());
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        HashMap<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<ItemKeys, FlowFile>();
        String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
        String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
        String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
        String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
        String charset = context.getProperty(DOCUMENT_CHARSET).evaluateAttributeExpressions().getValue();
        HashMap tableNameRequestItemsMap = new HashMap();
        ArrayList<WriteRequest> requestItems = new ArrayList<WriteRequest>();
        tableNameRequestItemsMap.put(table, requestItems);
        for (FlowFile flowFile : flowFiles) {
            AttributeValue hashKeyValue = this.getAttributeValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
            AttributeValue rangeKeyValue = this.getAttributeValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
            if (!this.isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile) || !this.isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile)) continue;
            if (!this.isDataValid(flowFile, jsonDocument)) {
                flowFile = session.putAttribute(flowFile, "dynamodb.item.size.error", "Max size of item + attribute should be 400kb but was " + flowFile.getSize() + jsonDocument.length());
                session.transfer(flowFile, REL_FAILURE);
                continue;
            }
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            session.exportTo(flowFile, (OutputStream)baos);
            HashMap<String, AttributeValue> item = new HashMap<String, AttributeValue>();
            item.put(hashKeyName, hashKeyValue);
            if (!PutDynamoDB.isBlank((AttributeValue)rangeKeyValue)) {
                item.put(rangeKeyName, rangeKeyValue);
            }
            String jsonText = IOUtils.toString((byte[])baos.toByteArray(), (String)charset);
            AttributeValue jsonValue = (AttributeValue)AttributeValue.builder().s(jsonText).build();
            item.put(jsonDocument, jsonValue);
            PutRequest putRequest = (PutRequest)PutRequest.builder().item(item).build();
            WriteRequest writeRequest = (WriteRequest)WriteRequest.builder().putRequest(putRequest).build();
            requestItems.add(writeRequest);
            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
        }
        if (keysToFlowFileMap.isEmpty()) {
            return;
        }
        DynamoDbClient client = (DynamoDbClient)this.getClient(context);
        try {
            List unprocessedItems;
            BatchWriteItemRequest batchWriteItemRequest = (BatchWriteItemRequest)BatchWriteItemRequest.builder().requestItems(tableNameRequestItemsMap).build();
            BatchWriteItemResponse response = client.batchWriteItem(batchWriteItemRequest);
            if (response.unprocessedItems() != null && (unprocessedItems = (List)response.unprocessedItems().get(table)) != null) {
                for (WriteRequest request : unprocessedItems) {
                    Map item = request.putRequest().item();
                    AttributeValue hashKeyValue = (AttributeValue)item.get(hashKeyName);
                    AttributeValue rangeKeyValue = (AttributeValue)item.get(rangeKeyName);
                    this.sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
                }
            }
            for (FlowFile flowFile : keysToFlowFileMap.values()) {
                this.getLogger().debug("Successful posted items to dynamodb : {}", new Object[]{table});
                session.transfer(flowFile, REL_SUCCESS);
            }
        }
        catch (AwsServiceException exception) {
            this.getLogger().error("Could not process flowFiles due to service exception", (Throwable)exception);
            failedFlowFiles = this.processServiceException(session, flowFiles, exception);
            session.transfer((Collection)failedFlowFiles, REL_FAILURE);
        }
        catch (SdkException exception) {
            this.getLogger().error("Could not process flowFiles due to SDK exception", (Throwable)exception);
            failedFlowFiles = this.processSdkException(session, flowFiles, exception);
            session.transfer((Collection)failedFlowFiles, REL_FAILURE);
        }
        catch (Exception exception) {
            this.getLogger().error("Could not process flowFiles", (Throwable)exception);
            failedFlowFiles = this.processException(session, flowFiles, exception);
            session.transfer((Collection)failedFlowFiles, REL_FAILURE);
        }
    }

    private boolean isDataValid(FlowFile flowFile, String jsonDocument) {
        return flowFile.getSize() + (long)jsonDocument.length() < 409600L;
    }
}

