/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.services.azure.data.explorer;

import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.data.explorer.KustoAuthenticationStrategy;
import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;

/**
 * Controller service that ingests data into Azure Data Explorer (Kusto).
 *
 * <p>When enabled, the service creates three Kusto clients from the configured cluster URI and
 * authentication settings: a queued ingest client, a managed streaming ingest client, and an
 * execution client used for management commands and queries. The clients are closed and cleared
 * by {@link #onStopped()}.</p>
 */
@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto", "ingest", "azure"})
@CapabilityDescription("Sends batches of flowfile content or stream flowfile content to an Azure ADX cluster.")
public class StandardKustoIngestService extends AbstractControllerService implements KustoIngestService {

    public static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
            .name("Authentication Strategy")
            .displayName("Authentication Strategy")
            .description("Authentication method for access to Azure Data Explorer")
            .required(true)
            .defaultValue(KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue())
            .allowableValues(KustoAuthenticationStrategy.class)
            .build();

    public static final PropertyDescriptor APPLICATION_CLIENT_ID = new PropertyDescriptor.Builder()
            .name("Application Client ID")
            .displayName("Application Client ID")
            .description("Azure Data Explorer Application Client Identifier for Authentication")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor APPLICATION_KEY = new PropertyDescriptor.Builder()
            .name("Application Key")
            .displayName("Application Key")
            .description("Azure Data Explorer Application Key for Authentication")
            .required(true)
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
            .build();

    public static final PropertyDescriptor APPLICATION_TENANT_ID = new PropertyDescriptor.Builder()
            .name("Application Tenant ID")
            .displayName("Application Tenant ID")
            .description("Azure Data Explorer Application Tenant Identifier for Authentication")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
            .build();

