/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting.util.provenance;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ComponentMapHolder;
import org.apache.nifi.reporting.util.provenance.ParentProcessGroupSearchNode;

public class ProvenanceEventConsumer {
    public static final String LAST_EVENT_ID_KEY = "last_event_id";
    public static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", "Start reading provenance Events from the beginning of the stream (the oldest event first)");
    public static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", "Start reading provenance Events from the end of the stream, ignoring old events");
    public static final PropertyDescriptor PROVENANCE_START_POSITION = new PropertyDescriptor.Builder().name("provenance-start-position").displayName("Provenance Record Start Position").description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start").allowableValues(new DescribedValue[]{BEGINNING_OF_STREAM, END_OF_STREAM}).defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
    public static final PropertyDescriptor PROVENANCE_BATCH_SIZE = new PropertyDescriptor.Builder().name("provenance-batch-size").displayName("Provenance Record Batch Size").description("Specifies how many records to send in a single batch, at most.").required(true).defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
    private Pattern componentTypeRegex;
    private Pattern componentTypeRegexExclude;
    private Pattern componentNameRegex;
    private Pattern componentNameRegexExclude;
    private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
    private List<ProvenanceEventType> eventTypesExclude = new ArrayList<ProvenanceEventType>();
    private List<String> componentIds = new ArrayList<String>();
    private List<String> componentIdsExclude = new ArrayList<String>();
    private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
    private volatile long firstEventId = -1L;
    private volatile boolean scheduled = false;
    private ComponentLog logger;

