/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.engine.library.services.bucketing;

import io.continual.services.ServiceContainer;
import io.continual.services.processor.engine.library.sources.JsonObjectStreamSource;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.service.SimpleProcessingService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketingService
extends SimpleProcessingService {
    private final Period fSize;
    private final long fOffsetSeconds;
    private final HashMap<Long, HashMap<String, Message>> fSet;
    private final MessageBridge fBridge;
    private JsonObjectStreamSource fRptTo;
    private final String fRptToName;
    private long fLastTs = -1L;
    public static final String kHost = "host";
    public static final String kMetricName = "metric";
    public static final String kTimestamp = "timestamp";
    public static final String kValue = "value";
    public static final String kCount = "count";
    private static final Logger log = LoggerFactory.getLogger(BucketingService.class);

    public static long getBucketTimestamp(Date time, Period p) {
        return Period.getBucketTimestamp(time.getTime(), p);
    }

    public BucketingService(ServiceContainer sc, JSONObject config) {
        this.fSize = Period.readFrom(config.optString("period", Period.MINUTES.toString()));
        this.fOffsetSeconds = config.optLong("bucketTimeOffset", 0L);
        this.fBridge = new StdMsgBridge(StdDataCombiner.SUM);
        this.fSet = new HashMap();
        this.fRptTo = null;
        this.fRptToName = config.optString("reportTo");
    }

    public BucketingService(Period bucketSize, JsonObjectStreamSource reportTo) {
        this(bucketSize, StdDataCombiner.SUM, reportTo);
    }

    public BucketingService(Period bucketSize, MessageBridge bridge, JsonObjectStreamSource reportTo) {
        this.fSize = bucketSize;
        this.fOffsetSeconds = 0L;
        this.fBridge = bridge;
        this.fSet = new HashMap();
        this.fRptTo = reportTo;
        this.fRptToName = null;
    }

    public BucketingService(Period bucketSize, StdDataCombiner type, JsonObjectStreamSource reportTo) {
        this(bucketSize, new StdMsgBridge(type), reportTo);
    }

    @Override
    protected void onStopRequested() {
        this.close();
    }

    public synchronized void close() {
        this.flush();
        if (this.fRptTo != null) {
            try {
                this.fRptTo.close();
            }
            catch (IOException e) {
                log.warn("Problem closing bucket service target stream: " + e.getMessage());
            }
        }
    }

    public synchronized void add(MessageProcessingContext context) {
        Source src;
        if (this.fRptTo == null && this.fRptToName != null && (src = context.getSource(this.fRptToName)) instanceof JsonObjectStreamSource) {
            this.fRptTo = (JsonObjectStreamSource)src;
        }
        Message entry = context.getMessage();
        long ts = this.fBridge.getTimestamp(entry);
        long tsBucket = Period.getBucketTimestamp(ts, this.fSize) + this.fOffsetSeconds * 1000L;
        Message entryAtBucketTime = this.fBridge.cloneWithTime(tsBucket, entry);
        String entryKey = this.fBridge.getKey(entryAtBucketTime);
        HashMap<String, Message> entriesAtTime = this.fSet.get(tsBucket);
        if (entriesAtTime == null) {
            this.flush();
            entriesAtTime = new HashMap();
            entriesAtTime.put(entryKey, entryAtBucketTime);
            this.fSet.put(tsBucket, entriesAtTime);
        } else {
            Message existing = entriesAtTime.get(entryKey);
            existing = existing == null ? entryAtBucketTime : this.fBridge.merge(existing, entry);
            entriesAtTime.put(entryKey, existing);
        }
    }

    public synchronized List<Message> getBuckets() {
        LinkedList<Message> result = new LinkedList<Message>();
        LinkedList<Long> timestamps = new LinkedList<Long>(this.fSet.keySet());
        Collections.sort(timestamps);
        for (Long ts : timestamps) {
            HashMap<String, Message> entriesAtTime = this.fSet.get(ts);
            LinkedList<String> keys = new LinkedList<String>(entriesAtTime.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Message msg = entriesAtTime.get(key);
                result.add(msg);
            }
        }
        return result;
    }

    private void flush() {
        if (this.fRptTo == null) {
            return;
        }
        LinkedList<Long> timestamps = new LinkedList<Long>(this.fSet.keySet());
        Collections.sort(timestamps);
        for (Long ts : timestamps) {
            if (this.fLastTs < 0L) {
                this.fLastTs = ts;
            } else {
                for (long tsExpected : Period.getTimestampsBetween(this.fSize, this.fLastTs, ts)) {
                    Message msg = new Message(new JSONObject().put(kTimestamp, tsExpected).put(kValue, 0));
                    this.fRptTo.submit(msg.toJson());
                }
            }
            HashMap<String, Message> entriesAtTime = this.fSet.get(ts);
            this.fLastTs = ts;
            LinkedList<String> keys = new LinkedList<String>(entriesAtTime.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Message msg = entriesAtTime.get(key);
                this.fRptTo.submit(msg.toJson());
            }
        }
        this.fSet.clear();
    }

    public static class StdMsgBridge
    implements MessageBridge {
        private final StdDataCombiner fCombiner;

        public StdMsgBridge(StdDataCombiner sdc) {
            this.fCombiner = sdc;
        }

        @Override
        public long getTimestamp(Message m) {
            return m.getLong(BucketingService.kTimestamp, -1L);
        }

        @Override
        public String getKey(Message m) {
            return m.getValueAsString(BucketingService.kMetricName);
        }

        @Override
        public Message cloneWithTime(long tsBucket, Message m) {
            return m.clone().putValue(BucketingService.kTimestamp, tsBucket);
        }

        @Override
        public Message merge(Message origEntry, Message newEntry) {
            long origCount = origEntry.getLong(BucketingService.kCount, 1L);
            double origVal = origEntry.getDouble(BucketingService.kValue, 0.0);
            long newCount = newEntry.getLong(BucketingService.kCount, 1L);
            double newVal = newEntry.getDouble(BucketingService.kValue, 0.0);
            Message base = origEntry.clone();
            switch (this.fCombiner) {
                case AVERAGE: {
                    double origTotal = origVal * (double)origCount;
                    double addlTotal = newVal * (double)newCount;
                    long totalCount = origCount + newCount;
                    double avgValue = totalCount == 0L ? 0.0 : (origTotal + addlTotal) / (double)totalCount;
                    return base.putValue(BucketingService.kValue, avgValue).putValue(BucketingService.kCount, totalCount);
                }
                case SUM: {
                    return base.putValue(BucketingService.kValue, origVal + newVal);
                }
            }
            return origEntry;
        }
    }

    public static interface MessageBridge {
        public long getTimestamp(Message var1);

        public Message cloneWithTime(long var1, Message var3);

        public Message merge(Message var1, Message var2);

        public String getKey(Message var1);
    }

    public static enum StdDataCombiner {
        AVERAGE,
        SUM;

    }

    public static enum Period {
        SECONDS{

            @Override
            protected long getFraction() {
                return 1000L;
            }
        }
        ,
        MINUTES{

            @Override
            protected long getFraction() {
                return SECONDS.getFraction() * 60L;
            }
        }
        ,
        HOURS{

            @Override
            protected long getFraction() {
                return MINUTES.getFraction() * 60L;
            }
        }
        ,
        DAYS{

            @Override
            protected long getFraction() {
                return HOURS.getFraction() * 24L;
            }
        }
        ,
        WEEKS{

            @Override
            protected long getFraction() {
                return DAYS.getFraction() * 7L;
            }
        }
        ,
        MONTHS{

            @Override
            protected long getFraction() {
                return -1L;
            }
        };


        protected abstract long getFraction();

        public static List<Long> getTimestampsBetween(Period p, long startMs, long endMs) {
            ArrayList<Long> result;
            block4: {
                startMs = Period.getBucketTimestamp(startMs, p);
                endMs = Period.getBucketTimestamp(endMs, p);
                result = new ArrayList<Long>();
                if (startMs >= endMs) break block4;
                if (p == MONTHS) {
                    Calendar cal = Calendar.getInstance();
                    cal.setTimeInMillis(startMs);
                    cal.add(2, 1);
                    while (cal.getTimeInMillis() < endMs) {
                        result.add(cal.getTimeInMillis());
                        cal.add(2, 1);
                    }
                } else {
                    long size = p.getFraction();
                    for (long tsExpected = startMs + size; tsExpected < endMs; tsExpected += size) {
                        result.add(tsExpected);
                    }
                }
            }
            return result;
        }

        public static long getBucketTimestamp(long ms, Period p) {
            if (p == MONTHS) {
                Calendar cal = Calendar.getInstance();
                cal.setTimeInMillis(ms);
                cal.set(5, 15);
                cal.set(11, 12);
                cal.set(12, 0);
                cal.set(13, 0);
                return cal.getTimeInMillis();
            }
            long f = p.getFraction();
            return ms / f * f + Math.round(0.5 * (double)f);
        }

        public static Period readFrom(String val) {
            if (val == null) {
                return null;
            }
            val = val.trim().toUpperCase();
            return Period.valueOf(val);
        }
    }
}

