/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.confluence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.confluence.ConfluenceItemInfo;
import org.opensearch.dataprepper.plugins.source.confluence.ConfluenceIterator;
import org.opensearch.dataprepper.plugins.source.confluence.ConfluenceService;
import org.opensearch.dataprepper.plugins.source.confluence.ConfluenceSourceConfig;
import org.opensearch.dataprepper.plugins.source.confluence.utils.HtmlToTextConversionUtil;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CrawlerClient} implementation for Confluence.
 *
 * <p>Lists items through a {@link ConfluenceIterator} and, for each partition of
 * item ids, fetches the content from {@link ConfluenceService}, converts it into
 * {@code DOCUMENT} events and writes those events to the pipeline buffer.
 */
@Named
public class ConfluenceClient
implements CrawlerClient<PaginationCrawlerWorkerProgressState> {
    private static final Logger log = LoggerFactory.getLogger(ConfluenceClient.class);
    /** Name of the partition key attribute, and of the event metadata attribute, holding the space. */
    private static final String SPACE = "space";
    /** Path handed to {@link HtmlToTextConversionUtil#convertHtmlToText} when formatting is not preserved. */
    private static final String CONTENT_BODY_PATH = "body/view/value";
    // Not final: tests replace it through injectObjectMapper.
    private ObjectMapper objectMapper = new ObjectMapper();
    private final ConfluenceService service;
    private final ConfluenceIterator confluenceIterator;
    private final ExecutorService executorService;
    private final CrawlerSourceConfig configuration;
    private final int bufferWriteTimeoutInSeconds = 10;
    private final boolean preserveContentFormatting;

    /**
     * Creates the client.
     *
     * @param service                 service used to fetch the content of a single item
     * @param confluenceIterator      iterator returned by {@link #listItems(Instant)}
     * @param executorServiceProvider provider of the executor on which content fetches run
     * @param sourceConfig            source configuration; supplies the acknowledgements flag and
     *                                whether the content is kept as-is or converted from HTML to text
     */
    public ConfluenceClient(ConfluenceService service, ConfluenceIterator confluenceIterator, PluginExecutorServiceProvider executorServiceProvider, ConfluenceSourceConfig sourceConfig) {
        this.service = service;
        this.confluenceIterator = confluenceIterator;
        this.executorService = executorServiceProvider.get();
        this.configuration = sourceConfig;
        this.preserveContentFormatting = sourceConfig.isPreserveContentFormatting();
    }

    /**
     * Initializes the shared iterator with the given poll time and returns it.
     *
     * @param lastPollTime poll time passed to {@link ConfluenceIterator#initialize}
     * @return the (re-initialized) iterator instance held by this client
     */
    public Iterator<ItemInfo> listItems(Instant lastPollTime) {
        this.confluenceIterator.initialize(lastPollTime);
        return this.confluenceIterator;
    }

    /**
     * Replaces the {@link ObjectMapper} used to parse fetched content.
     *
     * @param objectMapper mapper to use from now on
     */
    @VisibleForTesting
    public void injectObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Fetches every non-null item id of the partition, turns each fetched content
     * into a {@code DOCUMENT} event tagged with the lower-cased space, and writes
     * all events to the buffer. When acknowledgements are enabled, the events are
     * registered with the acknowledgement set before the write and the set is
     * completed after it.
     *
     * @param state              partition state providing item ids, key attributes and export start time
     * @param buffer             buffer receiving the records
     * @param acknowledgementSet acknowledgement set, only used when acknowledgements are enabled
     * @throws RuntimeException     if the content cannot be parsed or the buffer write fails
     * @throws NullPointerException if items are present but the key attributes hold no {@code "space"} value
     * @throws java.util.concurrent.CompletionException if fetching an item's content fails
     */
    public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        log.trace("Executing the partition: {} with {} ticket(s)", state.getKeyAttributes(), state.getItemIds().size());
        List<String> itemIds = state.getItemIds();
        Map<String, Object> keyAttributes = state.getKeyAttributes();
        String space = (String) keyAttributes.get(SPACE);
        Instant eventTime = state.getExportStartTime();

        List<ConfluenceItemInfo> itemInfos = buildItemInfos(itemIds, space, eventTime, keyAttributes);
        List<Record<Event>> recordsToWrite = fetchRecords(itemInfos, space);
        writeRecords(recordsToWrite, buffer, acknowledgementSet);
    }

    /** Builds one item descriptor per non-null item id, preserving the order of the ids. */
    private List<ConfluenceItemInfo> buildItemInfos(List<String> itemIds, String space, Instant eventTime, Map<String, Object> keyAttributes) {
        List<ConfluenceItemInfo> itemInfos = new ArrayList<>();
        for (String itemId : itemIds) {
            if (itemId == null) {
                continue;
            }
            itemInfos.add(ConfluenceItemInfo.builder()
                    .withItemId(itemId)
                    .withId(itemId)
                    .withSpace(space)
                    .withEventTime(eventTime)
                    .withMetadata(keyAttributes)
                    .build());
        }
        return itemInfos;
    }

    /** Fetches the content of all items on the executor and converts each result into a record, in item order. */
    private List<Record<Event>> fetchRecords(List<ConfluenceItemInfo> itemInfos, String space) {
        String eventType = EventType.DOCUMENT.toString();
        // Submit every fetch before waiting on any of them. Joining right after
        // each submit (as a parallel stream would) ties the number of in-flight
        // fetches to the common fork-join pool instead of the plugin executor.
        List<CompletableFuture<String>> pendingContents = itemInfos.stream()
                .map(itemInfo -> CompletableFuture.supplyAsync(() -> this.service.getContent(itemInfo.getId()), this.executorService))
                .collect(Collectors.toList());

        List<Record<Event>> records = new ArrayList<>(pendingContents.size());
        for (CompletableFuture<String> pendingContent : pendingContents) {
            records.add(toRecord(pendingContent.join(), eventType, space));
        }
        return records;
    }

    /** Parses one fetched content string and wraps it in an event record carrying the lower-cased space as metadata. */
    private Record<Event> toRecord(String contentJson, String eventType, String space) {
        Object data;
        try {
            ObjectNode contentNode = this.objectMapper.readValue(contentJson, new TypeReference<ObjectNode>() {});
            if (this.preserveContentFormatting) {
                data = contentNode;
            } else {
                data = HtmlToTextConversionUtil.convertHtmlToText(contentNode, CONTENT_BODY_PATH);
            }
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        Event event = JacksonEvent.builder().withEventType(eventType).withData(data).build();
        // Locale.ROOT keeps the attribute value independent of the JVM default locale.
        event.getMetadata().setAttribute(SPACE, space.toLowerCase(Locale.ROOT));
        return new Record<>(event);
    }

    /** Writes the records to the buffer, registering them with the acknowledgement set first when acknowledgements are enabled. */
    private void writeRecords(List<Record<Event>> recordsToWrite, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        int timeoutMillis = (int) Duration.ofSeconds(this.bufferWriteTimeoutInSeconds).toMillis();
        try {
            if (this.configuration.isAcknowledgments()) {
                recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData()));
                buffer.writeAll(recordsToWrite, timeoutMillis);
                acknowledgementSet.complete();
            } else {
                buffer.writeAll(recordsToWrite, timeoutMillis);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

