/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.connectors.elasticsearch;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchObject;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

public class ElasticsearchEmitter
implements IEmitter<ElasticsearchObject> {
    private static final Log LOG = LogFactory.getLog(ElasticsearchEmitter.class);
    private static final String ELASTICSEARCH_CLUSTER_NAME_KEY = "cluster.name";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY = "client.transport.sniff";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY = "client.transport.ignore_cluster_name";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY = "client.transport.ping_timeout";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY = "client.transport.nodes_sampler_interval";
    private final TransportClient elasticsearchClient;
    private final String elasticsearchEndpoint;
    private final int elasticsearchPort;
    private long BACKOFF_PERIOD = 10000L;

    public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
        Settings settings = ImmutableSettings.settingsBuilder().put(ELASTICSEARCH_CLUSTER_NAME_KEY, configuration.ELASTICSEARCH_CLUSTER_NAME).put(ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY, configuration.ELASTICSEARCH_TRANSPORT_SNIFF).put(ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY, configuration.ELASTICSEARCH_IGNORE_CLUSTER_NAME).put(ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY, configuration.ELASTICSEARCH_PING_TIMEOUT).put(ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY, configuration.ELASTICSEARCH_NODE_SAMPLER_INTERVAL).build();
        this.elasticsearchEndpoint = configuration.ELASTICSEARCH_ENDPOINT;
        this.elasticsearchPort = configuration.ELASTICSEARCH_PORT;
        LOG.info((Object)("ElasticsearchEmitter using elasticsearch endpoint " + this.elasticsearchEndpoint + ":" + this.elasticsearchPort));
        this.elasticsearchClient = new TransportClient(settings);
        this.elasticsearchClient.addTransportAddress((TransportAddress)new InetSocketTransportAddress(this.elasticsearchEndpoint, this.elasticsearchPort));
    }

    @Override
    public List<ElasticsearchObject> emit(UnmodifiableBuffer<ElasticsearchObject> buffer) throws IOException {
        List<ElasticsearchObject> records = buffer.getRecords();
        if (records.isEmpty()) {
            return Collections.emptyList();
        }
        BulkRequestBuilder bulkRequest = this.elasticsearchClient.prepareBulk();
        for (ElasticsearchObject record : records) {
            Boolean create;
            Long ttl;
            IndexRequestBuilder indexRequestBuilder = this.elasticsearchClient.prepareIndex(record.getIndex(), record.getType(), record.getId());
            indexRequestBuilder.setSource(record.getSource());
            Long version = record.getVersion();
            if (version != null) {
                indexRequestBuilder.setVersion(version.longValue());
            }
            if ((ttl = record.getTtl()) != null) {
                indexRequestBuilder.setTTL(ttl.longValue());
            }
            if ((create = record.getCreate()) != null) {
                indexRequestBuilder.setCreate(create.booleanValue());
            }
            bulkRequest.add(indexRequestBuilder);
        }
        while (true) {
            try {
                BulkResponse bulkResponse = (BulkResponse)bulkRequest.execute().actionGet();
                BulkItemResponse[] responses = bulkResponse.getItems();
                ArrayList<ElasticsearchObject> failures = new ArrayList<ElasticsearchObject>();
                int numberOfSkippedRecords = 0;
                for (int i = 0; i < responses.length; ++i) {
                    if (!responses[i].isFailed()) continue;
                    LOG.error((Object)("Record failed with message: " + responses[i].getFailureMessage()));
                    BulkItemResponse.Failure failure = responses[i].getFailure();
                    if (failure.getMessage().contains("DocumentAlreadyExistsException") || failure.getMessage().contains("VersionConflictEngineException")) {
                        ++numberOfSkippedRecords;
                        continue;
                    }
                    failures.add(records.get(i));
                }
                LOG.info((Object)("Emitted " + (records.size() - failures.size() - numberOfSkippedRecords) + " records to Elasticsearch"));
                if (!failures.isEmpty()) {
                    this.printClusterStatus();
                    LOG.warn((Object)("Returning " + failures.size() + " records as failed"));
                }
                return failures;
            }
            catch (NoNodeAvailableException nnae) {
                LOG.error((Object)("No nodes found at " + this.elasticsearchEndpoint + ":" + this.elasticsearchPort + ". Retrying in " + this.BACKOFF_PERIOD + " milliseconds"), (Throwable)nnae);
                this.sleep(this.BACKOFF_PERIOD);
                continue;
            }
            catch (Exception e) {
                LOG.error((Object)"ElasticsearchEmitter threw an unexpected exception ", (Throwable)e);
                this.sleep(this.BACKOFF_PERIOD);
                continue;
            }
            break;
        }
    }

    @Override
    public void fail(List<ElasticsearchObject> records) {
        for (ElasticsearchObject record : records) {
            LOG.error((Object)("Record failed: " + record));
        }
    }

    @Override
    public void shutdown() {
        this.elasticsearchClient.close();
    }

    private void sleep(long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void printClusterStatus() {
        ClusterHealthRequestBuilder healthRequestBuilder = this.elasticsearchClient.admin().cluster().prepareHealth(new String[0]);
        ClusterHealthResponse response = (ClusterHealthResponse)healthRequestBuilder.execute().actionGet();
        if (response.getStatus().equals((Object)ClusterHealthStatus.RED)) {
            LOG.error((Object)"Cluster health is RED. Indexing ability will be limited");
        } else if (response.getStatus().equals((Object)ClusterHealthStatus.YELLOW)) {
            LOG.warn((Object)"Cluster health is YELLOW.");
        } else if (response.getStatus().equals((Object)ClusterHealthStatus.GREEN)) {
            LOG.info((Object)"Cluster health is GREEN.");
        }
    }
}

