/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.smoketest.telemetry;

import com.google.gson.Gson;
import io.searchbox.action.Action;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.indices.template.GetTemplate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.opennms.features.jest.client.SearchResultUtils;
import org.opennms.netmgt.flows.elastic.NetflowVersion;
import org.opennms.smoketest.telemetry.FlowPacket;
import org.opennms.smoketest.telemetry.Sender;
import org.opennms.smoketest.utils.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowTester {
    private static final String TEMPLATE_NAME = "netflow";
    private static Logger LOG = LoggerFactory.getLogger(FlowTester.class);
    private static final Gson gson = new Gson();
    private final List<Delivery> deliveries;
    private final List<Consumer<FlowTester>> runBefore = new ArrayList<Consumer<FlowTester>>();
    private final List<Consumer<FlowTester>> runAfter = new ArrayList<Consumer<FlowTester>>();
    private final InetSocketAddress elasticRestAddress;
    private final int totalFlowCount;
    private JestClient client;

    public FlowTester(InetSocketAddress elasticAddress, InetSocketAddress opennmsWebAddress, List<Delivery> deliveries) {
        this.elasticRestAddress = Objects.requireNonNull(elasticAddress);
        this.deliveries = Objects.requireNonNull(deliveries);
        this.totalFlowCount = deliveries.stream().mapToInt(delivery -> delivery.packet.getFlowCount()).sum();
        if (this.totalFlowCount <= 0) {
            throw new IllegalStateException("Cannot verify flow creation/procession, as total flow count is <= 0, but must be > 0");
        }
        if (opennmsWebAddress != null) {
            RestClient restclient = new RestClient(opennmsWebAddress);
            this.runBefore.add(flowTester -> Assert.assertEquals((Object)0L, (Object)restclient.getFlowCount(0L, System.currentTimeMillis())));
            this.runAfter.add(flowTester -> Awaitility.with().pollInterval(15L, TimeUnit.SECONDS).await().atMost(1L, TimeUnit.MINUTES).until(() -> restclient.getFlowCount(0L, System.currentTimeMillis()), IsEqual.equalTo((Object)this.totalFlowCount)));
        }
        if (opennmsWebAddress != null) {
            RestClient restClient = new RestClient(opennmsWebAddress);
            this.runBefore.add(flowTester -> Assert.assertEquals((Object)0L, (Object)restClient.getFlowCount(0L, System.currentTimeMillis())));
            this.runAfter.add(flowTester -> Awaitility.with().pollInterval(15L, TimeUnit.SECONDS).await().atMost(1L, TimeUnit.MINUTES).until(() -> restClient.getFlowCount(0L, System.currentTimeMillis()), IsEqual.equalTo((Object)this.totalFlowCount)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void verifyFlows() throws IOException {
        String elasticRestUrl = String.format("http://%s:%d", this.elasticRestAddress.getHostString(), this.elasticRestAddress.getPort());
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(((HttpClientConfig.Builder)((HttpClientConfig.Builder)((HttpClientConfig.Builder)new HttpClientConfig.Builder(elasticRestUrl).connTimeout(5000)).readTimeout(10000)).multiThreaded(true)).build());
        try {
            this.client = factory.getObject();
            this.runBefore.forEach(rb -> rb.accept(this));
            Map<NetflowVersion, List<Delivery>> delivieriesByProtocol = this.deliveries.stream().collect(Collectors.groupingBy(delivery -> delivery.packet.getNetflowVersion()));
            LOG.info("Verifying flows. Expecting to persist {} flows across protocols: {}", (Object)this.totalFlowCount, delivieriesByProtocol.keySet());
            for (Delivery delivery2 : this.deliveries) {
                LOG.info("Sending packet payload from {} containing {} flows to: {}", new Object[]{delivery2.packet.getPayload(), delivery2.packet.getFlowCount(), delivery2.sender});
                delivery2.send();
            }
            for (NetflowVersion netflowVersion : delivieriesByProtocol.keySet()) {
                List<Delivery> deliveriesForProtocol = delivieriesByProtocol.get(netflowVersion);
                int numFlowsExpected = deliveriesForProtocol.stream().mapToInt(delivery -> delivery.packet.getFlowCount()).sum();
                LOG.info("Verifying flows for {}", (Object)netflowVersion);
                FlowTester.verify(() -> {
                    boolean foundAllFlowsForProtocol;
                    String query = "{\"query\":{\"term\":{\"netflow.version\":{\"value\":" + gson.toJson((Object)netflowVersion) + "}}}}";
                    LOG.info("Executing query: {}", (Object)query);
                    SearchResult response = (SearchResult)this.client.execute((Action)((Search.Builder)new Search.Builder(query).addIndex("netflow-*")).build());
                    LOG.info("Response {} with {} flow documents: {}", new Object[]{response.isSucceeded() ? "successful" : "failed", SearchResultUtils.getTotal((SearchResult)response), response.getJsonString()});
                    boolean bl = foundAllFlowsForProtocol = response.isSucceeded() && SearchResultUtils.getTotal((SearchResult)response) >= (long)numFlowsExpected;
                    if (!foundAllFlowsForProtocol) {
                        for (Delivery delivery : deliveriesForProtocol) {
                            LOG.info("Sending packet payload from {} containing {} flows to: {}", new Object[]{delivery.packet.getPayload(), delivery.packet.getFlowCount(), delivery.sender});
                            delivery.send();
                        }
                    }
                    return foundAllFlowsForProtocol;
                });
            }
            LOG.info("Ensuring that the index template was created...");
            FlowTester.verify(() -> {
                JestResult result = this.client.execute((Action)new GetTemplate.Builder(TEMPLATE_NAME).build());
                return result.isSucceeded() && result.getJsonObject().get(TEMPLATE_NAME) != null;
            });
            this.runAfter.forEach(ra -> ra.accept(this));
        }
        finally {
            if (this.client != null) {
                this.client.close();
            }
        }
    }

    public void setRunBefore(List<Consumer<FlowTester>> runBefore) {
        this.runBefore.clear();
        this.runBefore.addAll(runBefore);
    }

    public void setRunAfter(List<Consumer<FlowTester>> runAfter) {
        this.runAfter.clear();
        this.runAfter.addAll(runAfter);
    }

    public JestClient getJestClient() {
        return Objects.requireNonNull(this.client);
    }

    public static void verify(Block verifyCallback) {
        Objects.requireNonNull(verifyCallback);
        Awaitility.with().pollInterval(15L, TimeUnit.SECONDS).await().atMost(5L, TimeUnit.MINUTES).until(() -> {
            try {
                LOG.info("Querying elastic search");
                return verifyCallback.test();
            }
            catch (Exception e) {
                LOG.error("Error while querying to elastic search", (Throwable)e);
                return false;
            }
        });
    }

    public static interface Block {
        public boolean test() throws Exception;
    }

    public static class Delivery {
        private final FlowPacket packet;
        private final Sender sender;

        public Delivery(FlowPacket packet, Sender sender) {
            this.packet = Objects.requireNonNull(packet);
            this.sender = Objects.requireNonNull(sender);
        }

        public void send() throws IOException {
            this.packet.send(this.sender);
        }
    }
}

