/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hubspot;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hubspot.HubSpotObjectType;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.HttpResponseStatus;
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;

/**
 * Source processor that posts a search request to the HubSpot API for the configured object type and
 * writes the {@code results} array of a successful response into a new FlowFile.
 *
 * <p>Cluster-scoped state holds the paging cursor ({@link #CURSOR_KEY}) and, when incremental loading is
 * enabled, the boundaries of the most recent query time window ({@link #START_INCREMENTAL_KEY},
 * {@link #END_INCREMENTAL_KEY}). The state is cleared on the next trigger after the Object Type or
 * Incremental Loading property has been modified.</p>
 */
@PrimaryNodeOnly
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"hubspot"})
@CapabilityDescription("Retrieves JSON data from a private HubSpot application. This processor is intended to be run on the Primary Node only.")
@Stateful(scopes = {Scope.CLUSTER}, description = "In case of incremental loading, the start and end timestamps of the last query time window are stored in the state. When the 'Result Limit' property is set, the paging cursor is saved after executing a request. Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects can be set in the 'Result Limit' property.")
@DefaultSettings(yieldDuration = "10 sec")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type to application/json")
public class GetHubSpot extends AbstractProcessor {

    static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder()
            .name("Object Type")
            .description("The HubSpot Object Type requested")
            .required(true)
            .allowableValues(HubSpotObjectType.class)
            .build();

    static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
            .name("Access Token")
            .description("Access Token to authenticate requests")
            .required(true)
            .sensitive(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    static final PropertyDescriptor RESULT_LIMIT = new PropertyDescriptor.Builder()
            .name("Result Limit")
            .description("The maximum number of results to request for each invocation of the Processor")
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .addValidator(StandardValidators.createLongValidator(1L, 100L, true))
            .build();

    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
            .name("Incremental Loading")
            .description("The processor can incrementally load the queried objects so that each object is queried exactly once. For each query, the processor queries objects within a time window where the objects were modified between the previous run time and the current time (optionally adjusted by the Incremental Delay property).")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
            .name("Incremental Delay")
            .description("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property. For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35. Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync and to protect against HubSpot's mechanism that changes last updated timestamps after object creation.")
            .required(true)
            .defaultValue("30 sec")
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .dependsOn(IS_INCREMENTAL, "true")
            .build();

    static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder()
            .name("Incremental Initial Start Time")
            .description("This property specifies the start time that the processor applies when running the first request. The expected format is a UTC date-time such as '2011-12-03T10:15:30Z'")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
            .dependsOn(IS_INCREMENTAL, "true")
            .build();

    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder()
            .name("Web Client Service Provider")
            .description("Controller service for HTTP client operations")
            .identifiesControllerService(WebClientServiceProvider.class)
            .required(true)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("For FlowFiles created as a result of a successful HTTP request.")
            .build();

    private static final String API_BASE_URI = "api.hubapi.com";
    private static final String HTTPS = "https";
    private static final int TOO_MANY_REQUESTS = 429;

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
    private static final Map<String, HubSpotObjectType> OBJECT_TYPE_LOOKUP_MAP = createObjectTypeLookupMap();

    /** Sentinel stored under {@link #CURSOR_KEY} when a response carried no paging cursor. */
    private static final String NO_PAGING = "no paging";
    /** JSON field name of the paging cursor, both in the response and in the request body. */
    private static final String PAGING_CURSOR = "after";
    private static final String FIELD_TOTAL = "total";
    private static final String FIELD_RESULTS = "results";

    static final String CURSOR_KEY = "paging_next";
    static final String START_INCREMENTAL_KEY = "time_window_start";
    static final String END_INCREMENTAL_KEY = "time_window_end";

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            OBJECT_TYPE,
            ACCESS_TOKEN,
            RESULT_LIMIT,
            IS_INCREMENTAL,
            INCREMENTAL_DELAY,
            INCREMENTAL_INITIAL_START_TIME,
            WEB_CLIENT_SERVICE_PROVIDER
    );
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    private volatile WebClientServiceProvider webClientServiceProvider;
    /** Set when Object Type or Incremental Loading changes; causes the state to be cleared on the next trigger. */
    private volatile boolean isObjectTypeModified;