    public void setStartPositionValue(String startPositionValue) {
        this.startPositionValue = startPositionValue;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public void setComponentTypeRegex(String componentTypeRegex) {
        if (this.isNotEmpty(componentTypeRegex)) {
            this.componentTypeRegex = Pattern.compile(componentTypeRegex);
        }
    }

    public void setComponentTypeRegexExclude(String componentTypeRegex) {
        if (this.isNotEmpty(componentTypeRegex)) {
            this.componentTypeRegexExclude = Pattern.compile(componentTypeRegex);
        }
    }

    public void setComponentNameRegex(String componentNameRegex) {
        if (this.isNotEmpty(componentNameRegex)) {
            this.componentNameRegex = Pattern.compile(componentNameRegex);
        }
    }

    public void setComponentNameRegexExclude(String componentNameRegexExclude) {
        if (this.isNotEmpty(componentNameRegexExclude)) {
            this.componentNameRegexExclude = Pattern.compile(componentNameRegexExclude);
        }
    }

    public void addTargetEventType(ProvenanceEventType ... types) {
        Collections.addAll(this.eventTypes, types);
    }

    public void addTargetEventTypeExclude(ProvenanceEventType ... types) {
        Collections.addAll(this.eventTypesExclude, types);
    }

    public void addTargetComponentId(String ... ids) {
        Collections.addAll(this.componentIds, ids);
    }

    public void addTargetComponentIdExclude(String ... ids) {
        Collections.addAll(this.componentIdsExclude, ids);
    }

    public void setScheduled(boolean scheduled) {
        this.scheduled = scheduled;
    }

    public boolean isScheduled() {
        return this.scheduled;
    }

    public void setLogger(ComponentLog logger) {
        this.logger = logger;
    }

    public void consumeEvents(ReportingContext context, BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>> consumer) throws ProcessException {
        List<ProvenanceEventRecord> filteredEvents;
        List rawEvents;
        if (context == null) {
            this.logger.debug("No ReportingContext available.");
            return;
        }
        EventAccess eventAccess = context.getEventAccess();
        ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
        ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
        StateManager stateManager = context.getStateManager();
        Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
        if (currMaxId == null) {
            this.logger.debug("No events to send because no events have been created yet.");
            return;
        }
        if (this.firstEventId < 0L) {
            Map state;
            try {
                state = stateManager.getState(Scope.LOCAL).toMap();
            }
            catch (IOException e) {
                this.logger.error("Failed to get state at start up", (Throwable)e);
                return;
            }
            if (state.containsKey(LAST_EVENT_ID_KEY)) {
                this.firstEventId = Long.parseLong((String)state.get(LAST_EVENT_ID_KEY)) + 1L;
            } else if (END_OF_STREAM.getValue().equals(this.startPositionValue)) {
                this.firstEventId = currMaxId;
            }
            if (currMaxId < this.firstEventId - 1L) {
                if (BEGINNING_OF_STREAM.getValue().equals(this.startPositionValue)) {
                    this.logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, this.firstEventId});
                    this.firstEventId = -1L;
                } else {
                    this.logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its ids. Restarting querying from the latest event in the Provenance Repository.", new Object[]{currMaxId, this.firstEventId});
                    this.firstEventId = currMaxId;
                }
            }
        }
        if (currMaxId == this.firstEventId - 1L) {
            this.logger.debug("No events to send due to the current max id being equal to the last id that was queried.");
            return;
        }
        try {
            rawEvents = eventAccess.getProvenanceEvents(this.firstEventId, this.batchSize);
            filteredEvents = this.filterEvents(componentMapHolder, rawEvents);
        }
        catch (IOException ioe) {
            this.logger.error("Failed to retrieve Provenance Events from repository", (Throwable)ioe);
            return;
        }
        if (rawEvents == null || rawEvents.isEmpty()) {
            this.logger.debug("No events to send due to 'events' being null or empty.");
            return;
        }
        while (rawEvents != null && !rawEvents.isEmpty() && this.isScheduled()) {
            if (!filteredEvents.isEmpty()) {
                consumer.accept(componentMapHolder, filteredEvents);
            }
            this.firstEventId = this.updateLastEventId(rawEvents, stateManager);
            try {
                rawEvents = eventAccess.getProvenanceEvents(this.firstEventId, this.batchSize);
                filteredEvents = this.filterEvents(componentMapHolder, rawEvents);
            }
            catch (IOException ioe) {
                this.logger.error("Failed to retrieve Provenance Events from repository", (Throwable)ioe);
                return;
            }
        }
    }

    private long updateLastEventId(List<ProvenanceEventRecord> events, StateManager stateManager) {
        if (events == null || events.isEmpty()) {
            return this.firstEventId;
        }
        ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
        String lastEventId = String.valueOf(lastEvent.getEventId());
        try {
            HashMap<String, String> newMapOfState = new HashMap<String, String>();
            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
            stateManager.setState(newMapOfState, Scope.LOCAL);
        }
        catch (IOException ioe) {
            this.logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", new Object[]{lastEventId, ioe, ioe, ioe.getMessage(), ioe});
        }
        return lastEvent.getEventId() + 1L;
    }

    protected boolean isFilteringEnabled() {
        boolean anyPatternPresent = Stream.of(this.componentTypeRegex, this.componentTypeRegexExclude, this.componentNameRegex, this.componentNameRegexExclude).filter(Objects::nonNull).map(Pattern::toString).anyMatch(this::isNotEmpty);
        boolean anyListPresent = Stream.of(this.eventTypes, this.eventTypesExclude, this.componentIds, this.componentIdsExclude).filter(Objects::nonNull).anyMatch(list -> !list.isEmpty());
        return anyPatternPresent || anyListPresent;
    }

    private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
        if (this.isFilteringEnabled()) {
            ArrayList<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
            for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
                ParentProcessGroupSearchNode parentProcessGroup;
                String processGroupId;
                if (!this.eventTypesExclude.isEmpty() && this.eventTypesExclude.contains(provenanceEventRecord.getEventType()) || !this.eventTypes.isEmpty() && !this.eventTypes.contains(provenanceEventRecord.getEventType())) continue;
                String componentId = provenanceEventRecord.getComponentId();
                if (!this.componentIdsExclude.isEmpty()) {
                    if (this.componentIdsExclude.contains(componentId) || componentMapHolder == null) continue;
                    processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
                    if (!this.isEmpty(processGroupId)) {
                        if (this.componentIdsExclude.contains(processGroupId)) continue;
                        for (parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId); parentProcessGroup != null && !this.componentIdsExclude.contains(parentProcessGroup.getId()); parentProcessGroup = parentProcessGroup.getParent()) {
                        }
                        if (parentProcessGroup != null) continue;
                    }
                }
                if (!this.componentIds.isEmpty() && !this.componentIds.contains(componentId)) {
                    if (componentMapHolder == null || this.isEmpty(processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType()))) continue;
                    if (!this.componentIds.contains(processGroupId)) {
                        for (parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId); parentProcessGroup != null && !this.componentIds.contains(parentProcessGroup.getId()); parentProcessGroup = parentProcessGroup.getParent()) {
                        }
                        if (parentProcessGroup == null) continue;
                    }
                }
                if (this.componentTypeRegexExclude != null && this.componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches() || this.componentTypeRegex != null && !this.componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) continue;
                String componentName = componentMapHolder.getComponentName(provenanceEventRecord.getComponentId());
                if (this.componentNameRegexExclude != null && componentName != null && this.componentNameRegexExclude.matcher(componentName).matches() || this.componentNameRegex != null && componentName != null && !this.componentNameRegex.matcher(componentName).matches()) continue;
                filteredEvents.add(provenanceEventRecord);
            }
            return filteredEvents;
        }
        return provenanceEvents;
    }

    private boolean isNotEmpty(String string) {
        return !this.isEmpty(string);
    }

    private boolean isEmpty(String string) {
        return string == null || string.isEmpty();
    }
}

