/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.resourcelimits;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.predicate.ResponsePredicate;
import io.vertx.ext.web.codec.BodyCodec;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.eclipse.hono.cache.CacheProvider;
import org.eclipse.hono.cache.ExpiringValueCache;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.resourcelimits.PrometheusBasedResourceLimitChecksConfig;
import org.eclipse.hono.service.resourcelimits.ResourceLimitChecks;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ConnectionDuration;
import org.eclipse.hono.util.DataVolume;
import org.eclipse.hono.util.ResourceLimitsPeriod;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PrometheusBasedResourceLimitChecks
implements ResourceLimitChecks {
    private static final String CONNECTIONS_METRIC_NAME = "hono.connections.authenticated".replace(".", "_");
    private static final String MESSAGES_PAYLOAD_SIZE_METRIC_NAME = String.format("%s_bytes_sum", "hono.messages.payload".replace(".", "_"));
    private static final String COMMANDS_PAYLOAD_SIZE_METRIC_NAME = String.format("%s_bytes_sum", "hono.commands.payload".replace(".", "_"));
    private static final String CONNECTIONS_DURATION_METRIC_NAME = String.format("%s_seconds_sum", "hono.connections.authenticated.duration".replace(".", "_"));
    private static final Logger LOG = LoggerFactory.getLogger(PrometheusBasedResourceLimitChecks.class);
    private static final String QUERY_URI = "/api/v1/query";
    private static final String LIMITS_CACHE_NAME = "resource-limits";
    private final Tracer tracer;
    private final WebClient client;
    private final PrometheusBasedResourceLimitChecksConfig config;
    private final ExpiringValueCache<Object, Object> limitsCache;
    private final String url;

    public PrometheusBasedResourceLimitChecks(WebClient webClient, PrometheusBasedResourceLimitChecksConfig config, CacheProvider cacheProvider) {
        this(webClient, config, cacheProvider, (Tracer)NoopTracerFactory.create());
    }

    public PrometheusBasedResourceLimitChecks(WebClient webClient, PrometheusBasedResourceLimitChecksConfig config, CacheProvider cacheProvider, Tracer tracer) {
        this.client = Objects.requireNonNull(webClient);
        this.config = Objects.requireNonNull(config);
        this.limitsCache = Optional.ofNullable(cacheProvider).map(provider -> provider.getCache(LIMITS_CACHE_NAME)).orElse(null);
        this.tracer = Objects.requireNonNull(tracer);
        this.url = String.format("%s://%s:%d%s", config.isTlsEnabled() ? "https" : "http", config.getHost(), config.getPort(), QUERY_URI);
    }

    private Span createSpan(String name, SpanContext parent, TenantObject tenant) {
        return TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)parent, (String)name, (String)this.getClass().getSimpleName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(Tags.PEER_HOSTNAME.getKey(), this.config.getHost()).withTag(Tags.PEER_PORT.getKey(), (Number)this.config.getPort()).withTag(Tags.HTTP_URL.getKey(), QUERY_URI).withTag("tenant_id", tenant.getTenantId()).start();
    }

    @Override
    public Future<Boolean> isConnectionLimitReached(TenantObject tenant, SpanContext spanContext) {
        Objects.requireNonNull(tenant);
        Span span = this.createSpan("verify connection limit", spanContext, tenant);
        HashMap<String, Object> items = new HashMap<String, Object>();
        Promise result = Promise.promise();
        if (tenant.getResourceLimits() == null) {
            items.put("event", "no resource limits configured");
            LOG.trace("no resource limits configured for tenant [{}]", (Object)tenant.getTenantId());
            result.complete((Object)Boolean.FALSE);
        } else {
            long maxConnections = tenant.getResourceLimits().getMaxConnections();
            items.put("max-connections", maxConnections);
            LOG.trace("connection limit for tenant [{}] is [{}]", (Object)tenant.getTenantId(), (Object)maxConnections);
            if (maxConnections == -1L) {
                items.put("event", "no connection limit configured");
                result.complete((Object)Boolean.FALSE);
            } else {
                String queryParams = String.format("sum(%s{tenant=\"%s\"})", CONNECTIONS_METRIC_NAME, tenant.getTenantId());
                this.executeQuery(queryParams, span).map(currentConnections -> {
                    items.put("current-connections", currentConnections);
                    boolean isExceeded = currentConnections >= maxConnections;
                    LOG.trace("connection limit {}exceeded [tenant: {}, current connections: {}, max-connections: {}]", new Object[]{isExceeded ? "" : "not ", tenant.getTenantId(), currentConnections, maxConnections});
                    return isExceeded;
                }).otherwise(failure -> Boolean.FALSE).setHandler((Handler)result);
            }
        }
        return result.future().map(b -> {
            items.put("limit exceeded", b);
            span.log(items);
            span.finish();
            return b;
        });
    }

    @Override
    public Future<Boolean> isMessageLimitReached(TenantObject tenant, long payloadSize, SpanContext spanContext) {
        Objects.requireNonNull(tenant);
        Span span = this.createSpan("verify message limit", spanContext, tenant);
        HashMap<String, Object> items = new HashMap<String, Object>();
        items.put("payload-size", payloadSize);
        Promise result = Promise.promise();
        if (tenant.getResourceLimits() == null) {
            items.put("event", "no resource limits configured");
            LOG.trace("no resource limits configured for tenant [{}]", (Object)tenant.getTenantId());
            result.complete((Object)Boolean.FALSE);
        } else if (tenant.getResourceLimits().getDataVolume() == null) {
            items.put("event", "no message limits configured");
            LOG.trace("no message limits configured for tenant [{}]", (Object)tenant.getTenantId());
            result.complete((Object)Boolean.FALSE);
        } else {
            this.checkMessageLimit(tenant, payloadSize, items, span, (Promise<Boolean>)result);
        }
        return result.future().map(b -> {
            items.put("limit exceeded", b);
            span.log(items);
            span.finish();
            return b;
        });
    }

    @Override
    public Future<Boolean> isConnectionDurationLimitReached(TenantObject tenant, SpanContext spanContext) {
        Objects.requireNonNull(tenant);
        Span span = this.createSpan("verify connection duration limit", spanContext, tenant);
        HashMap<String, Object> items = new HashMap<String, Object>();
        Promise result = Promise.promise();
        if (tenant.getResourceLimits() == null) {
            items.put("event", "no resource limits configured");
            LOG.trace("no resource limits configured for tenant [{}]", (Object)tenant.getTenantId());
            result.complete((Object)Boolean.FALSE);
        } else if (tenant.getResourceLimits().getConnectionDuration() == null) {
            items.put("event", "no connection duration limit configured");
            LOG.trace("no connection duration limit configured for tenant [{}]", (Object)tenant.getTenantId());
            result.complete((Object)Boolean.FALSE);
        } else {
            this.checkConnectionDurationLimit(tenant, items, span, (Promise<Boolean>)result);
        }
        return result.future().map(b -> {
            items.put("limit exceeded", b);
            span.log(items);
            span.finish();
            return b;
        });
    }

    private void checkConnectionDurationLimit(TenantObject tenant, Map<String, Object> items, Span span, Promise<Boolean> result) {
        ConnectionDuration connectionDurationConfig = tenant.getResourceLimits().getConnectionDuration();
        long maxConnectionDurationInMinutes = connectionDurationConfig.getMaxMinutes();
        Instant effectiveSince = connectionDurationConfig.getEffectiveSince();
        PeriodMode periodMode = Optional.ofNullable(connectionDurationConfig.getPeriod()).map(period -> PeriodMode.from(period.getMode())).orElse(PeriodMode.MONTHLY);
        long periodInDays = Optional.ofNullable(connectionDurationConfig.getPeriod()).map(ResourceLimitsPeriod::getNoOfDays).orElse(0).intValue();
        LOG.trace("connection duration config for the tenant [{}] is [{}:{}, {}:{}, {}:{}, {}:{}]", new Object[]{tenant.getTenantId(), "max-minutes", maxConnectionDurationInMinutes, "effective-since", effectiveSince, "mode", periodMode, "no-of-days", periodInDays});
        if (maxConnectionDurationInMinutes == -1L || effectiveSince == null || PeriodMode.UNKNOWN.equals((Object)periodMode)) {
            result.complete((Object)Boolean.FALSE);
        } else {
            long allowedMaxMinutes = this.getOrAddToCache(this.limitsCache, String.format("%s_allowed_max_minutes", tenant.getTenantId()), () -> this.calculateEffectiveLimit(OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC), OffsetDateTime.now(ZoneOffset.UTC), periodMode, maxConnectionDurationInMinutes));
            long connectionDurationUsagePeriod = this.getOrAddToCache(this.limitsCache, String.format("%s_conn_duration_usage_period", tenant.getTenantId()), () -> this.calculateResourceUsagePeriod(OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC), OffsetDateTime.now(ZoneOffset.UTC), periodMode, periodInDays));
            items.put("current period connection duration limit in minutes", allowedMaxMinutes);
            if (connectionDurationUsagePeriod <= 0L) {
                result.complete((Object)Boolean.FALSE);
            } else {
                String queryParams = String.format("minute( sum( increase( %s {tenant=\"%s\"} [%sd])))", CONNECTIONS_DURATION_METRIC_NAME, tenant.getTenantId(), connectionDurationUsagePeriod);
                String key = String.format("%s_minutes_consumed", tenant.getTenantId());
                Optional.ofNullable(this.limitsCache).map(ok -> this.limitsCache.get((Object)key)).map(cachedValue -> Future.succeededFuture((Object)((Long)cachedValue))).orElseGet(() -> this.executeQuery(queryParams, span).map(minutesConnected -> this.addToCache(this.limitsCache, key, (long)minutesConnected))).map(minutesConnected -> {
                    items.put("current period's connection duration in minutes", minutesConnected);
                    boolean isExceeded = minutesConnected >= allowedMaxMinutes;
                    LOG.trace("connection duration limit {} exceeded [tenant: {}, connection duration consumed: {}, allowed max-duration: {}, {}: {}, {}: {}, {}: {}]", new Object[]{isExceeded ? "" : "not ", tenant.getTenantId(), minutesConnected, allowedMaxMinutes, "effective-since", effectiveSince, "mode", periodMode, "no-of-days", periodInDays});
                    return isExceeded;
                }).otherwise(failed -> Boolean.FALSE).setHandler(result);
            }
        }
    }

    private void checkMessageLimit(TenantObject tenant, long payloadSize, Map<String, Object> items, Span span, Promise<Boolean> result) {
        DataVolume dataVolumeConfig = tenant.getResourceLimits().getDataVolume();
        long maxBytes = dataVolumeConfig.getMaxBytes();
        Instant effectiveSince = dataVolumeConfig.getEffectiveSince();
        PeriodMode periodMode = Optional.ofNullable(dataVolumeConfig.getPeriod()).map(period -> PeriodMode.from(period.getMode())).orElse(PeriodMode.MONTHLY);
        long periodInDays = Optional.ofNullable(dataVolumeConfig.getPeriod()).map(ResourceLimitsPeriod::getNoOfDays).orElse(0).intValue();
        LOG.trace("message limit config for tenant [{}] are [{}:{}, {}:{}, {}:{}, {}:{}]", new Object[]{tenant.getTenantId(), "max-bytes", maxBytes, "effective-since", effectiveSince, "mode", periodMode, "no-of-days", periodInDays});
        if (maxBytes == -1L || effectiveSince == null || PeriodMode.UNKNOWN.equals((Object)periodMode) || payloadSize <= 0L) {
            result.complete((Object)Boolean.FALSE);
        } else {
            long allowedMaxBytes = this.getOrAddToCache(this.limitsCache, String.format("%s_allowed_max_bytes", tenant.getTenantId()), () -> this.calculateEffectiveLimit(OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC), OffsetDateTime.now(ZoneOffset.UTC), periodMode, maxBytes));
            long dataUsagePeriod = this.getOrAddToCache(this.limitsCache, String.format("%s_data_usage_period", tenant.getTenantId()), () -> this.calculateResourceUsagePeriod(OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC), OffsetDateTime.now(ZoneOffset.UTC), periodMode, periodInDays));
            items.put("current period bytes limit", allowedMaxBytes);
            if (dataUsagePeriod <= 0L) {
                result.complete((Object)Boolean.FALSE);
            } else {
                String queryParams = String.format("floor(sum(increase(%s{status=~\"%s|%s\", tenant=\"%s\"} [%sd]) or %s*0) + sum(increase(%s{status=~\"%s|%s\", tenant=\"%s\"} [%sd]) or %s*0))", MESSAGES_PAYLOAD_SIZE_METRIC_NAME, MetricsTags.ProcessingOutcome.FORWARDED.asTag().getValue(), MetricsTags.ProcessingOutcome.UNPROCESSABLE.asTag().getValue(), tenant.getTenantId(), dataUsagePeriod, COMMANDS_PAYLOAD_SIZE_METRIC_NAME, COMMANDS_PAYLOAD_SIZE_METRIC_NAME, MetricsTags.ProcessingOutcome.FORWARDED.asTag().getValue(), MetricsTags.ProcessingOutcome.UNPROCESSABLE.asTag().getValue(), tenant.getTenantId(), dataUsagePeriod, MESSAGES_PAYLOAD_SIZE_METRIC_NAME);
                String key = String.format("%s_bytes_consumed", tenant.getTenantId());
                Optional.ofNullable(this.limitsCache).map(success -> this.limitsCache.get((Object)key)).map(cachedValue -> Future.succeededFuture((Object)((Long)cachedValue))).orElseGet(() -> this.executeQuery(queryParams, span).map(bytesConsumed -> this.addToCache(this.limitsCache, key, (long)bytesConsumed))).map(bytesConsumed -> {
                    items.put("current period bytes consumed", bytesConsumed);
                    boolean isExceeded = bytesConsumed + payloadSize > allowedMaxBytes;
                    LOG.trace("data limit {}exceeded [tenant: {}, bytes consumed: {}, allowed max-bytes: {}, {}: {}, {}: {}, {}: {}]", new Object[]{isExceeded ? "" : "not ", tenant.getTenantId(), bytesConsumed, allowedMaxBytes, "effective-since", effectiveSince, "mode", periodMode, "no-of-days", periodInDays});
                    return isExceeded;
                }).otherwise(failed -> Boolean.FALSE).setHandler(result);
            }
        }
    }

    private Future<Long> executeQuery(String query, Span span) {
        Promise result = Promise.promise();
        LOG.trace("running Prometheus query [URL: {}, query: {}]", (Object)this.url, (Object)query);
        this.newQueryRequest(query).send(sendAttempt -> {
            if (sendAttempt.succeeded()) {
                HttpResponse response = (HttpResponse)sendAttempt.result();
                result.complete((Object)this.extractLongValue((JsonObject)response.body(), span));
            } else {
                Map<String, Throwable> items = Map.of("event", Tags.ERROR.getKey(), "message", "failed to run Prometheus query", "URL", this.url, "query", query, "error.kind", "Exception", "error.object", sendAttempt.cause());
                TracingHelper.logError((Span)span, items);
                LOG.warn("failed to run Prometheus query [URL: {}, query: {}]: {}", new Object[]{this.url, query, sendAttempt.cause().getMessage()});
                result.fail(sendAttempt.cause());
            }
        });
        return result.future();
    }

    private HttpRequest<JsonObject> newQueryRequest(String query) {
        HttpRequest request = this.client.get(this.config.getPort(), this.config.getHost(), QUERY_URI).addQueryParam("query", query).expect(ResponsePredicate.SC_OK).as(BodyCodec.jsonObject());
        if (!Strings.isNullOrEmpty((Object)this.config.getUsername()) && !Strings.isNullOrEmpty((Object)this.config.getPassword())) {
            request.basicAuthentication(this.config.getUsername(), this.config.getPassword());
        }
        return request;
    }

    private Long extractLongValue(JsonObject response, Span span) {
        Objects.requireNonNull(response);
        try {
            String status = response.getString("status");
            if ("error".equals(status)) {
                TracingHelper.logError((Span)span, Map.of("message", "error executing query", "status", status, "error-type", response.getString("errorType"), "error", response.getString("error")));
                LOG.debug("error executing query [status: {}, error type: {}, error: {}]", new Object[]{status, response.getString("errorType"), response.getString("error")});
                return 0L;
            }
            JsonObject data = response.getJsonObject("data", new JsonObject());
            JsonArray result = data.getJsonArray("result");
            if (result != null) {
                String value;
                JsonArray valueArray;
                if (result.size() == 0) {
                    span.log("no metrics available (yet)");
                    return 0L;
                }
                if (result.size() == 1 && result.getJsonObject(0) != null && (valueArray = result.getJsonObject(0).getJsonArray("value")) != null && valueArray.size() == 2 && (value = valueArray.getString(1)) != null && !value.isEmpty()) {
                    return Long.parseLong(value);
                }
            }
            String jsonResponse = response.encodePrettily();
            TracingHelper.logError((Span)span, Map.of("message", "server returned malformed response", "response", jsonResponse));
            LOG.debug("server returned malformed response: {}", (Object)jsonResponse);
        }
        catch (Exception e) {
            String jsonResponse = response.encodePrettily();
            TracingHelper.logError((Span)span, Map.of("message", "server returned malformed response", "response", jsonResponse));
            LOG.debug("server returned malformed response: {}", (Object)jsonResponse);
        }
        return 0L;
    }

    long calculateEffectiveLimit(OffsetDateTime effectiveSince, OffsetDateTime targetDateTime, PeriodMode mode, long configuredLimit) {
        if (PeriodMode.MONTHLY.equals((Object)mode) && configuredLimit > 0L && !targetDateTime.isBefore(effectiveSince) && YearMonth.from(targetDateTime).equals(YearMonth.from(effectiveSince)) && effectiveSince.getDayOfMonth() != 1) {
            OffsetDateTime lastDayOfMonth = effectiveSince.with(TemporalAdjusters.lastDayOfMonth());
            long daysBetween = ChronoUnit.DAYS.between(effectiveSince, lastDayOfMonth) + 1L;
            return Double.valueOf(Math.ceil(daysBetween * configuredLimit / (long)lastDayOfMonth.getDayOfMonth())).longValue();
        }
        return configuredLimit;
    }

    long calculateResourceUsagePeriod(OffsetDateTime effectiveSince, OffsetDateTime currentDateTime, PeriodMode mode, long periodInDays) {
        long inclusiveDaysBetween = ChronoUnit.DAYS.between(effectiveSince, currentDateTime) + 1L;
        switch (mode) {
            case DAYS: {
                if (inclusiveDaysBetween > 0L && periodInDays > 0L) {
                    long dataUsagePeriodInDays = inclusiveDaysBetween % periodInDays;
                    return dataUsagePeriodInDays == 0L ? periodInDays : dataUsagePeriodInDays;
                }
                return 0L;
            }
            case MONTHLY: {
                if (YearMonth.from(currentDateTime).equals(YearMonth.from(effectiveSince)) && effectiveSince.getDayOfMonth() != 1) {
                    return inclusiveDaysBetween;
                }
                return currentDateTime.getDayOfMonth();
            }
        }
        return 0L;
    }

    private long getOrAddToCache(ExpiringValueCache<Object, Object> cache, String key, Supplier<Long> valueSupplier) {
        return Optional.ofNullable(cache).map(success -> cache.get((Object)key)).map(cachedValue -> (long)((Long)cachedValue)).orElseGet(() -> this.addToCache(cache, key, (Long)valueSupplier.get()));
    }

    private long addToCache(ExpiringValueCache<Object, Object> cache, String key, long result) {
        Optional.ofNullable(cache).ifPresent(success -> cache.put((Object)key, (Object)result, Duration.ofSeconds(this.config.getCacheTimeout())));
        return result;
    }

    protected static enum PeriodMode {
        DAYS("days"),
        MONTHLY("monthly"),
        UNKNOWN("unknown");

        private final String mode;

        private PeriodMode(String mode) {
            this.mode = mode;
        }

        static PeriodMode from(String value) {
            if (value != null) {
                for (PeriodMode mode : PeriodMode.values()) {
                    if (!value.equalsIgnoreCase(mode.value())) continue;
                    return mode;
                }
            }
            return UNKNOWN;
        }

        String value() {
            return this.mode;
        }
    }
}

