/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Named;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

@Named
public class Office365Iterator
implements Iterator<ItemInfo> {
    private static final Logger log = LoggerFactory.getLogger(Office365Iterator.class);
    private static final int HAS_NEXT_TIMEOUT = 60;
    private final Office365Service service;
    private final ExecutorService crawlerTaskExecutor;
    private long crawlerQWaitTimeMillis = 2000L;
    private Queue<ItemInfo> itemInfoQueue;
    private Instant lastPollTime;
    private boolean firstTime = true;
    private final List<Future<Boolean>> futureList;

    public Office365Iterator(Office365Service service, PluginExecutorServiceProvider executorServiceProvider) {
        this.service = service;
        this.crawlerTaskExecutor = executorServiceProvider.get();
        this.futureList = new ArrayList<Future<Boolean>>();
    }

    @Override
    public boolean hasNext() {
        if (this.firstTime) {
            log.debug("Starting initial crawl for Office 365 audit logs");
            this.startCrawlerThreads();
            this.firstTime = false;
        }
        for (int timeout = 60; this.isCrawlerRunning() && this.isQueueEmpty() && timeout > 0; --timeout) {
            try {
                log.trace("Waiting for crawler queue to be filled, timeout in {} seconds", (Object)timeout);
                Thread.sleep(this.crawlerQWaitTimeMillis);
                continue;
            }
            catch (InterruptedException e) {
                log.error("Thread interrupted while waiting for crawler queue", (Throwable)e);
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !this.isQueueEmpty();
    }

    @Override
    public ItemInfo next() {
        if (this.hasNext()) {
            return this.itemInfoQueue.remove();
        }
        throw new NoSuchElementException("No more items available in the Office 365 audit log queue");
    }

    public void initialize(Instant startTime) {
        log.info("Initializing Office 365 iterator from timestamp: {}", (Object)startTime);
        this.itemInfoQueue = new ConcurrentLinkedQueue<ItemInfo>();
        this.lastPollTime = startTime;
        this.firstTime = true;
        this.futureList.clear();
    }

    private boolean isCrawlerRunning() {
        if (CollectionUtils.isEmpty(this.futureList)) {
            return false;
        }
        return this.futureList.stream().anyMatch(future -> !future.isDone());
    }

    private boolean isQueueEmpty() {
        return this.itemInfoQueue == null || this.itemInfoQueue.isEmpty();
    }

    void startCrawlerThreads() {
        log.debug("Starting crawler thread for Office 365 audit logs with lastPollTime: {}", (Object)this.lastPollTime);
        Future<Boolean> future = this.crawlerTaskExecutor.submit(() -> {
            try {
                this.service.getOffice365Entities(this.lastPollTime, this.itemInfoQueue);
                log.debug("Completed crawler thread for Office 365 audit logs with lastPollTime: {}", (Object)this.lastPollTime);
                return true;
            }
            catch (Exception e) {
                log.error("Error in crawler thread while fetching Office 365 audit logs", (Throwable)e);
                return false;
            }
        });
        this.futureList.add(future);
    }

    public void setCrawlerQWaitTimeMillis(long crawlerQWaitTimeMillis) {
        this.crawlerQWaitTimeMillis = crawlerQWaitTimeMillis;
    }
}