    /** Indexes every {@link HubSpotObjectType} constant by the string returned from its {@code getValue()}. */
    private static Map<String, HubSpotObjectType> createObjectTypeLookupMap() {
        return Arrays.stream(HubSpotObjectType.values())
                .collect(Collectors.toMap(HubSpotObjectType::getValue, Function.identity()));
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Flags the stored state for clearing when the Object Type or Incremental Loading property is changed
     * after the configuration has been restored.
     */
    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        final boolean affectsState = OBJECT_TYPE.equals(descriptor) || IS_INCREMENTAL.equals(descriptor);
        if (isConfigurationRestored() && affectsState) {
            isObjectTypeModified = true;
        }
    }

    /** Resolves the configured {@link WebClientServiceProvider} controller service. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
    }

    /**
     * Executes one search request and routes the outcome.
     *
     * <ul>
     *   <li>HTTP 200: the results are written to a FlowFile and the state is updated.</li>
     *   <li>HTTP 429: the processor yields and a {@link ProcessException} is thrown.</li>
     *   <li>Any other status: the response body is logged at warn level.</li>
     * </ul>
     *
     * The HTTP response is closed on every path.
     *
     * @throws ProcessException on HTTP 429, or when reading the response body or accessing state fails
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        if (isObjectTypeModified) {
            clearState(context);
            isObjectTypeModified = false;
        }

        final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
        final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
        final URI uri = getBaseUri(context);
        final Map<String, String> stateMap = getStateMap(session);
        final String filters = createIncrementalFilters(context, stateMap);

        // try-with-resources releases the response on the error paths too, where nothing else closes it.
        try (HttpResponseEntity response = getHttpResponseEntity(accessToken, uri, filters)) {
            final int statusCode = response.statusCode();
            if (statusCode == HttpResponseStatus.OK.getCode()) {
                handleSuccessfulResponse(context, session, response, stateMap, endpoint, uri);
            } else if (statusCode == TOO_MANY_REQUESTS) {
                context.yield();
                throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. HTTP %d error for requested URI [%s]", statusCode, uri));
            } else {
                final String responseBody = getResponseBodyAsString(context, response, uri);
                getLogger().warn("HTTP {} error for requested URI [{}] with response [{}]", statusCode, uri, responseBody);
            }
        } catch (final IOException e) {
            // Only close() can raise a checked IOException here. The response has already been handled,
            // so a failed close is logged instead of failing the trigger.
            getLogger().warn("Closing HTTP response for requested URI [{}] failed", uri, e);
        }
    }

    /**
     * Writes the response results to a new FlowFile, transfers it to success when the reported total is
     * positive (otherwise removes it and yields), and then persists the updated state.
     */
    private void handleSuccessfulResponse(final ProcessContext context, final ProcessSession session, final HttpResponseEntity response,
                                          final Map<String, String> stateMap, final String endpoint, final URI uri) {
        // Remains -1 when the response contains no "total" field.
        final AtomicInteger total = new AtomicInteger(-1);

        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, parseHttpResponse(response, total, stateMap));

