/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.exporter;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.transport.ElasticsearchTransport;
import io.camunda.exporter.DefaultExporterResourceProvider;
import io.camunda.exporter.ExporterResourceProvider;
import io.camunda.exporter.clients.elasticsearch.ElasticsearchClientFactory;
import io.camunda.exporter.config.ElasticsearchExporterConfiguration;
import io.camunda.exporter.exceptions.ElasticsearchExporterException;
import io.camunda.exporter.exceptions.PersistenceException;
import io.camunda.exporter.handlers.AuthorizationRecordValueExportHandler;
import io.camunda.exporter.handlers.UserRecordValueExportHandler;
import io.camunda.exporter.store.ElasticsearchBatchRequest;
import io.camunda.exporter.store.ExporterBatchWriter;
import io.camunda.exporter.utils.ElasticsearchScriptBuilder;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import java.time.Duration;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CamundaExporter
implements Exporter {
    private static final Logger LOG = LoggerFactory.getLogger(CamundaExporter.class);
    private Controller controller;
    private ElasticsearchExporterConfiguration configuration;
    private ElasticsearchClient client;
    private ExporterBatchWriter writer;
    private long lastPosition = -1L;
    private final ExporterResourceProvider provider;

    public CamundaExporter() {
        this(new DefaultExporterResourceProvider());
    }

    public CamundaExporter(ExporterResourceProvider provider) {
        this.provider = provider;
    }

    public void configure(Context context) {
        this.configuration = (ElasticsearchExporterConfiguration)context.getConfiguration().instantiate(ElasticsearchExporterConfiguration.class);
        context.setFilter((Context.RecordFilter)new ElasticsearchRecordFilter());
        LOG.debug("Exporter configured with {}", (Object)this.configuration);
    }

    public void open(Controller controller) {
        this.controller = controller;
        this.client = this.createClient();
        this.writer = this.createBatchWriter();
        this.scheduleDelayedFlush();
        LOG.info("Exporter opened");
    }

    public void close() {
        try {
            this.flush();
            this.updateLastExportedPosition();
        }
        catch (Exception e) {
            LOG.warn("Failed to flush records before closing exporter.", (Throwable)e);
        }
        try {
            ((ElasticsearchTransport)this.client._transport()).close();
        }
        catch (Exception e) {
            LOG.warn("Failed to close elasticsearch client", (Throwable)e);
        }
        LOG.info("Exporter closed");
    }

    public void export(Record<?> record) {
        this.writer.addRecord(record);
        this.lastPosition = record.getPosition();
        if (this.shouldFlush()) {
            this.flush();
            this.updateLastExportedPosition();
        }
    }

    private ElasticsearchClient createClient() {
        return ElasticsearchClientFactory.INSTANCE.create(this.configuration.elasticsearch);
    }

    private boolean shouldFlush() {
        return this.writer.getBatchSize() >= this.configuration.bulk.getSize();
    }

    private ExporterBatchWriter createBatchWriter() {
        return ExporterBatchWriter.Builder.begin().withHandler(new UserRecordValueExportHandler()).withHandler(new AuthorizationRecordValueExportHandler()).build();
    }

    private void scheduleDelayedFlush() {
        this.controller.scheduleCancellableTask(Duration.ofSeconds(this.configuration.bulk.getDelay()), this::flushAndReschedule);
    }

    private void flushAndReschedule() {
        try {
            this.flush();
            this.updateLastExportedPosition();
        }
        catch (Exception e) {
            LOG.warn("Unexpected exception occurred on periodically flushing bulk, will retry later.", (Throwable)e);
        }
        this.scheduleDelayedFlush();
    }

    private void flush() {
        try {
            ElasticsearchBatchRequest batchRequest = new ElasticsearchBatchRequest(this.client, new BulkRequest.Builder(), new ElasticsearchScriptBuilder());
            this.writer.flush(batchRequest);
        }
        catch (PersistenceException ex) {
            throw new ElasticsearchExporterException(ex.getMessage(), ex);
        }
    }

    private void updateLastExportedPosition() {
        this.controller.updateLastExportedRecordPosition(this.lastPosition);
    }

    private record ElasticsearchRecordFilter() implements Context.RecordFilter
    {
        private static final Set<ValueType> VALUE_TYPES_2_EXPORT = Set.of(ValueType.USER, ValueType.AUTHORIZATION);

        public boolean acceptType(RecordType recordType) {
            return recordType.equals((Object)RecordType.EVENT);
        }

        public boolean acceptValue(ValueType valueType) {
            return VALUE_TYPES_2_EXPORT.contains(valueType);
        }
    }
}

