/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.crowdstrike;

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeService;
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeThreatIntelApiResponse;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.ThreatIndicator;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named
public class CrowdStrikeClient
implements CrawlerClient<CrowdStrikeWorkerProgressState> {
    CrowdStrikeService crowdStrikeService;
    private static final Logger log = LoggerFactory.getLogger(CrowdStrikeClient.class);
    private final CrowdStrikeSourceConfig configuration;
    private final int bufferWriteTimeoutInSeconds = 10;

    public CrowdStrikeClient(CrowdStrikeService crowdStrikeService, CrowdStrikeSourceConfig sourceConfig) {
        log.info("Creating CrowdStrike Crawler");
        this.crowdStrikeService = crowdStrikeService;
        this.configuration = sourceConfig;
        log.info("Created CrowdStrike Crawler");
    }

    public Iterator<ItemInfo> listItems(Instant lastPollTime) {
        return null;
    }

    public void executePartition(CrowdStrikeWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        CrowdStrikeThreatIntelApiResponse response;
        Instant startTime = state.getStartTime();
        Instant endTime = state.getEndTime();
        Optional<String> paginationLink = Optional.empty();
        do {
            CrowdStrikeIndicatorResult result;
            List<ThreatIndicator> indicators;
            if ((indicators = (result = (response = this.crowdStrikeService.getThreatIndicators(startTime, endTime, paginationLink)).getBody()).getResults()) == null || indicators.isEmpty()) {
                log.info("No threat indicators found for the time window {} to {}", (Object)startTime, (Object)endTime);
                continue;
            }
            this.writeIndicatorsToBuffer(indicators, buffer);
        } while ((paginationLink = response.getFirstHeaderValue("Next-Page")).isPresent());
        if (this.configuration.isAcknowledgments()) {
            acknowledgementSet.complete();
        }
    }

    private void writeIndicatorsToBuffer(List<ThreatIndicator> indicators, Buffer<Record<Event>> buffer) {
        List records = indicators.parallelStream().map(indicator -> JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(indicator).build()).map(Record::new).collect(Collectors.toList());
        try {
            buffer.writeAll(records, (int)Duration.ofSeconds(10L).toMillis());
        }
        catch (Exception e) {
            log.error("Failed to write {} indicators to buffer", (Object)records.size(), (Object)e);
            throw new RuntimeException("Buffer write failed for CrowdStrike indicators", e);
        }
    }
}

