/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig;
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.FailureHandler;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.connector.elasticsearch.utils.RestClientUtils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ElasticsearchWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
    private final ElasticsearchEmitter<? super IN> emitter;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    private final BulkProcessor bulkProcessor;
    private final RestHighLevelClient client;
    private final RequestIndexer requestIndexer;
    private final Counter numBytesOutCounter;
    private long pendingActions = 0L;
    private boolean checkpointInProgress = false;
    private volatile long lastSendTime = 0L;
    private volatile long ackTime = Long.MAX_VALUE;
    private volatile boolean closed = false;

    ElasticsearchWriter(List<HttpHost> hosts, ElasticsearchEmitter<? super IN> emitter, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig, BulkProcessorBuilderFactory bulkProcessorBuilderFactory, BulkResponseInspector bulkResponseInspector, NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, MailboxExecutor mailboxExecutor) {
        this.emitter = (ElasticsearchEmitter)Preconditions.checkNotNull(emitter);
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)mailboxExecutor);
        this.client = new RestHighLevelClient(RestClientUtils.configureRestClientBuilder(RestClient.builder((HttpHost[])hosts.toArray(new HttpHost[0])), networkClientConfig));
        this.bulkProcessor = this.createBulkProcessor(bulkProcessorBuilderFactory, bulkProcessorConfig, (BulkResponseInspector)Preconditions.checkNotNull((Object)bulkResponseInspector));
        Preconditions.checkNotNull((Object)metricGroup);
        this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
        try {
            emitter.open();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the ElasticsearchEmitter", (Throwable)e);
        }
    }

    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        this.emitter.emit(element, context, this.requestIndexer);
    }

    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        this.checkpointInProgress = true;
        while (this.pendingActions != 0L && (this.flushOnCheckpoint || endOfInput)) {
            this.bulkProcessor.flush();
            LOG.info("Waiting for the response of {} pending actions.", (Object)this.pendingActions);
            this.mailboxExecutor.yield();
        }
        this.checkpointInProgress = false;
    }

    @VisibleForTesting
    void blockingFlushAllActions() throws InterruptedException {
        while (this.pendingActions != 0L) {
            this.bulkProcessor.flush();
            LOG.info("Waiting for the response of {} pending actions.", (Object)this.pendingActions);
            this.mailboxExecutor.yield();
        }
    }

    public void close() throws Exception {
        this.closed = true;
        this.emitter.close();
        this.bulkProcessor.close();
        this.client.close();
    }

    private BulkProcessor createBulkProcessor(BulkProcessorBuilderFactory bulkProcessorBuilderFactory, BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector bulkResponseInspector) {
        BulkProcessor.Builder builder = (BulkProcessor.Builder)bulkProcessorBuilderFactory.apply(this.client, bulkProcessorConfig, new BulkListener(bulkResponseInspector));
        builder.setConcurrentRequests(0);
        return builder.build();
    }

    private void enqueueActionInMailbox(ThrowingRunnable<? extends Exception> action, String actionName) {
        if (this.isClosed()) {
            return;
        }
        this.mailboxExecutor.execute(action, actionName);
    }

    private static Throwable wrapException(RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
        if (restStatus == null) {
            return new FlinkRuntimeException(String.format("Single action %s of bulk request failed.", actionRequest), rootFailure);
        }
        return new FlinkRuntimeException(String.format("Single action %s of bulk request failed with status %s.", actionRequest, restStatus.getStatus()), rootFailure);
    }

    private boolean isClosed() {
        if (this.closed) {
            LOG.warn("Writer was closed before all records were acknowledged by Elasticsearch.");
        }
        return this.closed;
    }

    static class DefaultFailureHandler
    implements FailureHandler {
        DefaultFailureHandler() {
        }

        @Override
        public void onFailure(Throwable failure) {
            if (failure instanceof FlinkRuntimeException) {
                throw (FlinkRuntimeException)failure;
            }
            throw new FlinkRuntimeException(failure);
        }
    }

    static class DefaultBulkResponseInspector
    implements BulkResponseInspector {
        @VisibleForTesting
        final FailureHandler failureHandler;

        DefaultBulkResponseInspector() {
            this(new DefaultFailureHandler());
        }

        DefaultBulkResponseInspector(FailureHandler failureHandler) {
            this.failureHandler = (FailureHandler)Preconditions.checkNotNull((Object)failureHandler);
        }

        @Override
        public void inspect(BulkRequest request, BulkResponse response) {
            if (!response.hasFailures()) {
                return;
            }
            Throwable chainedFailures = null;
            for (int i = 0; i < response.getItems().length; ++i) {
                Exception failure;
                BulkItemResponse itemResponse = response.getItems()[i];
                if (!itemResponse.isFailed() || (failure = itemResponse.getFailure().getCause()) == null) continue;
                RestStatus restStatus = itemResponse.getFailure().getStatus();
                DocWriteRequest actionRequest = (DocWriteRequest)request.requests().get(i);
                chainedFailures = ExceptionUtils.firstOrSuppressed((Throwable)ElasticsearchWriter.wrapException(restStatus, failure, actionRequest), chainedFailures);
            }
            if (chainedFailures == null) {
                return;
            }
            this.failureHandler.onFailure(chainedFailures);
        }
    }

    private class DefaultRequestIndexer
    implements RequestIndexer {
        private final Counter numRecordsSendCounter;

        public DefaultRequestIndexer(Counter numRecordsSendCounter) {
            this.numRecordsSendCounter = (Counter)Preconditions.checkNotNull((Object)numRecordsSendCounter);
        }

        @Override
        public void add(DeleteRequest ... deleteRequests) {
            for (DeleteRequest deleteRequest : deleteRequests) {
                this.numRecordsSendCounter.inc();
                ++ElasticsearchWriter.this.pendingActions;
                ElasticsearchWriter.this.bulkProcessor.add(deleteRequest);
            }
        }

        @Override
        public void add(IndexRequest ... indexRequests) {
            for (IndexRequest indexRequest : indexRequests) {
                this.numRecordsSendCounter.inc();
                ++ElasticsearchWriter.this.pendingActions;
                ElasticsearchWriter.this.bulkProcessor.add(indexRequest);
            }
        }

        @Override
        public void add(UpdateRequest ... updateRequests) {
            for (UpdateRequest updateRequest : updateRequests) {
                this.numRecordsSendCounter.inc();
                ++ElasticsearchWriter.this.pendingActions;
                ElasticsearchWriter.this.bulkProcessor.add((DocWriteRequest)updateRequest);
            }
        }

        @Override
        public void flush() {
            ElasticsearchWriter.this.bulkProcessor.flush();
        }
    }

    private class BulkListener
    implements BulkProcessor.Listener {
        private final BulkResponseInspector bulkResponseInspector;

        public BulkListener(BulkResponseInspector bulkResponseInspector) {
            this.bulkResponseInspector = bulkResponseInspector;
        }

        public void beforeBulk(long executionId, BulkRequest request) {
            LOG.info("Sending bulk of {} actions to Elasticsearch.", (Object)request.numberOfActions());
            ElasticsearchWriter.this.lastSendTime = System.currentTimeMillis();
            ElasticsearchWriter.this.numBytesOutCounter.inc(request.estimatedSizeInBytes());
        }

        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            ElasticsearchWriter.this.ackTime = System.currentTimeMillis();
            ElasticsearchWriter.this.enqueueActionInMailbox((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> this.extractFailures(request, response)), "elasticsearchSuccessCallback");
        }

        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            ElasticsearchWriter.this.enqueueActionInMailbox((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
                throw new FlinkRuntimeException("Complete bulk has failed.", failure);
            }), "elasticsearchErrorCallback");
        }

        private void extractFailures(BulkRequest request, BulkResponse response) {
            this.bulkResponseInspector.inspect(request, response);
            ElasticsearchWriter.this.pendingActions -= (long)request.numberOfActions();
        }
    }
}

