/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.azuredataexplorer;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.kusto.data.StringUtils;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.azuredataexplorer.ADXPulsarEvent;
import org.apache.pulsar.io.azuredataexplorer.ADXSinkConfig;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="adx", type=IOType.SINK, help="The ADXSink is used for moving messages from Pulsar to ADX.", configClass=ADXSinkConfig.class)
public class ADXSink
implements Sink<byte[]> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ADXSink.class);
    private final ObjectMapper mapper = new ObjectMapper().registerModule((Module)new JavaTimeModule());
    IngestionProperties ingestionProperties;
    private IngestClient ingestClient;
    private List<Record<byte[]>> incomingRecordsList;
    private int batchSize;
    private long batchTimeMs;
    private ScheduledExecutorService adxSinkExecutor;
    private int maxRetryAttempts;
    private long retryBackOffTime;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Open ADX Sink");
        ADXSinkConfig adxConfig = ADXSinkConfig.load(config, sinkContext);
        adxConfig.validate();
        ConnectionStringBuilder kcsb = this.getConnectionStringBuilder(adxConfig);
        if (kcsb == null) {
            throw new Exception("Kusto Connection String NULL");
        }
        log.debug("ConnectionString created: {}.", (Object)kcsb);
        this.ingestClient = adxConfig.getManagedIdentityId() != null ? IngestClientFactory.createManagedStreamingIngestClient((ConnectionStringBuilder)kcsb) : IngestClientFactory.createClient((ConnectionStringBuilder)kcsb);
        this.ingestionProperties = new IngestionProperties(adxConfig.getDatabase(), adxConfig.getTable());
        this.ingestionProperties.setIngestionMapping(adxConfig.getMappingRefName(), this.getParseMappingRefType(adxConfig.getMappingRefType()));
        this.ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
        this.ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
        this.ingestionProperties.setFlushImmediately(adxConfig.isFlushImmediately());
        this.ingestionProperties.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
        log.debug("Ingestion Properties:  {}", (Object)this.ingestionProperties.toString());
        this.maxRetryAttempts = adxConfig.getMaxRetryAttempts() + 1;
        this.retryBackOffTime = adxConfig.getRetryBackOffTime();
        this.batchSize = adxConfig.getBatchSize();
        this.batchTimeMs = adxConfig.getBatchTimeMs();
        this.incomingRecordsList = new ArrayList<Record<byte[]>>();
        this.adxSinkExecutor = Executors.newScheduledThreadPool(1);
        this.adxSinkExecutor.scheduleAtFixedRate(this::sinkData, this.batchTimeMs, this.batchTimeMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Record<byte[]> record) {
        int runningSize;
        ADXSink aDXSink = this;
        synchronized (aDXSink) {
            this.incomingRecordsList.add(record);
            runningSize = this.incomingRecordsList.size();
        }
        if (runningSize == this.batchSize) {
            this.adxSinkExecutor.execute(this::sinkData);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sinkData() {
        List<Record<byte[]>> recordsToSink;
        ADXSink aDXSink = this;
        synchronized (aDXSink) {
            if (this.incomingRecordsList.isEmpty()) {
                return;
            }
            recordsToSink = this.incomingRecordsList;
            this.incomingRecordsList = new ArrayList<Record<byte[]>>();
        }
        LinkedList<ADXPulsarEvent> eventsToSink = new LinkedList<ADXPulsarEvent>();
        for (Record<byte[]> record : recordsToSink) {
            try {
                eventsToSink.add(this.getADXPulsarEvent(record));
            }
            catch (Exception ex) {
                record.fail();
                log.error("Failed to collect the record for ADX cluster.", (Throwable)ex);
            }
        }
        try {
            int retryAttempts = 0;
            while (true) {
                try {
                    block18: {
                        block17: {
                            StreamSourceInfo streamSourceInfo = new StreamSourceInfo((InputStream)new ByteArrayInputStream(this.mapper.writeValueAsBytes(eventsToSink)));
                            IngestionResult ingestionResult = this.ingestClient.ingestFromStream(streamSourceInfo, this.ingestionProperties);
                            if (!(ingestionResult instanceof TableReportIngestionResult)) break block17;
                            IngestionStatus ingestionStatus = (IngestionStatus)ingestionResult.getIngestionStatusCollection().get(0);
                            if (!this.hasStreamingSucceeded(ingestionStatus)) break block18;
                            recordsToSink.forEach(Record::ack);
                        }
                        return;
                    }
                    this.backOffForRemainingAttempts(retryAttempts += 3, null, recordsToSink);
                }
                catch (IngestionServiceException exception) {
                    Throwable innerException = exception.getCause();
                    if (innerException instanceof KustoDataExceptionBase && ((KustoDataExceptionBase)innerException).isPermanent()) {
                        recordsToSink.forEach(Record::fail);
                        throw new ConnectException(exception.getMessage());
                    }
                    this.backOffForRemainingAttempts(retryAttempts, (Exception)((Object)exception), recordsToSink);
                }
                catch (IngestionClientException | URISyntaxException exception) {
                    recordsToSink.forEach(Record::fail);
                    throw new ConnectException(exception.getMessage());
                }
                ++retryAttempts;
            }
        }
        catch (Exception ex) {
            log.error("Failed to publish the message to ADX cluster", (Throwable)ex);
            return;
        }
    }

    private boolean hasStreamingSucceeded(@NotNull IngestionStatus status) {
        switch (status.status) {
            case Succeeded: 
            case Queued: 
            case Pending: {
                return true;
            }
            case Skipped: 
            case PartiallySucceeded: {
                String failureStatus = status.getFailureStatus();
                String details = status.getDetails();
                UUID ingestionSourceId = status.getIngestionSourceId();
                log.warn("A batch of streaming records has {} ingestion: table:{}, database:{}, operationId: {},ingestionSourceId: {}{}{}.\nStatus is final and therefore ingestion won't be retried and data won't reach dlq", new Object[]{status.getStatus(), status.getTable(), status.getDatabase(), status.getOperationId(), ingestionSourceId, StringUtils.isNotEmpty((CharSequence)failureStatus) ? ", failure: " + failureStatus : "", StringUtils.isNotEmpty((CharSequence)details) ? ", details: " + details : ""});
                return true;
            }
        }
        return false;
    }

    private void backOffForRemainingAttempts(int retryAttempts, Exception exception, List<Record<byte[]>> records) throws PulsarClientException.ConnectException {
        if (retryAttempts < this.maxRetryAttempts) {
            long sleepTimeMs = this.retryBackOffTime;
            log.error("Failed to ingest records into Kusto, backing off and retrying ingesting records after {} milliseconds.", (Object)sleepTimeMs);
            try {
                TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
                throw new InterruptedException();
            }
            catch (InterruptedException interruptedErr) {
                records.forEach(Record::fail);
                throw new PulsarClientException.ConnectException(String.format("Retrying ingesting records into KustoDB was interrupted after retryAttempts=%s", retryAttempts + 1));
            }
        }
        records.forEach(Record::fail);
        throw new PulsarClientException.ConnectException(String.format("Retry attempts exhausted, failed to ingest records into KustoDB. Exception: %s", exception.getMessage()));
    }

    @NotNull
    private ADXPulsarEvent getADXPulsarEvent(@NotNull Record<byte[]> record) throws Exception {
        ADXPulsarEvent event = new ADXPulsarEvent();
        record.getEventTime().ifPresent(time -> event.setEventTime(Instant.ofEpochMilli(time)));
        record.getKey().ifPresent(event::setKey);
        record.getMessage().ifPresent(message -> event.setProducerName(message.getProducerName()));
        record.getMessage().ifPresent(message -> event.setSequenceId(message.getSequenceId()));
        event.setValue(new String((byte[])record.getValue(), StandardCharsets.UTF_8));
        event.setProperties(new ObjectMapper().writeValueAsString((Object)record.getProperties()));
        return event;
    }

    private IngestionMapping.IngestionMappingKind getParseMappingRefType(String mappingRefType) {
        if (mappingRefType == null || mappingRefType.isEmpty()) {
            return null;
        }
        return switch (mappingRefType) {
            case "CSV" -> IngestionMapping.IngestionMappingKind.CSV;
            case "AVRO" -> IngestionMapping.IngestionMappingKind.AVRO;
            case "JSON" -> IngestionMapping.IngestionMappingKind.JSON;
            case "PARQUET" -> IngestionMapping.IngestionMappingKind.PARQUET;
            default -> null;
        };
    }

    private ConnectionStringBuilder getConnectionStringBuilder(@NotNull ADXSinkConfig adxConfig) {
        if (adxConfig.getManagedIdentityId() != null) {
            if ("system".equalsIgnoreCase(adxConfig.getManagedIdentityId())) {
                return ConnectionStringBuilder.createWithAadManagedIdentity((String)adxConfig.getClusterUrl());
            }
            ConnectionStringBuilder.createWithAadManagedIdentity((String)adxConfig.getClusterUrl(), (String)adxConfig.getManagedIdentityId());
        }
        return ConnectionStringBuilder.createWithAadApplicationCredentials((String)adxConfig.getClusterUrl(), (String)adxConfig.getAppId(), (String)adxConfig.getAppKey(), (String)adxConfig.getTenantId());
    }

    public void close() throws Exception {
        this.ingestClient.close();
        this.adxSinkExecutor.shutdown();
        try {
            if (!this.adxSinkExecutor.awaitTermination(2L * this.batchTimeMs, TimeUnit.MILLISECONDS)) {
                this.adxSinkExecutor.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            this.adxSinkExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Kusto ingest client closed.");
    }
}

