/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.list;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.list.EntityListing;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.util.StringUtils;

@TriggerSerially
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity>
extends AbstractProcessor {
    /**
     * Optional property identifying a {@link DistributedMapCacheClient}. Within this class it is only read by
     * {@code updateState}, which hands the client to the legacy state migration.
     */
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. The stored value in the cache service will be migrated into the state when this processor is started at the first time. The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    /** Precision option: derive the time unit from the timestamps of the entities found in each listing. */
    public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect", "Automatically detect time unit deterministically based on candidate entries timestamp. Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp. E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
    /** Precision option: treat target-system timestamps as millisecond-precise. */
    public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds", "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
    /** Precision option: treat target-system timestamps as second-precise. */
    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds", "For a target system that does not have millis precision, but has in seconds.");
    /** Precision option: treat target-system timestamps as minute-precise. */
    public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
    /**
     * Required property selecting one of the precision options above; defaults to {@link #PRECISION_AUTO_DETECT}.
     * The chosen precision determines which entry of {@link #LISTING_LAG_MILLIS} is applied during
     * timestamp-based listing.
     */
    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder().name("target-system-timestamp-precision").displayName("Target System Timestamp Precision").description("Specify timestamp precision at the target system. Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.").required(true).allowableValues(new AllowableValue[]{PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES}).defaultValue(PRECISION_AUTO_DETECT.getValue()).build();
    /** The only relationship exposed by this class; every FlowFile created by timestamp-based listing is transferred here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    /** Listing strategy that decides what is new by comparing entity timestamps with the last tracked timestamps. */
    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", "This strategy tracks the latest timestamp of listed entity to determine new/updated entities. Since it only tracks few timestamps, it can manage listing state efficiently. However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy. For example, such situation can happen in a file system if a file with old timestamp is copied or moved into the target directory without its last modified timestamp being updated.");
    /**
     * Listing strategy that delegates to a {@link ListedEntityTracker}. The description refers to the other
     * strategy by its display name, "Tracking Timestamps".
     */
    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities. This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'. However additional DistributedMapCache controller service is required and more JVM heap memory is used. See the description of 'Entity Tracking Time Window' property for further details on how it works.");
    /** Required property choosing between {@link #BY_TIMESTAMPS} (the default) and {@link #BY_ENTITIES}. */
    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder().name("listing-strategy").displayName("Listing Strategy").description("Specify how to determine new/updated entities. See each strategy descriptions for detail.").required(true).allowableValues(new AllowableValue[]{BY_TIMESTAMPS, BY_ENTITIES}).defaultValue(BY_TIMESTAMPS.getValue()).build();
    // Latest entity timestamp seen by the most recent listing; null until it is restored from state or a listing
    // cycle completes, and reset to null by resetTimeStates().
    private volatile Long lastListedLatestEntryTimestampMillis = null;
    // Timestamp of the newest entity for which FlowFiles have actually been emitted.
    private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
    // System.nanoTime() captured at the start of the last listing cycle that reached the state-update step.
    private volatile Long lastRunTimeNanos = 0L;
    // Set by onPrimaryNodeChange(); forces the next listing to reload its state from the StateManager.
    private volatile boolean justElectedPrimaryNode = false;
    // Set by onPropertyModified(); consumed by updateState(), which then clears the stored state.
    private volatile boolean resetState = false;
    // Set by onPropertyModified(); consumed by initListedEntityTracker(), which then clears tracked entities.
    private volatile boolean resetEntityTrackingState = false;
    // Identifiers of the entities already emitted that carry lastProcessedLatestEntryTimestampMillis.
    private volatile List<String> latestIdentifiersProcessed = new ArrayList<String>();
    // Created by initListedEntityTracker() when the entity-tracking strategy is selected; null otherwise.
    private volatile ListedEntityTracker<T> listedEntityTracker;
    /**
     * Unmodifiable map from timestamp precision to the listing lag in milliseconds, populated by the static
     * initializer of this class (100 for MILLISECONDS, 1000 for SECONDS, 60000 for MINUTES).
     */
    public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
    // State-map key holding the latest timestamp seen by a listing.
    static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
    // State-map key holding the timestamp of the newest entity that was emitted.
    static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
    // State-map keys starting with this prefix hold identifiers of entities emitted at the latest timestamp.
    static final String IDENTIFIER_PREFIX = "id";

    /**
     * Builds the location of the local persistence file for this processor: a file named after the
     * processor identifier under the relative directory {@code conf/state}.
     *
     * @return the file handle; the file itself is not created or checked here
     */
    public File getPersistenceFile() {
        final String persistencePath = "conf/state/" + getIdentifier();
        return new File(persistencePath);
    }

    /**
     * Reacts to a property change. Once the configuration has been restored, a change to a property for which
     * {@code isListingResetNecessary} returns true resets the in-memory timestamps and flags both the stored
     * state and the entity-tracking state to be cleared on the next schedule.
     *
     * @param descriptor the property that was modified
     * @param oldValue   the previous value (not used here)
     * @param newValue   the new value (not used here)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (!isConfigurationRestored() || !isListingResetNecessary(descriptor)) {
            return;
        }
        resetTimeStates();
        resetState = true;
        resetEntityTrackingState = true;
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return a new mutable set containing only {@link #REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return new HashSet<>(Collections.singleton(REL_SUCCESS));
    }

    /**
     * Validates the configuration. When the entity-tracking strategy is selected, the tracker's own property
     * validation runs first; the subclass hook {@link #customValidate(ValidationContext, Collection)} always runs.
     *
     * @param context the validation context to read property values from
     * @return the validation results collected from both steps
     */
    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
        final List<ValidationResult> results = new ArrayList<>();
        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
        // Compare String to String, as initListedEntityTracker does, rather than relying on
        // AllowableValue.equals accepting a String argument.
        if (BY_ENTITIES.getValue().equals(listingStrategy)) {
            ListedEntityTracker.validateProperties(context, results, this.getStateScope((PropertyContext)context));
        }
        this.customValidate(context, results);
        return results;
    }

    /**
     * Extension hook invoked at the end of {@link #customValidate(ValidationContext)}. The default
     * implementation adds nothing; subclasses may override it to append their own results.
     *
     * @param validationContext the validation context to read property values from
     * @param validationResults the collection to which additional results should be added
     */
    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> validationResults) {
    }

    /**
     * Records whether this node has just been elected primary, so that the next listing reloads its state.
     *
     * @param newState the new primary-node state reported by the framework
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        final boolean electedAsPrimary = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
        this.justElectedPrimaryNode = electedAsPrimary;
    }

    /**
     * Prepares the listing state when the processor is scheduled:
     * <ol>
     *   <li>migrates legacy state when the stored state map reports version -1
     *       (presumably meaning nothing has been stored yet; confirm against the StateMap contract);</li>
     *   <li>resets the in-memory timestamps when a timestamp is held in memory but the stored state
     *       (as read before any migration) no longer contains one;</li>
     *   <li>clears the stored state when a reset was requested by {@code onPropertyModified}.</li>
     * </ol>
     *
     * @param context the process context supplying properties and the state manager
     * @throws IOException if the state cannot be read, migrated or cleared
     */
    @OnScheduled
    public final void updateState(ProcessContext context) throws IOException {
        final String listingPath = this.getPath(context);
        final DistributedMapCacheClient legacyCache =
                (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        final StateMap storedState = context.getStateManager().getState(this.getStateScope((PropertyContext)context));

        final boolean nothingStoredYet = storedState.getVersion() == -1L;
        if (nothingStoredYet) {
            try {
                this.migrateState(listingPath, legacyCache, context.getStateManager(), this.getStateScope((PropertyContext)context));
            } catch (IOException migrationFailure) {
                throw new IOException("Failed to properly migrate state to State Manager", migrationFailure);
            }
        }

        final boolean storedStateWasCleared = this.lastListedLatestEntryTimestampMillis != null
                && storedState.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null;
        if (storedStateWasCleared) {
            this.getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
            this.resetTimeStates();
        }

        if (!this.resetState) {
            return;
        }
        context.getStateManager().clear(this.getStateScope((PropertyContext)context));
        this.resetState = false;
    }

    /**
     * Migrates listing state from the two legacy locations into the State Manager:
     * the distributed cache entry stored under {@link #getKey(String)} and the local persistence file
     * returned by {@link #getPersistenceFile()}. The newer of the two timestamps wins; identifiers are only
     * taken over from the local file. Both legacy sources are removed afterwards on a best-effort basis.
     *
     * @param path         the listing path; used to derive the cache key and as the property name in the local file
     * @param client       the legacy distributed cache client, or {@code null} if none is configured
     * @param stateManager the state manager to persist the migrated state to
     * @param scope        the scope in which the migrated state is stored
     * @throws IOException if the legacy state cannot be read or parsed, or the new state cannot be stored
     */
    private void migrateState(String path, DistributedMapCacheClient client, StateManager stateManager, Scope scope) throws IOException {
        Long minTimestamp = null;

        if (client != null) {
            final StringSerDe serde = new StringSerDe();
            final String cacheKey = this.getKey(path);
            final String serializedState = client.get(cacheKey, serde, serde);
            if (serializedState != null && !serializedState.isEmpty()) {
                final EntityListing listing = this.deserialize(serializedState);
                minTimestamp = listing.getLatestTimestamp().getTime();
            }

            // Remove the entry under the same key it was read from; removing by the raw path would never
            // match the stored entry. A failure here is not fatal to the migration.
            try {
                client.remove(cacheKey, serde);
            } catch (IOException ioe) {
                this.getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new State Management service, so the Distributed Cache Service is no longer needed.", ioe);
            }
        }

        final File persistenceFile = this.getPersistenceFile();
        if (persistenceFile.exists()) {
            final Properties props = new Properties();
            try (FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }

            final String locallyPersistedValue = props.getProperty(path);
            if (locallyPersistedValue != null) {
                final EntityListing listing = this.deserialize(locallyPersistedValue);
                final long localTimestamp = listing.getLatestTimestamp().getTime();
                // Prefer the local file when it is the only source or holds the more recent timestamp.
                if (minTimestamp == null || localTimestamp > minTimestamp) {
                    minTimestamp = localTimestamp;
                    this.latestIdentifiersProcessed.clear();
                    this.latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
                }
            }

            if (persistenceFile.exists() && !persistenceFile.delete()) {
                this.getLogger().warn("Migrated state but failed to delete local persistence file");
            }
        }

        if (minTimestamp != null) {
            this.persist(minTimestamp, minTimestamp, this.latestIdentifiersProcessed, stateManager, scope);
        }
    }

    /**
     * Writes the listing state to the State Manager: the two timestamps under their fixed keys, plus one
     * {@code id.<index>} entry per identifier, numbered from zero in list order.
     *
     * @param latestListedEntryTimestampThisCycleMillis latest timestamp seen by the listing
     * @param lastProcessedLatestEntryTimestampMillis   timestamp of the newest entity that was emitted
     * @param processedIdentifiesWithLatestTimestamp    identifiers to store alongside the timestamps
     * @param stateManager                              the state manager to write to
     * @param scope                                     the scope in which the state is stored
     * @throws IOException if the state manager fails to store the state
     */
    private void persist(long latestListedEntryTimestampThisCycleMillis, long lastProcessedLatestEntryTimestampMillis, List<String> processedIdentifiesWithLatestTimestamp, StateManager stateManager, Scope scope) throws IOException {
        final int identifierCount = processedIdentifiesWithLatestTimestamp.size();
        final Map<String, String> stateToStore = new HashMap<>(identifierCount + 2);

        stateToStore.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(latestListedEntryTimestampThisCycleMillis));
        stateToStore.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(lastProcessedLatestEntryTimestampMillis));

        int position = 0;
        for (final String identifier : processedIdentifiesWithLatestTimestamp) {
            stateToStore.put("id." + position, identifier);
            position++;
        }

        stateManager.setState(stateToStore, scope);
    }

    /**
     * Builds the key under which the legacy listing state for a directory is looked up in the distributed cache.
     *
     * @param directory the directory (listing path) the state belongs to
     * @return {@code <processor identifier>.lastListingTime.<directory>}
     */
    protected String getKey(String directory) {
        final StringBuilder key = new StringBuilder();
        key.append(getIdentifier());
        key.append(".lastListingTime.");
        key.append(directory);
        return key.toString();
    }

    /**
     * Parses a JSON document into an {@link EntityListing} using a freshly created Jackson {@link ObjectMapper}.
     *
     * @param serializedState the JSON text to parse
     * @return the parsed listing
     * @throws JsonParseException   if the text is not valid JSON
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link EntityListing}
     * @throws IOException          on any other read failure
     */
    private EntityListing deserialize(String serializedState) throws JsonParseException, JsonMappingException, IOException {
        return (EntityListing)new ObjectMapper().readValue(serializedState, EntityListing.class);
    }

    /**
     * Runs one listing cycle using the strategy configured in {@link #LISTING_STRATEGY}.
     *
     * @param context the process context supplying the configured strategy
     * @param session the session used to create and transfer FlowFiles
     * @throws ProcessException if the configured strategy is neither timestamp tracking nor entity tracking
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
        // Compare String to String, as initListedEntityTracker does, rather than relying on
        // AllowableValue.equals accepting a String argument.
        if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
            this.listByTrackingTimestamps(context, session);
        } else if (BY_ENTITIES.getValue().equals(listingStrategy)) {
            this.listByTrackingEntities(context, session);
        } else {
            throw new ProcessException("Unknown listing strategy: " + listingStrategy);
        }
    }

    /**
     * Performs one listing cycle using the timestamp-tracking strategy.
     * <p>
     * The cycle restores the tracked timestamps from the State Manager when needed, asks the subclass for a
     * listing, groups the candidate entities by timestamp, optionally holds back the newest timestamp group
     * until the listing lag has passed, emits one FlowFile per remaining entity to {@link #REL_SUCCESS},
     * and finally stores the updated timestamps and identifiers.
     *
     * @param context the process context supplying properties and the state manager
     * @param session the session used to create, transfer and commit FlowFiles
     * @throws ProcessException declared by the signature; not thrown directly by the code in this method
     */
    public void listByTrackingTimestamps(ProcessContext context, ProcessSession session) throws ProcessException {
        List<T> entityList;
        Long minTimestampToListMillis = this.lastListedLatestEntryTimestampMillis;
        // Reload the tracked values from the State Manager when nothing is held in memory yet, or when this
        // node has just been elected primary.
        if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || this.justElectedPrimaryNode) {
            try {
                boolean noUpdateRequired = false;
                StateMap stateMap = context.getStateManager().getState(this.getStateScope((PropertyContext)context));
                this.latestIdentifiersProcessed.clear();
                for (Map.Entry state : stateMap.toMap().entrySet()) {
                    String k = (String)state.getKey();
                    String v = (String)state.getValue();
                    if (v == null || v.isEmpty()) continue;
                    if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        minTimestampToListMillis = Long.parseLong(v);
                        // The stored timestamp matches what is already in memory: this cycle is skipped below.
                        if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
                            noUpdateRequired = true;
                            continue;
                        }
                        this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                        continue;
                    }
                    if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
                        continue;
                    }
                    // Any remaining key that starts with the identifier prefix holds a processed identifier.
                    if (!k.startsWith(IDENTIFIER_PREFIX)) continue;
                    this.latestIdentifiersProcessed.add(v);
                }
                this.justElectedPrimaryNode = false;
                if (noUpdateRequired) {
                    context.yield();
                    return;
                }
            }
            catch (IOException ioe) {
                this.getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                context.yield();
                return;
            }
        }
        // Capture both clocks before listing: nanoTime for the elapsed-time check between runs, wall-clock
        // millis for comparison with entity timestamps.
        long currentRunTimeNanos = System.nanoTime();
        long currentRunTimeMillis = System.currentTimeMillis();
        try {
            entityList = this.performListing(context, minTimestampToListMillis);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, (Throwable)e);
            context.yield();
            return;
        }
        if (entityList == null || entityList.isEmpty()) {
            context.yield();
            return;
        }
        Long latestListedEntryTimestampThisCycleMillis = null;
        // Candidate entities grouped by timestamp, ordered from oldest to newest.
        TreeMap<Long, ArrayList<ListableEntity>> orderedEntries = new TreeMap<Long, ArrayList<ListableEntity>>();
        boolean targetSystemHasMilliseconds = false;
        boolean targetSystemHasSeconds = false;
        for (ListableEntity entity2 : entityList) {
            boolean newEntry;
            long entityTimestampMillis = entity2.getTimestamp();
            // A non-zero remainder shows that the target system reports this finer unit.
            if (!targetSystemHasMilliseconds) {
                boolean bl = targetSystemHasMilliseconds = entityTimestampMillis % 1000L > 0L;
            }
            if (!targetSystemHasSeconds) {
                targetSystemHasSeconds = entityTimestampMillis % 60000L > 0L;
            }
            // Keep the entity only if there is no lower bound, or it is at least as new as both the lower bound
            // and the last processed timestamp.
            if (!(newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= this.lastProcessedLatestEntryTimestampMillis)) continue;
            ArrayList<ListableEntity> entitiesForTimestamp = (ArrayList<ListableEntity>)orderedEntries.get(entity2.getTimestamp());
            if (entitiesForTimestamp == null) {
                entitiesForTimestamp = new ArrayList<ListableEntity>();
                orderedEntries.put(entity2.getTimestamp(), entitiesForTimestamp);
            }
            entitiesForTimestamp.add(entity2);
        }
        int flowfilesCreated = 0;
        if (orderedEntries.size() > 0) {
            latestListedEntryTimestampThisCycleMillis = (Long)orderedEntries.lastKey();
            String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
            if (StringUtils.isBlank((String)specifiedPrecision)) {
                specifiedPrecision = this.getDefaultTimePrecision();
            }
            // Auto-detect uses the flags gathered above; otherwise the configured precision is mapped directly,
            // with any unrecognized value falling through to MINUTES.
            TimeUnit targetSystemTimePrecision = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) ? (targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : (targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES)) : (PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS : (PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES));
            Long listingLagMillis = LISTING_LAG_MILLIS.get((Object)targetSystemTimePrecision);
            if (latestListedEntryTimestampThisCycleMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
                // Same latest timestamp as the previous listing: yield if the lag has not yet elapsed since the
                // last run, or if every entity at that timestamp has already been processed.
                long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
                if (currentRunTimeNanos - this.lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(this.lastProcessedLatestEntryTimestampMillis) && ((List)orderedEntries.get(latestListedEntryTimestampThisCycleMillis)).stream().allMatch(entity -> this.latestIdentifiersProcessed.contains(entity.getIdentifier()))) {
                    context.yield();
                    return;
                }
            } else {
                // New latest timestamp: round (now - lag) down to the target precision and hold back the newest
                // group if it is more recent than that, so it is not emitted in this cycle.
                long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
                long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
                if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
                    orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
                }
            }
            for (Map.Entry timestampEntities : orderedEntries.entrySet()) {
                List entities = (List)timestampEntities.getValue();
                // At the last processed timestamp, skip entities whose identifiers were already emitted.
                if (((Long)timestampEntities.getKey()).equals(this.lastProcessedLatestEntryTimestampMillis)) {
                    entities = entities.stream().filter(entity -> !this.latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
                }
                for (ListableEntity entity3 : entities) {
                    Map<String, String> attributes = this.createAttributes(entity3, context);
                    FlowFile flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    session.transfer(flowFile, REL_SUCCESS);
                    ++flowfilesCreated;
                }
            }
        }
        if (latestListedEntryTimestampThisCycleMillis != null) {
            boolean processedNewFiles;
            boolean bl = processedNewFiles = flowfilesCreated > 0;
            if (processedNewFiles) {
                // When the newest emitted timestamp moved on, the remembered identifiers belong to an older
                // timestamp and are discarded before the new ones are added.
                if (!((Long)orderedEntries.lastKey()).equals(this.lastProcessedLatestEntryTimestampMillis)) {
                    this.latestIdentifiersProcessed.clear();
                }
                this.latestIdentifiersProcessed.addAll(((List)orderedEntries.lastEntry().getValue()).stream().map(ListableEntity::getIdentifier).collect(Collectors.toList()));
                this.lastProcessedLatestEntryTimestampMillis = (Long)orderedEntries.lastKey();
                this.getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
                session.commit();
            }
            this.lastRunTimeNanos = currentRunTimeNanos;
            // Store state when the latest listed timestamp changed or anything was emitted; a failure to store
            // is only logged.
            if (!latestListedEntryTimestampThisCycleMillis.equals(this.lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                try {
                    this.lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
                    this.persist(latestListedEntryTimestampThisCycleMillis, this.lastProcessedLatestEntryTimestampMillis, this.latestIdentifiersProcessed, context.getStateManager(), this.getStateScope((PropertyContext)context));
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", (Throwable)ioe);
                }
            }
        } else {
            // No listed entity passed the timestamp filter.
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            // Replace null with 0 so the next cycle does not take the state-reload branch solely because this
            // field is null.
            if (this.lastListedLatestEntryTimestampMillis == null) {
                this.lastListedLatestEntryTimestampMillis = 0L;
            }
            return;
        }
    }

    /**
     * Returns the precision used when the configured precision value is blank.
     *
     * @return the default value of {@link #TARGET_SYSTEM_TIMESTAMP_PRECISION}
     */
    protected String getDefaultTimePrecision() {
        final String defaultPrecision = TARGET_SYSTEM_TIMESTAMP_PRECISION.getDefaultValue();
        return defaultPrecision;
    }

    /**
     * Restores the in-memory listing state to its initial values: no listed timestamp, zeroed processed
     * timestamp and run time, and an empty list of processed identifiers.
     */
    private void resetTimeStates() {
        lastListedLatestEntryTimestampMillis = null;
        lastProcessedLatestEntryTimestampMillis = Long.valueOf(0L);
        lastRunTimeNanos = Long.valueOf(0L);
        latestIdentifiersProcessed.clear();
    }

    /**
     * Builds the attribute map for one listed entity. Timestamp-based listing applies the returned map to a
     * newly created FlowFile before routing it to {@link #REL_SUCCESS}.
     *
     * @param var1 the listed entity
     * @param var2 the current process context
     * @return the attributes to put on the FlowFile for the entity
     */
    protected abstract Map<String, String> createAttributes(T var1, ProcessContext var2);

    /**
     * Returns the path for the current configuration. Within this class the value is used only during legacy
     * state migration, to derive the distributed-cache key and as the property name in the local persistence file.
     *
     * @param var1 the current process context
     * @return the path string
     */
    protected abstract String getPath(ProcessContext var1);

    /**
     * Lists entities on the target system.
     *
     * @param var1 the current process context
     * @param var2 the minimum timestamp passed by the caller; timestamp-based listing passes {@code null} when
     *             no previous listing timestamp is known
     * @return the listed entities; timestamp-based listing treats {@code null} or an empty list as "nothing to list"
     * @throws IOException if the listing fails; callers in this class log the failure instead of propagating it
     */
    protected abstract List<T> performListing(ProcessContext var1, Long var2) throws IOException;

    /**
     * Decides whether a change to the given property invalidates the listing state. Called from
     * {@code onPropertyModified}; returning {@code true} resets the in-memory timestamps and schedules the
     * stored state and entity-tracking state to be cleared.
     *
     * @param var1 the property that was modified
     * @return {@code true} if the listing state must be reset
     */
    protected abstract boolean isListingResetNecessary(PropertyDescriptor var1);

    /**
     * Returns the scope this class passes to every State Manager read, write and clear, and to the entity tracker.
     *
     * @param var1 the property context (a process or validation context) to derive the scope from
     * @return the state scope to use
     */
    protected abstract Scope getStateScope(PropertyContext var1);

    /**
     * Brings the entity tracker in line with the configured listing strategy when the processor is scheduled.
     * An existing tracker has its listed entities cleared when a reset was requested or the entity-tracking
     * strategy is no longer selected. Afterwards a tracker exists only if entity tracking is selected.
     *
     * @param context the process context supplying the configured listing strategy
     * @throws RuntimeException if clearing the previously listed entities fails with an {@link IOException}
     */
    @OnScheduled
    public void initListedEntityTracker(ProcessContext context) {
        final String entityStrategyValue = BY_ENTITIES.getValue();
        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
        final boolean trackingEntities = entityStrategyValue.equals(configuredStrategy);

        final boolean mustClearTrackedEntities = this.listedEntityTracker != null
                && (this.resetEntityTrackingState || !trackingEntities);
        if (mustClearTrackedEntities) {
            try {
                this.listedEntityTracker.clearListedEntities();
            } catch (IOException e) {
                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
            }
        }
        this.resetEntityTrackingState = false;

        if (!trackingEntities) {
            this.listedEntityTracker = null;
        } else if (this.listedEntityTracker == null) {
            this.listedEntityTracker = this.createListedEntityTracker();
        }
    }

    /**
     * Creates the tracker used by the entity-tracking strategy. Subclasses may override this to supply a
     * different tracker.
     *
     * @return a new tracker constructed from this processor's identifier and logger
     */
    protected ListedEntityTracker<T> createListedEntityTracker() {
        // Diamond instead of a raw type, so the type argument is inferred and no unchecked conversion occurs.
        return new ListedEntityTracker<>(this.getIdentifier(), this.getLogger());
    }

    /**
     * Performs one listing cycle using the entity-tracking strategy by delegating to the
     * {@link ListedEntityTracker}. The tracker receives a callback that performs the listing (a failed listing
     * is logged and reported as an empty list) and a callback that creates the attributes for an entity.
     * The "just elected primary" flag is cleared once the tracker returns.
     *
     * @param context the process context supplying properties and state
     * @param session the session handed to the tracker
     * @throws ProcessException declared by the signature; not thrown directly by the code in this method
     */
    private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
        final ListedEntityTracker<T> tracker = this.listedEntityTracker;
        final boolean electedSinceLastListing = this.justElectedPrimaryNode;
        final Scope stateScope = this.getStateScope((PropertyContext)context);

        tracker.trackEntities(context, session, electedSinceLastListing, stateScope, lowerBoundMillis -> {
            try {
                return this.performListing(context, (Long)lowerBoundMillis);
            } catch (IOException listingFailure) {
                this.getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{listingFailure.getMessage()}, (Throwable)listingFailure);
                return Collections.emptyList();
            }
        }, listedEntity -> this.createAttributes(listedEntity, context));

        this.justElectedPrimaryNode = false;
    }

    static {
        HashMap<TimeUnit, Long> nanos = new HashMap<TimeUnit, Long>();
        nanos.put(TimeUnit.MILLISECONDS, 100L);
        nanos.put(TimeUnit.SECONDS, 1000L);
        nanos.put(TimeUnit.MINUTES, 60000L);
        LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
    }

    /**
     * UTF-8 string serializer/deserializer used when talking to the legacy distributed cache.
     */
    private static class StringSerDe
    implements Serializer<String>,
    Deserializer<String> {
        private StringSerDe() {
        }

        /**
         * Decodes the given bytes as UTF-8.
         *
         * @param value the bytes to decode, may be {@code null}
         * @return the decoded string, or {@code null} when {@code value} is {@code null}
         */
        public String deserialize(byte[] value) throws DeserializationException, IOException {
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }

        /**
         * Writes the UTF-8 encoding of the given string to the stream.
         *
         * @param value the string to encode
         * @param out   the stream to write to
         */
        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            final byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
            out.write(encoded);
        }
    }
}

