/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.lang.invoke.LambdaMetafactory;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365Iterator;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.ResourceAccessException;

@Named
public class Office365CrawlerClient
implements CrawlerClient<PaginationCrawlerWorkerProgressState> {
    private static final Logger log = LoggerFactory.getLogger(Office365CrawlerClient.class);
    private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
    private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
    private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
    private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
    private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
    private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
    private static final String WORKER_STATE_UPDATES = "workerStateUpdates";
    private static final String CONTENT_TYPE = "contentType";
    private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
    private final Office365Service service;
    private final Office365Iterator office365Iterator;
    private final ExecutorService executorService;
    private final Office365SourceConfig configuration;
    private final Timer bufferWriteLatencyTimer;
    private final Counter bufferWriteAttemptsCounter;
    private final Counter bufferWriteSuccessCounter;
    private final Counter bufferWriteRetrySuccessCounter;
    private final Counter bufferWriteRetryAttemptsCounter;
    private final Counter bufferWriteFailuresCounter;
    private ObjectMapper objectMapper;

    public Office365CrawlerClient(Office365Service service, Office365Iterator office365Iterator, PluginExecutorServiceProvider executorServiceProvider, Office365SourceConfig sourceConfig, PluginMetrics pluginMetrics) {
        this.service = service;
        this.office365Iterator = office365Iterator;
        this.executorService = executorServiceProvider.get();
        this.configuration = sourceConfig;
        this.objectMapper = new ObjectMapper();
        this.bufferWriteLatencyTimer = pluginMetrics.timer(BUFFER_WRITE_LATENCY);
        this.bufferWriteAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_ATTEMPTS);
        this.bufferWriteSuccessCounter = pluginMetrics.counter(BUFFER_WRITE_SUCCESS);
        this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS);
        this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS);
        this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES);
    }

    @VisibleForTesting
    void injectObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public Iterator<ItemInfo> listItems(Instant lastPollTime) {
        log.info("Starting to list Office 365 audit logs from {}", (Object)lastPollTime);
        this.service.initializeSubscriptions();
        this.office365Iterator.initialize(lastPollTime);
        return this.office365Iterator;
    }

    public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        log.info("Starting to execute partition with {} log(s)", (Object)state.getItemIds().size());
        List itemIds = state.getItemIds();
        List records = itemIds.stream().map(id -> {
            try {
                return this.processAuditLog((String)id);
            }
            catch (Office365Exception e) {
                log.error(DataPrepperMarkers.NOISY, "{} error processing audit log: {}", new Object[]{e.isRetryable() ? "Retryable" : "Non-retryable", id, e});
                if (e.isRetryable()) {
                    throw new RuntimeException("Retryable error processing audit log: " + id, e);
                }
                log.error(DataPrepperMarkers.NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", id, (Object)e);
                return null;
            }
            catch (Exception e) {
                log.error(DataPrepperMarkers.NOISY, "Unexpected error processing audit log: {}", id, (Object)e);
                throw new RuntimeException("Unexpected error processing audit log: " + id, e);
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        this.bufferWriteLatencyTimer.record(() -> {
            try {
                this.writeRecordsWithRetry(records, buffer, acknowledgementSet, state);
            }
            catch (Exception e) {
                this.bufferWriteFailuresCounter.increment();
                throw e;
            }
        });
    }

    private Record<Event> processAuditLog(String id) {
        try {
            String auditLog = this.service.getAuditLog(id);
            if (auditLog == null) {
                throw new Office365Exception("Received null audit log for ID: " + id, false);
            }
            try {
                JsonNode jsonNode = this.objectMapper.readTree(auditLog);
                Map data = jsonNode.isArray() && !jsonNode.isEmpty() ? (Map)this.objectMapper.convertValue((Object)jsonNode.get(0), (TypeReference)new TypeReference<Map<String, Object>>(){}) : (Map)this.objectMapper.readValue(auditLog, (TypeReference)new TypeReference<Map<String, Object>>(){});
                String contentType = (String)data.get("Workload");
                if (contentType == null) {
                    throw new Office365Exception("Missing Workload field in audit log: " + id, false);
                }
                JacksonEvent event = JacksonEvent.builder().withEventType(EventType.LOG.toString()).withData((Object)data).build();
                event.getMetadata().setAttribute(CONTENT_TYPE, (Object)contentType);
                return new Record((Object)event);
            }
            catch (JsonProcessingException e) {
                throw new Office365Exception("Failed to parse audit log: " + id, e, false);
            }
        }
        catch (HttpClientErrorException e) {
            switch (e.getStatusCode()) {
                case UNAUTHORIZED: 
                case FORBIDDEN: {
                    throw new Office365Exception("Authentication failed while fetching audit log: " + id, e, true);
                }
                case NOT_FOUND: {
                    throw new Office365Exception("Audit log not found: " + id, e, false);
                }
                case TOO_MANY_REQUESTS: {
                    throw new Office365Exception("Rate limited while fetching audit log: " + id, e, true);
                }
            }
            throw new Office365Exception("Client error while fetching audit log: " + id, e, false);
        }
        catch (ResourceAccessException e) {
            throw new Office365Exception("Network error while fetching audit log: " + id, e, true);
        }
    }

    /*
     * Unable to fully structure code
     */
    private void writeRecordsWithRetry(List<Record<Event>> records, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet, PaginationCrawlerWorkerProgressState state) {
        this.bufferWriteAttemptsCounter.increment();
        retryCount = 0;
        currentBackoff = 1000;
        maxBackoff = 30000;
        maxRetries = 5;
        while (true) lbl-1000:
        // 2 sources

        {
            try {
                if (this.configuration.isAcknowledgments()) {
                    records.forEach((Consumer<Record>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$writeRecordsWithRetry$2(org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet org.opensearch.dataprepper.model.record.Record ), (Lorg/opensearch/dataprepper/model/record/Record;)V)((AcknowledgementSet)acknowledgementSet));
                    buffer.writeAll(records, (int)Duration.ofSeconds(10L).toMillis());
                    acknowledgementSet.complete();
                } else {
                    buffer.writeAll(records, (int)Duration.ofSeconds(10L).toMillis());
                }
                if (retryCount > 0) {
                    this.bufferWriteRetrySuccessCounter.increment();
                } else {
                    this.bufferWriteSuccessCounter.increment();
                }
                return;
            }
            catch (TimeoutException e) {
                if (++retryCount >= 5) {
                    this.bufferWriteFailuresCounter.increment();
                    throw new RuntimeException("Failed to write to buffer after 5 attempts", e);
                }
                this.bufferWriteRetryAttemptsCounter.increment();
                currentBackoff = Math.min((int)((double)currentBackoff * 2.0), 30000);
                Office365CrawlerClient.log.info("Buffer full, backing off for {} ms before retry", (Object)currentBackoff);
                try {
                    Thread.sleep(currentBackoff);
                    continue;
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Buffer write retry interrupted", ie);
                }
            }
            ** while (true)
            catch (Exception e) {
                this.bufferWriteFailuresCounter.increment();
                throw new RuntimeException("Error writing to buffer", e);
            }
            break;
        }
    }

    private static /* synthetic */ void lambda$writeRecordsWithRetry$2(AcknowledgementSet acknowledgementSet, Record record) {
        acknowledgementSet.add((Event)record.getData());
    }
}

