/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka.util.batcher;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.eureka.util.batcher.MessageProcessor;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageBatcher<T> {
    private static final Logger logger = LoggerFactory.getLogger(MessageBatcher.class);
    private static final String BATCHER_PREFIX = "batcher.";
    private static final String COLLECTOR_SUFFIX = ".collector";
    private static final int MAX_BATCH_SIZE = 250;
    private static final int BEFORE_SHUTDOWN_WAIT_TIME_MS = 10000;
    private final boolean shouldRejectWhenFull;
    private boolean shouldCollectorShutdown;
    protected long maxDelayNano;
    private List<T> batch;
    protected String name;
    protected BlockingQueue<T> queue;
    protected int maxMessages;
    private Collector collector;
    protected ThreadPoolExecutor processor;
    protected MessageProcessor target;
    protected final AtomicInteger concurrentBatches = new AtomicInteger(0);
    protected Timer queueSizeTracer;
    protected Timer batchSyncPutTracer;
    protected Timer threadSubmitTracer;
    protected Timer processTimeTracer;
    protected Timer avgBatchSizeTracer;
    protected Counter queueOverflowCounter;
    private volatile boolean isShutDown;
    private final AtomicLong numberAdded = new AtomicLong();
    private final AtomicLong numberDropped = new AtomicLong();
    private final boolean blockingProperty;
    private boolean isCollectorPaused;
    private final Counter processCount;

    public MessageBatcher(String name, MessageProcessor<T> target, int maxQueueSize, long maxDelayMs, int minThreads, int maxThreads, long keepAliveTimeMs, boolean shouldRejectWhenFull) {
        this.name = BATCHER_PREFIX + name;
        this.target = target;
        this.maxMessages = 250;
        this.maxDelayNano = maxDelayMs * 1000000L;
        this.shouldRejectWhenFull = shouldRejectWhenFull;
        this.queue = new ArrayBlockingQueue<T>(maxQueueSize);
        this.batch = new ArrayList<T>(this.maxMessages);
        this.collector = new Collector(this, this.name + COLLECTOR_SUFFIX);
        this.createProcessor(minThreads, maxThreads, keepAliveTimeMs);
        this.queueSizeTracer = Monitors.newTimer((String)"queue_size");
        this.batchSyncPutTracer = Monitors.newTimer((String)"waitTimeforBuffer");
        this.avgBatchSizeTracer = Monitors.newTimer((String)"batch_size");
        this.processCount = Monitors.newCounter((String)"messages_processed");
        this.threadSubmitTracer = Monitors.newTimer((String)"thread_invocation_time");
        this.processTimeTracer = Monitors.newTimer((String)"message_processTime");
        this.queueOverflowCounter = Monitors.newCounter((String)"queue_overflow");
        this.blockingProperty = false;
        this.collector.setDaemon(true);
        this.collector.start();
        try {
            Monitors.registerObject((String)this.name, (Object)this);
        }
        catch (Throwable e) {
            logger.warn("Metrics initialization error", e);
        }
    }

    public boolean isSpaceAvailable() {
        return this.queue.remainingCapacity() > 0;
    }

    public boolean process(T message) {
        if (this.isShutDown) {
            return false;
        }
        try {
            this.queueSizeTracer.record((long)this.queue.size());
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        if (!this.queue.offer(message)) {
            this.numberDropped.incrementAndGet();
            this.queueOverflowCounter.increment();
            return false;
        }
        this.numberAdded.incrementAndGet();
        return true;
    }

    public void processSync(T message) {
        if (this.isShutDown) {
            return;
        }
        try {
            this.queueSizeTracer.record((long)this.queue.size());
        }
        catch (Throwable ignored) {
            // empty catch block
        }
        try {
            Stopwatch s = this.batchSyncPutTracer.start();
            this.queue.put(message);
            s.stop();
        }
        catch (InterruptedException e) {
            return;
        }
        this.numberAdded.incrementAndGet();
    }

    public void process(List<T> objects) {
        for (T message : objects) {
            if (this.isShutDown) {
                return;
            }
            this.process(message);
        }
    }

    public void process(List<T> objects, boolean sync) {
        for (T message : objects) {
            if (this.isShutDown) {
                return;
            }
            if (sync) {
                this.processSync(message);
                continue;
            }
            this.process(message);
        }
    }

    public void pause() {
        if (!this.isShutDown) {
            this.isCollectorPaused = true;
        }
    }

    public boolean isPaused() {
        return this.isCollectorPaused;
    }

    public void resume() {
        if (!this.isShutDown) {
            this.isCollectorPaused = false;
        }
    }

    public void stop() {
        this.isShutDown = true;
        int waitTimeinMillis = 10000;
        long timeToWait = (long)waitTimeinMillis + System.currentTimeMillis();
        while (!(this.queue.isEmpty() && this.batch.isEmpty() || System.currentTimeMillis() >= timeToWait)) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // empty catch block
                break;
            }
        }
        try {
            this.shouldCollectorShutdown = true;
            this.processor.shutdownNow();
        }
        catch (Throwable e) {
            logger.warn("Message batcher shutdown completed with an error", e);
        }
    }

    @Monitor(name="batcherQueueSize", type=DataSourceType.GAUGE)
    public int getSize() {
        if (this.queue != null) {
            return this.queue.size();
        }
        return 0;
    }

    @Monitor(name="numberAdded", type=DataSourceType.GAUGE)
    public long getNumberAdded() {
        return this.numberAdded.get();
    }

    @Monitor(name="numberDropped", type=DataSourceType.GAUGE)
    public long getNumberDropped() {
        return this.numberDropped.get();
    }

    @Monitor(name="blocking", type=DataSourceType.INFORMATIONAL)
    public boolean isBlocking() {
        return this.blockingProperty;
    }

    private void createProcessor(int minThreads, int maxThreads, long keepAliveTimeMs) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(false).setNameFormat(this.name + "-process").build();
        this.processor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), threadFactory);
        if (!this.shouldRejectWhenFull) {
            this.processor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy(){

                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    super.rejectedExecution(r, e);
                }
            });
        }
    }

    private class Collector
    extends Thread {
        private static final int SLEEP_TIME_MS = 1;
        private static final int RETRY_EXECUTION_TIMEOUT_MS = 1;
        private final Timer processTimeTracer;
        private final Counter rejectedCounter;
        private final MessageBatcher<T> stream;
        private final Timer queueSizeTracer;

        private Collector(MessageBatcher<T> stream, String name) {
            super(name);
            this.rejectedCounter = Monitors.newCounter((String)(MessageBatcher.this.processCount + ".rejected"));
            this.processTimeTracer = Monitors.newTimer((String)(name + ".processTime"));
            this.stream = stream;
            this.queueSizeTracer = Monitors.newTimer((String)(name + ".queue_size_at_drain"));
        }

        @Override
        public void run() {
            while (!MessageBatcher.this.shouldCollectorShutdown) {
                if (MessageBatcher.this.isCollectorPaused) {
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException ignore) {}
                    continue;
                }
                try {
                    boolean retryExecution;
                    int batchSize;
                    if (MessageBatcher.this.batch.size() < this.stream.maxMessages) {
                        long now;
                        long firstTime = now = System.nanoTime();
                        do {
                            if (this.stream.queue.drainTo(MessageBatcher.this.batch, this.stream.maxMessages - MessageBatcher.this.batch.size()) > 0) continue;
                            long maxWait = firstTime + this.stream.maxDelayNano - now;
                            if (maxWait <= 0L) break;
                            Object nextMessage = null;
                            try {
                                nextMessage = this.stream.queue.poll(maxWait, TimeUnit.NANOSECONDS);
                            }
                            catch (InterruptedException ignore) {
                                // empty catch block
                            }
                            if (nextMessage == null) break;
                            MessageBatcher.this.batch.add(nextMessage);
                            now = System.nanoTime();
                        } while (MessageBatcher.this.batch.size() < this.stream.maxMessages);
                    }
                    if ((batchSize = MessageBatcher.this.batch.size()) <= 0) continue;
                    try {
                        this.queueSizeTracer.record((long)this.stream.queue.size());
                    }
                    catch (Exception ignored) {
                        // empty catch block
                    }
                    MessageBatcher.this.avgBatchSizeTracer.record((long)batchSize);
                    Stopwatch s = this.processTimeTracer.start();
                    do {
                        try {
                            this.stream.processor.execute(new ProcessMessages(this.stream, MessageBatcher.this.batch));
                            retryExecution = false;
                        }
                        catch (RejectedExecutionException re) {
                            this.rejectedCounter.increment();
                            retryExecution = true;
                            Thread.sleep(1L);
                        }
                    } while (retryExecution);
                    MessageBatcher.this.processCount.increment((long)batchSize);
                    s.stop();
                    MessageBatcher.this.batch = new ArrayList(this.stream.maxMessages);
                }
                catch (Throwable e) {
                    logger.error("Collector task error", e);
                }
            }
        }
    }

    private static class ProcessMessages<T>
    implements Runnable {
        private final MessageBatcher<T> stream;
        private final List<T> batch;
        private final Timer processMessagesTracer;
        private final Timer avgConcurrentBatches;

        private ProcessMessages(MessageBatcher<T> stream, List<T> batch) {
            this.stream = stream;
            this.batch = batch;
            this.processMessagesTracer = stream.processTimeTracer;
            this.avgConcurrentBatches = Monitors.newTimer((String)(stream.name + ".concurrentBatches"));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                if (this.batch == null) {
                    return;
                }
                int inProcess = this.stream.concurrentBatches.incrementAndGet();
                try {
                    this.avgConcurrentBatches.record((long)inProcess);
                    Stopwatch s = this.processMessagesTracer.start();
                    this.stream.target.process(this.batch);
                    s.stop();
                }
                finally {
                    this.stream.concurrentBatches.decrementAndGet();
                }
            }
            catch (Throwable e) {
                logger.error("Batch processing failure", e);
            }
        }
    }
}

