/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SupportsBatching
@Tags(value={"map", "cache", "put", "distributed"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Gets the content of a FlowFile and puts it to a map cache, using a cache key computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is 'keep original' the entry is not replaced.'")
@WritesAttribute(attribute="cached", description="All FlowFiles will have an attribute 'cached'. The value of this attribute is true, is the FlowFile is cached, otherwise false.")
@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.MapCacheClientService", "org.apache.nifi.distributed.cache.server.map.MapCacheServer", "org.apache.nifi.processors.standard.FetchDistributedMapCache"})
public class PutDistributedMapCache
extends AbstractProcessor {
    public static final String CACHED_ATTRIBUTE_NAME = "cached";
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("The Controller Service that is used to cache flow files").required(true).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder().name("Cache Entry Identifier").description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the cache key").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present", "Adds the specified entry to the cache, replacing any value that is currently set.");
    public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original", "Adds the specified entry to the cache, if the key does not exist.");
    public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder().name("Cache Update Strategy").description("Determines how the cache is updated if the cache already contains the entry").required(true).allowableValues(new DescribedValue[]{CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL}).defaultValue(CACHE_UPDATE_REPLACE.getValue()).build();
    public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder().name("Max Cache Size").description("The maximum amount of data to put into cache").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(CACHE_ENTRY_IDENTIFIER, DISTRIBUTED_CACHE_SERVICE, CACHE_UPDATE_STRATEGY, CACHE_ENTRY_MAX_BYTES);
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully inserted into cache will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private final Serializer<String> keySerializer = new StringSerializer();
    private final Serializer<byte[]> valueSerializer = new CacheValueSerializer();
    private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = this.getLogger();
        String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isBlank((CharSequence)cacheKey)) {
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        DistributedMapCacheClient cache = (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        try {
            byte[] oldValue;
            long maxCacheEntrySize = context.getProperty(CACHE_ENTRY_MAX_BYTES).asDataSize(DataUnit.B).longValue();
            long flowFileSize = flowFile.getSize();
            if (flowFileSize > maxCacheEntrySize) {
                logger.warn("Flow file {} size {} exceeds the max cache entry size ({} B).", new Object[]{flowFile, flowFileSize, maxCacheEntrySize});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            if (flowFileSize == 0L) {
                logger.warn("Flow file {} is empty, there is nothing to cache.", new Object[]{flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            session.exportTo(flowFile, (OutputStream)byteStream);
            byte[] cacheValue = byteStream.toByteArray();
            String updateStrategy = context.getProperty(CACHE_UPDATE_STRATEGY).getValue();
            boolean cached = false;
            if (updateStrategy.equals(CACHE_UPDATE_REPLACE.getValue())) {
                cache.put((Object)cacheKey, (Object)cacheValue, this.keySerializer, this.valueSerializer);
                cached = true;
            } else if (updateStrategy.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue()) && (oldValue = (byte[])cache.getAndPutIfAbsent((Object)cacheKey, (Object)cacheValue, this.keySerializer, this.valueSerializer, this.valueDeserializer)) == null) {
                cached = true;
            }
            flowFile = session.putAttribute(flowFile, CACHED_ATTRIBUTE_NAME, String.valueOf(cached));
            if (cached) {
                session.transfer(flowFile, REL_SUCCESS);
            } else {
                session.transfer(flowFile, REL_FAILURE);
            }
        }
        catch (IOException e) {
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            logger.error("Unable to communicate with cache when processing {}", new Object[]{flowFile, e});
        }
    }

    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("Cache update strategy", CACHE_UPDATE_STRATEGY.getName());
        config.renameProperty("Max cache entry size", CACHE_ENTRY_MAX_BYTES.getName());
    }

    public static class StringSerializer
    implements Serializer<String> {
        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static class CacheValueSerializer
    implements Serializer<byte[]> {
        public void serialize(byte[] bytes, OutputStream out) throws SerializationException, IOException {
            out.write(bytes);
        }
    }

    public static class CacheValueDeserializer
    implements Deserializer<byte[]> {
        public byte[] deserialize(byte[] input) throws DeserializationException, IOException {
            if (input == null || input.length == 0) {
                return null;
            }
            return input;
        }
    }
}