    public static final PropertyDescriptor CLUSTER_URI = new PropertyDescriptor.Builder()
            .name("Cluster URI")
            .displayName("Cluster URI")
            .description("Azure Data Explorer Cluster URI")
            .required(true)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            AUTHENTICATION_STRATEGY,
            APPLICATION_CLIENT_ID,
            APPLICATION_KEY,
            APPLICATION_TENANT_ID,
            CLUSTER_URI
    );

    private static final String STREAMING_POLICY_SHOW_COMMAND = ".show database %s policy streamingingestion";

    private static final String COUNT_TABLE_COMMAND = "%s | count";

    /**
     * Zero-based index of the column read from the first row returned by
     * {@link #STREAMING_POLICY_SHOW_COMMAND}.
     * NOTE(review): presumably the "Policy" column of the command output; confirm against the Kusto documentation.
     */
    private static final int STREAMING_POLICY_COLUMN_INDEX = 2;

    /** Additional connector field attached to every connection string built by this service. */
    private static final Pair<String, String> NIFI_SINK = Pair.of("processor", StandardKustoIngestService.class.getSimpleName());

    private volatile QueuedIngestClient queuedIngestClient;

    private volatile ManagedStreamingIngestClient managedStreamingIngestClient;

    private volatile Client executionClient;

    /**
     * Returns the fixed list of properties supported by this service.
     *
     * @return immutable list of supported property descriptors
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Creates the queued ingest, managed streaming ingest and execution clients from the configured properties.
     *
     * <p>If creating any client fails, clients created so far are closed before the failure is rethrown,
     * so that a failed enable does not leave open clients behind.</p>
     *
     * @param context configuration context providing the property values
     * @throws ProcessException declared for callers; propagated from client creation if thrown
     * @throws URISyntaxException if a client factory rejects the cluster URI
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws ProcessException, URISyntaxException {
        final String applicationClientId = context.getProperty(APPLICATION_CLIENT_ID).getValue();
        final String applicationKey = context.getProperty(APPLICATION_KEY).getValue();
        final String applicationTenantId = context.getProperty(APPLICATION_TENANT_ID).getValue();
        final String clusterUri = context.getProperty(CLUSTER_URI).getValue();
        final KustoAuthenticationStrategy kustoAuthenticationStrategy =
                KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue());

        try {
            this.queuedIngestClient = createKustoQueuedIngestClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy);
            this.managedStreamingIngestClient = createKustoStreamingIngestClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy);
            this.executionClient = createKustoExecutionClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy);
        } catch (final URISyntaxException | RuntimeException e) {
            // Release any client that was already created before the failure
            onStopped();
            throw e;
        }
    }

    /**
     * Closes every client that is currently set and clears the corresponding field.
     *
     * <p>A failure to close one client is logged and does not prevent closing the others. The method is
     * safe to call repeatedly because each client is only closed when its field is non-null.</p>
     *
     * <p>The method carries {@code @OnDisabled} in addition to {@code @OnStopped}: the NiFi developer guide
     * documents {@code @OnDisabled} as the lifecycle annotation for Controller Services.</p>
     */
    @OnDisabled
    @OnStopped
    public final void onStopped() {
        if (this.queuedIngestClient != null) {
            try {
                this.queuedIngestClient.close();
            } catch (final IOException e) {
                getLogger().error("Closing Azure Data Explorer Queued Ingest Client failed", e);
            } finally {
                this.queuedIngestClient = null;
            }
        }
        if (this.managedStreamingIngestClient != null) {
            try {
                this.managedStreamingIngestClient.close();
            } catch (final IOException e) {
                getLogger().error("Closing Azure Data Explorer Managed Streaming Ingest Client failed", e);
            } finally {
                this.managedStreamingIngestClient = null;
            }
        }
        if (this.executionClient != null) {
            try {
                this.executionClient.close();
            } catch (final IOException e) {
                getLogger().error("Closing Azure Data Explorer Execution Client failed", e);
            } finally {
                this.executionClient = null;
            }
        }
    }

    /**
     * Creates the queued ingest client.
     *
     * @param clusterUrl cluster URL
     * @param appId application client identifier
     * @param appKey application key, used only with application credentials
     * @param appTenant application tenant identifier, used only with application credentials
     * @param kustoAuthStrategy authentication strategy
     * @return queued ingest client
     * @throws URISyntaxException if the client factory rejects the cluster URL
     */
    protected QueuedIngestClient createKustoQueuedIngestClient(final String clusterUrl, final String appId, final String appKey, final String appTenant,
                                                               final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException {
        final ConnectionStringBuilder ingestConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy);
        return IngestClientFactory.createClient(ingestConnectionStringBuilder);
    }

    /**
     * Creates the managed streaming ingest client.
     *
     * <p>Two separate connection string builders with identical settings are passed to the factory.</p>
     *
     * @param clusterUrl cluster URL
     * @param appId application client identifier
     * @param appKey application key, used only with application credentials
     * @param appTenant application tenant identifier, used only with application credentials
     * @param kustoAuthStrategy authentication strategy
     * @return managed streaming ingest client
     * @throws URISyntaxException if the client factory rejects the cluster URL
     */
    protected ManagedStreamingIngestClient createKustoStreamingIngestClient(final String clusterUrl, final String appId, final String appKey, final String appTenant,
                                                                            final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException {
        final ConnectionStringBuilder ingestConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy);
        final ConnectionStringBuilder streamingConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy);
        return IngestClientFactory.createManagedStreamingIngestClient(ingestConnectionStringBuilder, streamingConnectionStringBuilder);
    }

    /**
     * Ingests the stream carried by the request into the requested database and table.
     *
     * <p>The managed streaming client is used when the request enables streaming, otherwise the queued
     * client is used. When the request asks for status polling, the method blocks until a terminal
     * status is reported or the polling timeout elapses.</p>
     *
     * @param kustoIngestionRequest ingestion request
     * @return {@link KustoIngestionResult#SUCCEEDED} when polling is not requested, otherwise the polled result
     * @throws ProcessException if ingestion fails, the thread is interrupted, or status polling times out
     */
    public KustoIngestionResult ingestData(final KustoIngestionRequest kustoIngestionRequest) {
        final StreamSourceInfo info = new StreamSourceInfo(kustoIngestionRequest.getInputStream());

        final IngestionProperties ingestionProperties = new IngestionProperties(kustoIngestionRequest.getDatabaseName(), kustoIngestionRequest.getTableName());

        final IngestionMapping.IngestionMappingKind ingestionMappingKind = setDataFormatAndMapping(kustoIngestionRequest.getDataFormat(), ingestionProperties);
        if (StringUtils.isNotEmpty(kustoIngestionRequest.getMappingName()) && ingestionMappingKind != null) {
            ingestionProperties.setIngestionMapping(kustoIngestionRequest.getMappingName(), ingestionMappingKind);
        }

        ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
        ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
        ingestionProperties.setFlushImmediately(false);
        ingestionProperties.setIgnoreFirstRecord(kustoIngestionRequest.isIgnoreFirstRecord());

        try {
            final IngestionResult ingestionResult = kustoIngestionRequest.isStreamingEnabled()
                    ? this.managedStreamingIngestClient.ingestFromStream(info, ingestionProperties)
                    : this.queuedIngestClient.ingestFromStream(info, ingestionProperties);

            if (kustoIngestionRequest.pollOnIngestionStatus()) {
                final long timeoutMillis = kustoIngestionRequest.getIngestionStatusPollingTimeout().toMillis();
                final long ingestionStatusPollingInterval = kustoIngestionRequest.getIngestionStatusPollingInterval().toMillis();
                return pollOnIngestionStatus(ingestionResult, timeoutMillis, ingestionStatusPollingInterval);
            }
            return KustoIngestionResult.SUCCEEDED;
        } catch (final IngestionClientException | IngestionServiceException | URISyntaxException e) {
            throw new ProcessException("Azure Data Explorer Ingest failed", e);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
            throw new ProcessException("Azure Data Explorer Ingest interrupted", e);
        }
    }

    /**
     * Polls the ingestion status until the first status entry is Succeeded, Failed or PartiallySucceeded.
     *
     * <p>The result is returned as soon as a terminal status is observed. A timeout is reported only when
     * the deadline passes without observing one, so a terminal status that arrives close to the deadline
     * is not misreported as a timeout. An empty status collection is treated as "not yet available".</p>
     *
     * @param ingestionResult ingestion result to poll
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @param ingestionStatusPollingInterval pause between polls, in milliseconds
     * @return ingestion result derived from the terminal status
     * @throws URISyntaxException if retrieving the status collection fails with this exception
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws ProcessException if no terminal status is observed before the timeout elapses
     */
    private KustoIngestionResult pollOnIngestionStatus(final IngestionResult ingestionResult, final long timeoutMillis, final long ingestionStatusPollingInterval)
            throws URISyntaxException, InterruptedException {
        final long endTime = System.currentTimeMillis() + timeoutMillis;

        while (System.currentTimeMillis() < endTime) {
            final List<IngestionStatus> currentStatuses = ingestionResult.getIngestionStatusCollection();
            if (currentStatuses != null && !currentStatuses.isEmpty()) {
                final OperationStatus operationStatus = currentStatuses.getFirst().status;
                if (operationStatus == OperationStatus.Succeeded
                        || operationStatus == OperationStatus.Failed
                        || operationStatus == OperationStatus.PartiallySucceeded) {
                    return KustoIngestionResult.fromString(operationStatus.toString());
                }
            }
            Thread.sleep(ingestionStatusPollingInterval);
        }

        throw new ProcessException(String.format("Timeout of %s ms exceeded while waiting for ingestion status", timeoutMillis));
    }

    /**
     * Checks whether the streaming ingestion policy command returns a non-empty policy value for the database.
     *
     * @param databaseName database name
     * @return {@code true} when the first result row has a non-empty value in the policy column;
     *         {@code false} when the query failed, returned no rows, or the value is missing or empty
     */
    public boolean isStreamingPolicyEnabled(final String databaseName) {
        final String query = String.format(STREAMING_POLICY_SHOW_COMMAND, databaseName);
        final KustoIngestQueryResponse kustoIngestQueryResponse = executeQuery(databaseName, query);
        if (kustoIngestQueryResponse.isError()) {
            return false;
        }

        final var queryResult = kustoIngestQueryResponse.getQueryResult();
        if (queryResult == null || queryResult.isEmpty()) {
            return false;
        }

        final List<String> row = queryResult.get(0);
        // Guard against short rows and null cells instead of failing with an unchecked exception
        if (row == null || row.size() <= STREAMING_POLICY_COLUMN_INDEX) {
            return false;
        }
        return StringUtils.isNotEmpty(row.get(STREAMING_POLICY_COLUMN_INDEX));
    }

    /**
     * Checks whether a count query against the table completes without error.
     *
     * @param databaseName database name
     * @param tableName table name, inserted into the query text as given
     * @return {@code true} when the query did not report an error
     */
    public boolean isTableReadable(final String databaseName, final String tableName) {
        final String query = String.format(COUNT_TABLE_COMMAND, tableName);
        final KustoIngestQueryResponse kustoIngestQueryResponse = executeQuery(databaseName, query);
        return !kustoIngestQueryResponse.isError();
    }

    /**
     * Builds a single-element status list whose status is {@link OperationStatus#Pending}.
     *
     * @return immutable list containing one pending ingestion status
     */
    protected List<IngestionStatus> initializeKustoIngestionStatusAsPending() {
        final IngestionStatus ingestionStatus = new IngestionStatus();
        ingestionStatus.status = OperationStatus.Pending;
        return Collections.singletonList(ingestionStatus);
    }

    /**
     * Creates the client used for executing management commands and queries.
     *
     * @param clusterUrl cluster URL
     * @param appId application client identifier
     * @param appKey application key, used only with application credentials
     * @param appTenant application tenant identifier, used only with application credentials
     * @param kustoAuthStrategy authentication strategy
     * @return execution client
     * @throws URISyntaxException if the client factory rejects the cluster URL
     */
    protected Client createKustoExecutionClient(final String clusterUrl, final String appId, final String appKey, final String appTenant,
                                                final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException {
        return ClientFactory.createClient(createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy));
    }

    /**
     * Builds a connection string for the selected authentication strategy and attaches connector details.
     *
     * <p>Application credentials are used when that strategy is selected; every other strategy value
     * results in a managed identity connection string built from the cluster URL and client identifier.</p>
     */
    private ConnectionStringBuilder createKustoEngineConnectionString(final String clusterUrl, final String appId, final String appKey, final String appTenant,
                                                                      final KustoAuthenticationStrategy kustoAuthStrategy) {
        final ConnectionStringBuilder builder;
        if (KustoAuthenticationStrategy.APPLICATION_CREDENTIALS == kustoAuthStrategy) {
            builder = ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, appId, appKey, appTenant);
        } else {
            builder = ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, appId);
        }

        // The implementation version may be null when the package has no manifest version
        builder.setConnectorDetails("Kusto.Nifi.Sink", StandardKustoIngestService.class.getPackage().getImplementationVersion(), null, null, false, null, NIFI_SINK);
        return builder;
    }

    /**
     * Executes a query or command and collects the primary result as rows of strings keyed by row index.
     *
     * <p>Data client and data service failures are logged and reported through an error response rather
     * than thrown.</p>
     *
     * @param databaseName database name, must not be null
     * @param query query or command text, must not be null
     * @return response holding the result rows, or an error response when execution failed
     */
    private KustoIngestQueryResponse executeQuery(final String databaseName, final String query) {
        Objects.requireNonNull(databaseName, "Database Name required");
        Objects.requireNonNull(query, "Query required");

        KustoIngestQueryResponse kustoIngestQueryResponse;
        try {
            final KustoResultSetTable kustoResultSetTable = this.executionClient.execute(databaseName, query).getPrimaryResults();
            final Map<Integer, List<String>> response = new HashMap<>();
            int rowCount = 0;

            while (kustoResultSetTable.hasNext()) {
                kustoResultSetTable.next();
                final List<String> rowData = new ArrayList<>();
                for (final KustoResultColumn column : kustoResultSetTable.getColumns()) {
                    rowData.add(kustoResultSetTable.getString(column.getOrdinal()));
                }
                response.put(rowCount++, rowData);
            }
            kustoIngestQueryResponse = new KustoIngestQueryResponse(response);
        } catch (final DataClientException | DataServiceException e) {
            getLogger().error("Azure Data Explorer Ingest execution failed", e);
            kustoIngestQueryResponse = new KustoIngestQueryResponse(true);
        }
        return kustoIngestQueryResponse;
    }

    /**
     * Resolves the data format and, for the formats listed below, applies it to the ingestion properties.
     *
     * <p>For any other resolved format the ingestion properties are left untouched and {@code null} is
     * returned, which makes the caller skip setting an ingestion mapping.</p>
     *
     * @param dataFormat data format name as configured
     * @param ingestionProperties ingestion properties to update
     * @return mapping kind of the applied format, or {@code null} when the format is not in the handled set
     * @throws IllegalArgumentException if the name does not match any known data format
     */
    private IngestionMapping.IngestionMappingKind setDataFormatAndMapping(final String dataFormat, final IngestionProperties ingestionProperties) {
        final IngestionProperties.DataFormat dataFormatFound = getDataFormat(dataFormat);
        return switch (dataFormatFound) {
            case AVRO, APACHEAVRO, CSV, JSON, MULTIJSON, ORC, PARQUET, PSV, SCSV, SOHSV, TSV, TSVE, TXT -> {
                ingestionProperties.setDataFormat(dataFormatFound);
                yield dataFormatFound.getIngestionMappingKind();
            }
            default -> null;
        };
    }

    /**
     * Finds the data format whose Kusto value matches the given name, ignoring case.
     *
     * @param dataFormat data format name
     * @return matching data format
     * @throws IllegalArgumentException if no data format matches
     */
    private IngestionProperties.DataFormat getDataFormat(final String dataFormat) {
        final Optional<IngestionProperties.DataFormat> dataFormatFound = Arrays.stream(IngestionProperties.DataFormat.values())
                .filter(value -> value.getKustoValue().equalsIgnoreCase(dataFormat))
                .findFirst();
        return dataFormatFound.orElseThrow(() -> new IllegalArgumentException("Data Format [%s] not supported".formatted(dataFormat)));
    }
}

