/*
 * Decompiled with CFR 0.152.
 */
package com.optimizely.ab.event;

import com.optimizely.ab.config.ProjectConfig;
import com.optimizely.ab.event.EventHandler;
import com.optimizely.ab.event.EventProcessor;
import com.optimizely.ab.event.LogEvent;
import com.optimizely.ab.event.internal.EventFactory;
import com.optimizely.ab.event.internal.UserEvent;
import com.optimizely.ab.internal.PropertyUtils;
import com.optimizely.ab.internal.SafetyUtils;
import com.optimizely.ab.notification.NotificationCenter;
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventProcessor} that buffers {@link UserEvent}s on a queue and
 * dispatches them in batches from a single consumer task.
 *
 * <p>A batch is flushed when it reaches {@code batchSize} events, when the
 * flush interval elapses, when an event for a different project id or
 * revision arrives, when {@link #flush()} is called, or when the consumer
 * shuts down.
 */
public class BatchEventProcessor implements EventProcessor, AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(BatchEventProcessor.class);

    /** Property key read through {@code PropertyUtils} for the default batch size. */
    public static final String CONFIG_BATCH_SIZE = "event.processor.batch.size";
    /** Property key read through {@code PropertyUtils} for the default flush interval (ms). */
    public static final String CONFIG_BATCH_INTERVAL = "event.processor.batch.interval";
    /** Property key read through {@code PropertyUtils} for the default close timeout (ms). */
    public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";

    /** Capacity of the queue the {@link Builder} creates when none is supplied. */
    public static final int DEFAULT_QUEUE_CAPACITY = 1000;
    /** Once more than this many consecutive polls come back empty, the consumer blocks on {@code take()}. */
    public static final int DEFAULT_EMPTY_COUNT = 2;
    /** Fallback batch size. */
    public static final int DEFAULT_BATCH_SIZE = 10;
    /** Fallback flush interval in milliseconds (30 seconds). */
    public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30L);
    /** Fallback close timeout in milliseconds (5 seconds). */
    public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);

    // Sentinels placed on the queue and recognised by identity in the consumer loop.
    private static final Object SHUTDOWN_SIGNAL = new Object();
    private static final Object FLUSH_SIGNAL = new Object();

    private final BlockingQueue<Object> eventQueue;
    private final EventHandler eventHandler;
    final int batchSize;
    final long flushInterval;
    final long timeoutMillis;
    private final ExecutorService executor;
    private final NotificationCenter notificationCenter;
    // Both guarded by the instance monitor.
    private Future<?> future;
    private boolean isStarted = false;

    private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) {
        this.eventHandler = eventHandler;
        this.eventQueue = eventQueue;
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
        this.timeoutMillis = timeoutMillis;
        this.notificationCenter = notificationCenter;
        this.executor = executor;
    }

    /**
     * Submits a new {@link EventConsumer} to the executor. Calling this while
     * the processor is already started only logs and returns.
     */
    public synchronized void start() {
        if (this.isStarted) {
            logger.info("Executor already started.");
            return;
        }
        this.isStarted = true;
        this.future = this.executor.submit(new EventConsumer());
    }

    /**
     * Signals the consumer to stop, waits up to {@code timeoutMillis} for it to
     * finish, and then closes the event handler via {@code SafetyUtils.tryClose}.
     *
     * <p>If the processor was never started there is no consumer to wait for, so
     * no shutdown signal is queued and only the event handler is closed.
     *
     * @throws Exception if queuing the shutdown signal is interrupted, or if
     *                   waiting on the consumer task fails for a reason other
     *                   than interruption or timeout
     */
    @Override
    public void close() throws Exception {
        logger.info("Start close");
        Future<?> consumerTask;
        synchronized (this) {
            consumerTask = this.future;
        }
        if (consumerTask == null) {
            // Never started: queuing a shutdown signal would only leave a stale
            // sentinel behind, and there is no task to wait on.
            logger.info("Processor was never started, no consumer to stop.");
            SafetyUtils.tryClose(this.eventHandler);
            return;
        }
        this.eventQueue.put(SHUTDOWN_SIGNAL);
        try {
            consumerTask.get(this.timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.warn("Interrupted while awaiting termination.");
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            logger.error("Timeout exceeded attempting to close for {} ms", this.timeoutMillis);
        } finally {
            synchronized (this) {
                this.isStarted = false;
            }
            SafetyUtils.tryClose(this.eventHandler);
        }
    }

    /**
     * Offers the event to the queue without blocking. The event is dropped
     * (with a warning) when the executor is shut down or the queue rejects it.
     *
     * @param userEvent the event to enqueue
     */
    @Override
    public void process(UserEvent userEvent) {
        logger.debug("Received userEvent: {}", userEvent);
        if (this.executor.isShutdown()) {
            logger.warn("Executor shutdown, not accepting tasks.");
            return;
        }
        if (!this.eventQueue.offer(userEvent)) {
            logger.warn("Payload not accepted by the queue. Current size: {}", this.eventQueue.size());
        }
    }

    /**
     * Queues a flush request for the consumer; blocks while the queue is full.
     *
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void flush() throws InterruptedException {
        this.eventQueue.put(FLUSH_SIGNAL);
    }

    /**
     * @return a new {@link Builder} pre-populated with default settings
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link BatchEventProcessor}. Only the event handler is
     * mandatory; every other setting has a default.
     */
    public static class Builder {
        private BlockingQueue<Object> eventQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
        private EventHandler eventHandler = null;
        private Integer batchSize = PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE);
        private Long flushInterval = PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL);
        private Long timeoutMillis = PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL);
        private ExecutorService executor = null;
        private NotificationCenter notificationCenter = null;

        /** Sets the handler that receives each batched {@link LogEvent}. Required. */
        public Builder withEventHandler(EventHandler eventHandler) {
            this.eventHandler = eventHandler;
            return this;
        }

        /** Replaces the default bounded queue used to buffer events and control signals. */
        public Builder withEventQueue(BlockingQueue<Object> eventQueue) {
            this.eventQueue = eventQueue;
            return this;
        }

        /** Sets the batch size; null or negative values are replaced by the default in {@link #build(boolean)}. */
        public Builder withBatchSize(Integer batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Sets the flush interval in milliseconds; null or negative values are replaced by the default in {@link #build(boolean)}. */
        public Builder withFlushInterval(Long flushInterval) {
            this.flushInterval = flushInterval;
            return this;
        }

        /** Sets the executor that runs the consumer task; a single daemon thread is created when none is given. */
        public Builder withExecutor(ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /** Sets how long {@link BatchEventProcessor#close()} waits for the consumer to finish. */
        public Builder withTimeout(long duration, TimeUnit timeUnit) {
            this.timeoutMillis = timeUnit.toMillis(duration);
            return this;
        }

        /** Sets an optional notification center that is sent every {@link LogEvent} before dispatch. */
        public Builder withNotificationCenter(NotificationCenter notificationCenter) {
            this.notificationCenter = notificationCenter;
            return this;
        }

        /**
         * Builds and starts the processor.
         *
         * @return a started processor
         * @throws IllegalArgumentException if no event handler was configured
         */
        public BatchEventProcessor build() {
            return this.build(true);
        }

        /**
         * Builds the processor, substituting defaults for invalid settings.
         *
         * @param shouldStart whether to call {@link BatchEventProcessor#start()} before returning
         * @return the configured processor
         * @throws IllegalArgumentException if no event handler was configured
         */
        public BatchEventProcessor build(boolean shouldStart) {
            // Null is treated like any other invalid value so that unboxing below cannot fail.
            if (this.batchSize == null || this.batchSize < 0) {
                logger.warn("Invalid batchSize of {}, Defaulting to {}", this.batchSize, DEFAULT_BATCH_SIZE);
                this.batchSize = DEFAULT_BATCH_SIZE;
            }
            if (this.flushInterval == null || this.flushInterval < 0L) {
                logger.warn("Invalid flushInterval of {}, Defaulting to {}", this.flushInterval, DEFAULT_BATCH_INTERVAL);
                this.flushInterval = DEFAULT_BATCH_INTERVAL;
            }
            if (this.timeoutMillis == null || this.timeoutMillis < 0L) {
                logger.warn("Invalid timeoutMillis of {}, Defaulting to {}", this.timeoutMillis, DEFAULT_TIMEOUT_INTERVAL);
                this.timeoutMillis = DEFAULT_TIMEOUT_INTERVAL;
            }
            if (this.eventHandler == null) {
                throw new IllegalArgumentException("EventHandler was not configured");
            }
            if (this.executor == null) {
                // Daemon thread so an unclosed processor does not keep the JVM alive.
                ThreadFactory threadFactory = Executors.defaultThreadFactory();
                this.executor = Executors.newSingleThreadExecutor(runnable -> {
                    Thread thread = threadFactory.newThread(runnable);
                    thread.setDaemon(true);
                    return thread;
                });
            }
            BatchEventProcessor batchEventProcessor = new BatchEventProcessor(this.eventQueue, this.eventHandler, this.batchSize, this.flushInterval, this.timeoutMillis, this.executor, this.notificationCenter);
            if (shouldStart) {
                batchEventProcessor.start();
            }
            return batchEventProcessor;
        }
    }

    /**
     * The queue consumer: drains the event queue, groups events into batches
     * and hands each batch to the event handler.
     */
    public class EventConsumer implements Runnable {
        private LinkedList<UserEvent> currentBatch = new LinkedList<>();
        // Wall-clock time (ms) at which the current batch must be flushed.
        private long deadline;

        public EventConsumer() {
            this.deadline = System.currentTimeMillis() + BatchEventProcessor.this.flushInterval;
        }

        /**
         * Consumes queue items until the shutdown signal arrives, the thread is
         * interrupted, or an unexpected exception occurs; whatever is still
         * batched is flushed on the way out.
         */
        @Override
        public void run() {
            try {
                int emptyCount = 0;
                while (true) {
                    if (System.currentTimeMillis() >= this.deadline) {
                        logger.debug("Deadline exceeded flushing current batch.");
                        this.flush();
                        this.deadline = System.currentTimeMillis() + BatchEventProcessor.this.flushInterval;
                    }
                    long timeout = this.deadline - System.currentTimeMillis();
                    // After repeated empty polls, block indefinitely instead of
                    // waking up every flush interval with nothing to do.
                    Object item = emptyCount > DEFAULT_EMPTY_COUNT
                            ? BatchEventProcessor.this.eventQueue.take()
                            : BatchEventProcessor.this.eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
                    if (item == null) {
                        logger.debug("Empty item after waiting flush interval.");
                        ++emptyCount;
                        continue;
                    }
                    emptyCount = 0;
                    if (item == SHUTDOWN_SIGNAL) {
                        logger.info("Received shutdown signal.");
                        break;
                    }
                    if (item == FLUSH_SIGNAL) {
                        logger.debug("Received flush signal.");
                        this.flush();
                        continue;
                    }
                    this.addToBatch((UserEvent) item);
                }
            } catch (InterruptedException e) {
                // The interrupt status is not restored here; the final flush
                // below still runs after this handler.
                logger.info("Interrupted while processing buffer.");
            } catch (Exception e) {
                logger.error("Uncaught exception processing buffer.", e);
            } finally {
                logger.info("Exiting processing loop. Attempting to flush pending events.");
                this.flush();
            }
        }

        /**
         * Appends the event to the current batch, flushing first when the event
         * belongs to a different project/revision and afterwards when the batch
         * has reached {@code batchSize}.
         */
        private void addToBatch(UserEvent userEvent) {
            if (this.shouldSplit(userEvent)) {
                this.flush();
            }
            if (this.currentBatch.isEmpty()) {
                // The flush interval is measured from the first event of a batch.
                this.deadline = System.currentTimeMillis() + BatchEventProcessor.this.flushInterval;
            }
            this.currentBatch.add(userEvent);
            if (this.currentBatch.size() >= BatchEventProcessor.this.batchSize) {
                this.flush();
            }
        }

        /**
         * @return {@code true} when the batch is non-empty and the new event's
         *         project id or revision differs from the last batched event's
         */
        private boolean shouldSplit(UserEvent userEvent) {
            if (this.currentBatch.isEmpty()) {
                return false;
            }
            ProjectConfig currentConfig = this.currentBatch.peekLast().getUserContext().getProjectConfig();
            ProjectConfig newConfig = userEvent.getUserContext().getProjectConfig();
            if (!currentConfig.getProjectId().equals(newConfig.getProjectId())) {
                return true;
            }
            return !currentConfig.getRevision().equals(newConfig.getRevision());
        }

        /**
         * Converts the current batch into a {@link LogEvent}, notifies the
         * notification center (if any) and dispatches it to the event handler.
         * Does nothing for an empty batch.
         */
        private void flush() {
            if (this.currentBatch.isEmpty()) {
                return;
            }
            // Detach the batch before doing anything that can throw, so a failure
            // cannot leave the same events behind to fail again on the next flush
            // (in particular the one in run()'s finally block).
            LinkedList<UserEvent> batch = this.currentBatch;
            this.currentBatch = new LinkedList<>();

            LogEvent logEvent = EventFactory.createLogEvent(batch);
            if (BatchEventProcessor.this.notificationCenter != null) {
                try {
                    BatchEventProcessor.this.notificationCenter.send(logEvent);
                } catch (Exception e) {
                    // A failing notification must not prevent the dispatch below.
                    logger.error("Error sending notification for event: {}", logEvent, e);
                }
            }
            try {
                BatchEventProcessor.this.eventHandler.dispatchEvent(logEvent);
            } catch (Exception e) {
                logger.error("Error dispatching event: {}", logEvent, e);
            }
        }
    }
}