        if (total.get() > 0) {
            flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, uri.toString());
        } else {
            getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint);
            context.yield();
            session.remove(flowFile);
        }

        updateState(session, stateMap);
    }

    /** Renames the legacy kebab-case property names to the current display-style names. */
    @Override
    public void migrateProperties(final PropertyConfiguration config) {
        config.renameProperty("object-type", OBJECT_TYPE.getName());
        config.renameProperty("access-token", ACCESS_TOKEN.getName());
        config.renameProperty("result-limit", RESULT_LIMIT.getName());
        config.renameProperty("is-incremental", IS_INCREMENTAL.getName());
        config.renameProperty("incremental-delay", INCREMENTAL_DELAY.getName());
        config.renameProperty("incremental-initial-start-time", INCREMENTAL_INITIAL_START_TIME.getName());
        config.renameProperty("web-client-service-provider", WEB_CLIENT_SERVICE_PROVIDER.getName());
    }

    /**
     * Reads the whole response body as UTF-8 text.
     *
     * @throws ProcessException when the body cannot be read; the processor yields first
     */
    private String getResponseBodyAsString(final ProcessContext context, final HttpResponseEntity response, final URI uri) {
        try {
            return IOUtils.toString(response.body(), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            context.yield();
            throw new ProcessException(String.format("Reading HTTP response body for requested URI [%s] failed", uri), e);
        }
    }

    /**
     * Builds the callback that streams the response body into the FlowFile content.
     *
     * <p>While scanning field names it records the {@code total} value in {@code total}, copies the
     * {@code results} structure to the output, and stores the {@code after} value under
     * {@link #CURSOR_KEY}. Scanning stops at the first {@code after} field, so anything following it is
     * not read. When no cursor is found, {@link #NO_PAGING} is stored instead.</p>
     */
    private OutputStreamCallback parseHttpResponse(final HttpResponseEntity response, final AtomicInteger total, final Map<String, String> stateMap) {
        return out -> {
            try (JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                 JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
                boolean isCursorAvailable = false;
                while (jsonParser.nextToken() != null) {
                    if (jsonParser.currentToken() != JsonToken.FIELD_NAME) {
                        continue;
                    }
                    final String fieldName = jsonParser.currentName();
                    if (FIELD_TOTAL.equals(fieldName)) {
                        jsonParser.nextToken();
                        total.set(jsonParser.getIntValue());
                    } else if (FIELD_RESULTS.equals(fieldName)) {
                        jsonParser.nextToken();
                        // Consumes the entire results value, so fields nested inside it are never inspected.
                        jsonGenerator.copyCurrentStructure(jsonParser);
                    } else if (PAGING_CURSOR.equals(fieldName)) {
                        isCursorAvailable = true;
                        jsonParser.nextToken();
                        stateMap.put(CURSOR_KEY, jsonParser.getText());
                        break;
                    }
                }
                if (!isCursorAvailable) {
                    stateMap.put(CURSOR_KEY, NO_PAGING);
                }
            }
        };
    }

    /** Builds {@code https://api.hubapi.com<object type>/search} using the configured object type value as the path. */
    URI getBaseUri(final ProcessContext context) {
        final String path = context.getProperty(OBJECT_TYPE).getValue();
        return webClientServiceProvider.getHttpUriBuilder()
                .scheme(HTTPS)
                .host(API_BASE_URI)
                .encodedPath(path + "/search")
                .build();
    }

    /**
     * Posts the JSON request body to the given URI with bearer-token authorization.
     *
     * @param filters JSON request body; sent UTF-8 encoded with its exact byte length
     */
    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
        // Encode once so the declared content length is the real payload size, not an available() estimate.
        final byte[] requestBody = filters.getBytes(StandardCharsets.UTF_8);
        final InputStream inputStream = new ByteArrayInputStream(requestBody);
        return webClientServiceProvider.getWebClientService()
                .post()
                .uri(uri)
                .header("Authorization", "Bearer " + accessToken)
                .header("Content-Type", "application/json")
                .body(inputStream, OptionalLong.of(requestBody.length))
                .retrieve();
    }

    /**
     * Creates the JSON request body: optional {@code limit}, optional {@code after} cursor and, when
     * incremental loading is enabled, the {@code filters} array bounding the last-modified field.
     *
     * <p>While a paging cursor is active, the time window stored in {@code stateMap} is reused. Otherwise
     * a new window is computed, starting at the previous window end (or the configured initial start time)
     * and ending at the current time minus the incremental delay, and it is written to {@code stateMap}.</p>
     */
    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
        final ObjectNode root = OBJECT_MAPPER.createObjectNode();

        final String limit = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().getValue();
        if (limit != null) {
            root.put("limit", limit);
        }

        final String cursor = stateMap.get(CURSOR_KEY);
        final boolean paging = isPagingCursor(cursor);
        if (paging) {
            root.put(PAGING_CURSOR, cursor);
        }

        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
        if (isIncremental) {
            final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(context.getProperty(OBJECT_TYPE).getValue());
            final String lastModifiedFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();

            final String currentStartTime;
            final String currentEndTime;
            if (paging) {
                // Still paging through the previous window: keep querying with the same boundaries.
                currentStartTime = stateMap.get(START_INCREMENTAL_KEY);
                currentEndTime = stateMap.get(END_INCREMENTAL_KEY);
            } else {
                final String lastEndTime = stateMap.get(END_INCREMENTAL_KEY);
                final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).evaluateAttributeExpressions().getValue();
                final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);

                currentStartTime = lastEndTime != null ? lastEndTime : getInitialStartTimeEpoch(initialStartTimeValue);
                final long now = getCurrentEpochTime();
                currentEndTime = String.valueOf(incrDelayMs != null ? now - incrDelayMs : now);

                stateMap.put(START_INCREMENTAL_KEY, currentStartTime);
                stateMap.put(END_INCREMENTAL_KEY, currentEndTime);
            }

            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
            // No lower bound exists on the first run when no initial start time is configured.
            if (currentStartTime != null) {
                filters.add(createFilterNode(lastModifiedFieldName, "GTE", currentStartTime));
            }
            filters.add(createFilterNode(lastModifiedFieldName, "LT", currentEndTime));
            root.set("filters", filters);
        }

        return root.toString();
    }

    /** Returns whether the stored cursor value denotes a page still to be fetched. */
    private static boolean isPagingCursor(final String cursor) {
        return cursor != null && !NO_PAGING.equals(cursor);
    }

    /** Creates one filter object with {@code propertyName}, {@code operator} and {@code value} fields. */
    private static JsonNode createFilterNode(final String propertyName, final String operator, final String value) {
        final ObjectNode filterNode = OBJECT_MAPPER.createObjectNode();
        filterNode.put("propertyName", propertyName);
        filterNode.put("operator", operator);
        filterNode.put("value", value);
        return filterNode;
    }

    /**
     * Converts an ISO-8601 instant string to epoch milliseconds in string form.
     *
     * @return the epoch milliseconds as a string, or {@code null} when the input is {@code null}
     */
    private String getInitialStartTimeEpoch(final String initialStartTimeValue) {
        if (initialStartTimeValue == null) {
            return null;
        }
        return String.valueOf(Instant.parse(initialStartTimeValue).toEpochMilli());
    }

    /** Returns the current time in epoch milliseconds. */
    long getCurrentEpochTime() {
        return Instant.now().toEpochMilli();
    }

    /**
     * Loads the cluster-scoped state into a mutable copy.
     *
     * @throws ProcessException when the state cannot be retrieved
     */
    private Map<String, String> getStateMap(final ProcessSession session) {
        final StateMap stateMap;
        try {
            stateMap = session.getState(Scope.CLUSTER);
        } catch (final IOException e) {
            throw new ProcessException("State retrieval failed", e);
        }
        return new HashMap<>(stateMap.toMap());
    }

    /**
     * Stores the given map as the cluster-scoped state through the session.
     *
     * @throws ProcessException when the state cannot be written
     */
    private void updateState(final ProcessSession session, final Map<String, String> newState) {
        try {
            session.setState(newState, Scope.CLUSTER);
        } catch (final IOException e) {
            throw new ProcessException("Page cursor update failed", e);
        }
    }

    /**
     * Clears the cluster-scoped state through the state manager.
     *
     * @throws ProcessException when the state cannot be cleared
     */
    private void clearState(final ProcessContext context) {
        try {
            context.getStateManager().clear(Scope.CLUSTER);
        } catch (final IOException e) {
            throw new ProcessException("Clearing state failed", e);
        }
    }
}

