/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.audit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.compaction.audit.AuditCountClient;
import org.apache.gobblin.configuration.State;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class KafkaAuditCountHttpClient
implements AuditCountClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaAuditCountHttpClient.class);
    public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
    public static final String CONNECTION_MAX_TOTAL = "kafka.audit.httpmax.total";
    public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
    public static final String MAX_PER_ROUTE = "kafka.audit.httpmax.per.route";
    public static final int DEFAULT_MAX_PER_ROUTE = 10;
    public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
    public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
    public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
    public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
    public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
    public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
    private PoolingHttpClientConnectionManager cm;
    private CloseableHttpClient httpClient;
    private static final JsonParser PARSER = new JsonParser();
    private final String baseUrl;
    private final String startQueryString;
    private final String endQueryString;
    private final int maxNumTries;

    public KafkaAuditCountHttpClient(State state) {
        int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, 10);
        int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, 10);
        this.cm = new PoolingHttpClientConnectionManager();
        this.cm.setMaxTotal(maxTotal);
        this.cm.setDefaultMaxPerRoute(maxPerRoute);
        this.httpClient = HttpClients.custom().setConnectionManager((HttpClientConnectionManager)this.cm).build();
        this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
        this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
        this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
        this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
    }

    @Override
    public Map<String, Long> fetch(String datasetName, long start, long end) throws IOException {
        String fullUrl = (this.baseUrl.endsWith("/") ? this.baseUrl : this.baseUrl + "/") + StringUtils.replaceChars((String)datasetName, (char)'/', (char)'.') + "?" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
        log.info("Full URL is " + fullUrl);
        String response = this.getHttpResponse(fullUrl);
        return KafkaAuditCountHttpClient.parseResponse(fullUrl, response, datasetName);
    }

    @VisibleForTesting
    public static Map<String, Long> parseResponse(String fullUrl, String response, String topic) throws IOException {
        HashMap result = Maps.newHashMap();
        JsonObject countsPerTier = null;
        try {
            JsonObject jsonObj = PARSER.parse(response).getAsJsonObject();
            countsPerTier = jsonObj.getAsJsonObject("result");
        }
        catch (Exception e) {
            throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", response, fullUrl), e);
        }
        Set entrySet = countsPerTier.entrySet();
        for (Map.Entry entry : entrySet) {
            String tier = (String)entry.getKey();
            long count = Long.parseLong(((JsonElement)entry.getValue()).getAsString());
            result.put(tier, count);
        }
        return result;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private String getHttpResponse(String fullUrl) throws IOException {
        HttpGet req = new HttpGet(fullUrl);
        int numTries = 0;
        while (true) {
            try (CloseableHttpResponse response = this.httpClient.execute((HttpUriRequest)req);){
                int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode < 200 || statusCode >= 300) {
                    throw new IOException(String.format("status code: %d, reason: %s", statusCode, response.getStatusLine().getReasonPhrase()));
                }
                String string = EntityUtils.toString((HttpEntity)response.getEntity());
                return string;
            }
            catch (IOException e) {
                String errMsg = "Unable to get or parse HTTP response for " + fullUrl;
                if (numTries >= this.maxNumTries) {
                    throw new IOException(errMsg, e);
                }
                long backOffSec = (numTries + 1) * 2;
                log.error(errMsg + ", will retry in " + backOffSec + " sec", (Throwable)e);
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(backOffSec));
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                ++numTries;
                continue;
            }
            break;
        }
    }
}

