/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.operate.archiver;

import io.camunda.operate.Metrics;
import io.camunda.operate.archiver.ArchiveBatch;
import io.camunda.operate.archiver.ArchiverRepository;
import io.camunda.operate.conditions.OpensearchCondition;
import io.camunda.operate.property.OperateProperties;
import io.camunda.operate.schema.templates.BatchOperationTemplate;
import io.camunda.operate.schema.templates.ListViewTemplate;
import io.camunda.operate.store.opensearch.client.sync.RichOpenSearchClient;
import io.camunda.operate.store.opensearch.dsl.AggregationDSL;
import io.camunda.operate.store.opensearch.dsl.QueryDSL;
import io.camunda.operate.store.opensearch.dsl.RequestDSL;
import io.camunda.operate.util.FutureHelper;
import io.micrometer.core.instrument.Timer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.opensearch.client.opensearch._types.Conflicts;
import org.opensearch.client.opensearch._types.SortOptions;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.aggregations.Aggregate;
import org.opensearch.client.opensearch._types.aggregations.Aggregation;
import org.opensearch.client.opensearch._types.aggregations.DateHistogramAggregation;
import org.opensearch.client.opensearch._types.aggregations.DateHistogramBucket;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
import org.opensearch.client.opensearch.core.ReindexRequest;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.search.SourceConfig;
import org.opensearch.client.opensearch.indices.IndexState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

@Conditional(value={OpensearchCondition.class})
@Component
public class OpensearchArchiverRepository
implements ArchiverRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchArchiverRepository.class);
    @Autowired
    protected RichOpenSearchClient richOpenSearchClient;
    @Autowired
    @Qualifier(value="archiverThreadPoolExecutor")
    protected ThreadPoolTaskScheduler archiverExecutor;
    @Autowired
    private BatchOperationTemplate batchOperationTemplate;
    @Autowired
    private ListViewTemplate processInstanceTemplate;
    @Autowired
    private OperateProperties operateProperties;
    @Autowired
    private Metrics metrics;

    private <R> ArchiveBatch createArchiveBatch(SearchResponse<R> searchResponse, String datesAggName, String instancesAggName) {
        Map buckets = ((Aggregate)searchResponse.aggregations().get(datesAggName)).dateHistogram().buckets().keyed();
        if (buckets.size() > 0) {
            Map.Entry entry = buckets.entrySet().iterator().next();
            List hits = ((Aggregate)((DateHistogramBucket)entry.getValue()).aggregations().get(instancesAggName)).topHits().hits().hits();
            List<Object> ids = hits.stream().map(hit -> hit.id()).toList();
            return new ArchiveBatch((String)entry.getKey(), ids);
        }
        return null;
    }

    private CompletableFuture<ArchiveBatch> search(SearchRequest.Builder searchRequestBuilder, Function<Exception, String> errorMessage) {
        return FutureHelper.withTimer((Timer)this.metrics.getTimer("operate.archiver.query", new String[0]), () -> this.richOpenSearchClient.async().doc().search(searchRequestBuilder, Object.class, errorMessage)).thenApply(response -> this.createArchiveBatch((SearchResponse)response, "datesAgg", "instancesAgg"));
    }

    private SearchRequest.Builder nextBatchSearchRequestBuilder(String index, String idColumn, String endDateField, Query query) {
        String format = this.operateProperties.getArchiver().getElsRolloverDateFormat();
        String interval = this.operateProperties.getArchiver().getRolloverInterval();
        int rollOverBatchSize = this.operateProperties.getArchiver().getRolloverBatchSize();
        Aggregation agg = AggregationDSL.withSubaggregations((DateHistogramAggregation)AggregationDSL.dateHistogramAggregation((String)"endDate", (String)interval, (String)format, (boolean)true), Map.of("datesSortedAgg", AggregationDSL.bucketSortAggregation((Integer)1, (SortOptions[])new SortOptions[]{QueryDSL.sortOptions((String)"_key", (SortOrder)SortOrder.Asc)})._toAggregation(), "instancesAgg", AggregationDSL.topHitsAggregation(List.of(idColumn), (int)rollOverBatchSize, (SortOptions[])new SortOptions[]{QueryDSL.sortOptions((String)idColumn, (SortOrder)SortOrder.Asc)})._toAggregation()));
        return RequestDSL.searchRequestBuilder((String)index).query(query).aggregations("datesAgg", agg).source(SourceConfig.of(b -> b.fetch(Boolean.valueOf(false)))).size(Integer.valueOf(0)).sort(QueryDSL.sortOptions((String)endDateField, (SortOrder)SortOrder.Asc), new SortOptions[0]).requestCache(Boolean.valueOf(false));
    }

    @Override
    public CompletableFuture<ArchiveBatch> getBatchOperationNextBatch() {
        Query query = QueryDSL.constantScore((Query)QueryDSL.lte((String)"endDate", (Object)this.operateProperties.getArchiver().getArchivingTimepoint()));
        SearchRequest.Builder searchRequestBuilder = this.nextBatchSearchRequestBuilder(this.batchOperationTemplate.getFullQualifiedName(), "id", "endDate", query);
        return this.search(searchRequestBuilder, e -> "Failed to search in " + this.batchOperationTemplate.getFullQualifiedName());
    }

    @Override
    public CompletableFuture<ArchiveBatch> getProcessInstancesNextBatch(List<Integer> partitionIds) {
        Query query = QueryDSL.constantScore((Query)QueryDSL.and((Query[])new Query[]{QueryDSL.lte((String)"endDate", (Object)this.operateProperties.getArchiver().getArchivingTimepoint()), QueryDSL.term((String)"joinRelation", (String)"processInstance"), QueryDSL.intTerms((String)"partitionId", partitionIds)}));
        SearchRequest.Builder searchRequestBuilder = this.nextBatchSearchRequestBuilder(this.processInstanceTemplate.getFullQualifiedName(), "id", "endDate", query);
        return this.search(searchRequestBuilder, e -> "Failed to search in " + this.batchOperationTemplate.getFullQualifiedName());
    }

    @Override
    public void setIndexLifeCycle(String destinationIndexName) {
        try {
            if (this.operateProperties.getArchiver().isIlmEnabled() && this.richOpenSearchClient.index().indexExists(destinationIndexName)) {
                this.richOpenSearchClient.ism().addPolicyToIndex(destinationIndexName, "operate_delete_archived_indices");
            }
        }
        catch (Exception e) {
            LOGGER.warn("Could not set ILM policy {} for index {}: {}", new Object[]{"operate_delete_archived_indices", destinationIndexName, e.getMessage()});
        }
    }

    @Override
    public CompletableFuture<Void> deleteDocuments(String sourceIndexName, String idFieldName, List<Object> processInstanceKeys) {
        DeleteByQueryRequest.Builder deleteByQueryRequestBuilder = RequestDSL.deleteByQueryRequestBuilder((String)sourceIndexName).query(QueryDSL.stringTerms((String)idFieldName, processInstanceKeys.stream().map(Object::toString).toList())).waitForCompletion(Boolean.valueOf(false)).slices(Long.valueOf(this.getAutoSlices())).conflicts(Conflicts.Proceed);
        return FutureHelper.withTimer((Timer)this.metrics.getTimer("operate.archiver.delete.query", new String[0]), () -> this.richOpenSearchClient.async().doc().delete(deleteByQueryRequestBuilder, e -> "Failed to delete asynchronously from " + sourceIndexName).thenAccept(response -> this.richOpenSearchClient.async().task().totalImpactedByTask(response.task(), this.archiverExecutor)));
    }

    @Override
    public CompletableFuture<Void> reindexDocuments(String sourceIndexName, String destinationIndexName, String idFieldName, List<Object> processInstanceKeys) {
        if (!this.richOpenSearchClient.index().indexExists(destinationIndexName)) {
            this.createIndexAs(sourceIndexName, destinationIndexName);
        }
        String errorMessage = String.format("Failed to reindex asynchronously from %s to %s!", sourceIndexName, destinationIndexName);
        Query sourceQuery = QueryDSL.stringTerms((String)idFieldName, processInstanceKeys.stream().map(Object::toString).toList());
        ReindexRequest.Builder reindexRequest = RequestDSL.reindexRequestBuilder((String)sourceIndexName, (Query)sourceQuery, (String)destinationIndexName).waitForCompletion(Boolean.valueOf(false)).scroll(RequestDSL.time((String)"30000ms")).slices(Long.valueOf(this.getAutoSlices())).conflicts(Conflicts.Proceed);
        return FutureHelper.withTimer((Timer)this.metrics.getTimer("operate.archiver.reindex.query", new String[0]), () -> this.richOpenSearchClient.async().index().reindex(reindexRequest, e -> errorMessage).thenAccept(response -> this.richOpenSearchClient.async().task().totalImpactedByTask(response.task(), this.archiverExecutor)));
    }

    private long getAutoSlices() {
        return this.operateProperties.getOpensearch().getNumberOfShards();
    }

    private void createIndexAs(String sourceIndexName, String destinationIndexName) {
        IndexState srcIndex = (IndexState)this.richOpenSearchClient.index().get(RequestDSL.getIndexRequestBuilder((String)sourceIndexName)).get(sourceIndexName);
        this.richOpenSearchClient.index().createIndexWithRetries(RequestDSL.createIndexRequestBuilder((String)destinationIndexName, (IndexState)srcIndex).build());
    }
}

