/*
 * Decompiled with CFR 0.152.
 */
package com.tc.async.impl;

import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.MultiThreadedEventContext;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.SpecializedEventContext;
import com.tc.async.api.StageQueueStats;
import com.tc.async.impl.ContextWrapper;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;
import com.tc.stats.Stats;
import com.tc.util.Assert;
import com.tc.util.concurrent.QueueFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StageQueueImpl<EC>
implements Sink<EC> {
    private final String stageName;
    private final TCLogger logger;
    private final SourceQueueImpl<ContextWrapper<EC>>[] sourceQueues;
    private volatile boolean closed = false;
    private volatile int fcheck = 0;

    public StageQueueImpl(int queueCount, QueueFactory<ContextWrapper<EC>> queueFactory, TCLoggerProvider loggerProvider, String stageName, int queueSize) {
        Assert.eval(queueCount > 0);
        this.logger = loggerProvider.getLogger(Sink.class.getName() + ": " + stageName);
        this.stageName = stageName;
        this.sourceQueues = new SourceQueueImpl[queueCount];
        this.createWorkerQueues(queueCount, queueFactory, queueSize, stageName);
    }

    private void createWorkerQueues(int queueCount, QueueFactory<ContextWrapper<EC>> queueFactory, int queueSize, String stage) {
        NullStageQueueStatsCollector statsCollector = new NullStageQueueStatsCollector(stage);
        BlockingQueue<ContextWrapper<EC>> q = null;
        if (queueSize != Integer.MAX_VALUE) {
            queueSize = (int)Math.ceil((double)queueSize / (double)queueCount);
        }
        Assert.eval(queueSize > 0);
        for (int i = 0; i < queueCount; ++i) {
            q = queueFactory.createInstance(queueSize);
            this.sourceQueues[i] = new SourceQueueImpl<ContextWrapper<EC>>(q, i, statsCollector);
        }
    }

    public Source<ContextWrapper<EC>> getSource(int index) {
        return index < 0 || index >= this.sourceQueues.length ? null : this.sourceQueues[index];
    }

    @Override
    public void setClosed(boolean closed) {
        this.closed = closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSingleThreaded(EC context) {
        Assert.assertNotNull(context);
        Assert.assertFalse(context instanceof MultiThreadedEventContext);
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + context + " to:" + this.stageName);
        }
        boolean interrupted = Thread.interrupted();
        HandledContext<EC> wrapper = new HandledContext<EC>(context);
        try {
            while (true) {
                try {
                    this.sourceQueues[0].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addMultiThreaded(EC context) {
        Assert.assertNotNull(context);
        Assert.assertTrue(context instanceof MultiThreadedEventContext);
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + context + " to:" + this.stageName);
        }
        boolean interrupted = Thread.interrupted();
        MultiThreadedEventContext cxt = (MultiThreadedEventContext)context;
        int index = this.getSourceQueueFor(cxt);
        ContextWrapper<Object> wrapper = cxt.flush() ? new FlushingHandledContext(context, index) : new HandledContext<EC>(context);
        try {
            while (true) {
                try {
                    this.sourceQueues[index].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSpecialized(SpecializedEventContext specialized) {
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        DirectExecuteContext wrapper = new DirectExecuteContext(specialized);
        boolean interrupted = Thread.interrupted();
        int index = this.getSourceQueueFor(specialized);
        try {
            while (true) {
                try {
                    this.sourceQueues[index].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private int findShortestQueueIndex() {
        int pointer = this.fcheck;
        int min = Integer.MAX_VALUE;
        int can = -1;
        for (int x = 0; x < this.sourceQueues.length; ++x) {
            int index = (pointer + x) % this.sourceQueues.length;
            SourceQueueImpl<ContextWrapper<EC>> impl = this.sourceQueues[index];
            if (impl.isEmpty()) {
                return index;
            }
            int checkMin = impl.size();
            if (Math.min(min, checkMin) == min) continue;
            can = index;
            min = checkMin;
        }
        Assert.assertTrue(can >= 0 && can < this.sourceQueues.length);
        return can;
    }

    private int getSourceQueueFor(MultiThreadedEventContext context) {
        Object schedulingKey = context.getSchedulingKey();
        if (null == schedulingKey) {
            return this.findShortestQueueIndex();
        }
        int index = this.hashCodeToArrayIndex(schedulingKey.hashCode(), this.sourceQueues.length);
        return index;
    }

    private int hashCodeToArrayIndex(int hashcode, int arrayLength) {
        return Math.abs(hashcode % arrayLength);
    }

    @Override
    public int size() {
        int totalQueueSize = 0;
        for (SourceQueueImpl<ContextWrapper<EC>> sourceQueue : this.sourceQueues) {
            totalQueueSize += sourceQueue.size();
        }
        return totalQueueSize;
    }

    public String toString() {
        return "StageQueue(" + this.stageName + ")";
    }

    @Override
    public void clear() {
        int clearCount = 0;
        for (SourceQueueImpl<ContextWrapper<EC>> sourceQueue : this.sourceQueues) {
            clearCount += sourceQueue.clear();
        }
        this.logger.info("Cleared " + clearCount);
    }

    @Override
    public void enableStatsCollection(boolean enable) {
        StageQueueStats collector = null;
        for (SourceQueueImpl<ContextWrapper<EC>> src : this.sourceQueues) {
            String name = this.stageName + "[" + src.getSourceName() + "]";
            if (collector == null || !collector.getName().equals(name)) {
                collector = enable ? new StageQueueStatsCollectorImpl(name) : new NullStageQueueStatsCollector(name);
            }
            src.setStatsCollector((StageQueueStatsCollector)collector);
        }
    }

    @Override
    public Stats getStats(long frequency) {
        if (this.sourceQueues.length == 1) {
            return this.sourceQueues[0].getStatsCollector();
        }
        return new Stats(){

            @Override
            public String getDetails() {
                StringBuilder build = new StringBuilder();
                StageQueueStatsCollector stats = null;
                for (SourceQueueImpl impl : StageQueueImpl.this.sourceQueues) {
                    StageQueueStatsCollector current = impl.getStatsCollector();
                    if (stats != current) {
                        if (stats != null) {
                            build.append('\n');
                        }
                        build.append(current.getDetails());
                    }
                    stats = current;
                }
                return build.toString();
            }

            @Override
            public void logDetails(TCLogger statsLogger) {
                statsLogger.info(this.getDetails());
            }
        };
    }

    @Override
    public Stats getStatsAndReset(long frequency) {
        return this.getStats(frequency);
    }

    @Override
    public boolean isStatsCollectionEnabled() {
        return this.sourceQueues[0].getStatsCollector() instanceof StageQueueStatsCollectorImpl;
    }

    @Override
    public void resetStats() {
        this.sourceQueues[0].getStatsCollector().reset();
    }

    private class FlushingHandledContext<T extends EC>
    implements ContextWrapper<EC> {
        private final EC context;
        private final int offset;
        private int executionCount = 0;

        public FlushingHandledContext(EC context, int offset) {
            this.context = context;
            this.offset = offset;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void runWithHandler(EventHandler<EC> handler) throws EventHandlerException {
            if (++this.executionCount == StageQueueImpl.this.sourceQueues.length) {
                handler.handleEvent(this.context);
            } else {
                boolean interrupted = false;
                try {
                    while (true) {
                        try {
                            StageQueueImpl.this.sourceQueues[(this.executionCount + this.offset) % StageQueueImpl.this.sourceQueues.length].put(this);
                        }
                        catch (InterruptedException e) {
                            StageQueueImpl.this.logger.debug("FlushingHandledContext move to next queue: " + e + " : " + (this.executionCount + this.offset) % StageQueueImpl.this.sourceQueues.length);
                            interrupted = true;
                            continue;
                        }
                        break;
                    }
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        public boolean equals(Object obj) {
            if (this.context.getClass().isInstance(obj)) {
                return this.context.equals(obj);
            }
            return super.equals(obj);
        }
    }

    private static class HandledContext<EC>
    implements ContextWrapper<EC> {
        private final EC context;

        public HandledContext(EC context) {
            this.context = context;
        }

        @Override
        public void runWithHandler(EventHandler<EC> handler) throws EventHandlerException {
            handler.handleEvent(this.context);
        }

        public boolean equals(Object obj) {
            if (this.context.getClass().isInstance(obj)) {
                return this.context.equals(obj);
            }
            return super.equals(obj);
        }
    }

    private static class DirectExecuteContext<EC>
    implements ContextWrapper<EC> {
        private final SpecializedEventContext context;

        public DirectExecuteContext(SpecializedEventContext context) {
            this.context = context;
        }

        @Override
        public void runWithHandler(EventHandler<EC> handler) throws EventHandlerException {
            this.context.execute();
        }
    }

    private static class StageQueueStatsCollectorImpl
    extends StageQueueStatsCollector {
        private final AtomicInteger count = new AtomicInteger(0);
        private final String name;
        private final String trimmedName;

        public StageQueueStatsCollectorImpl(String stage) {
            this.trimmedName = stage.trim();
            this.name = this.makeWidth(stage, 40);
        }

        @Override
        public String getDetails() {
            return this.name + " : " + this.count;
        }

        @Override
        public void contextAdded() {
            this.count.incrementAndGet();
        }

        @Override
        public void contextRemoved() {
            this.count.decrementAndGet();
        }

        @Override
        public void reset() {
            this.count.set(0);
        }

        @Override
        public String getName() {
            return this.trimmedName;
        }

        @Override
        public int getDepth() {
            return this.count.get();
        }
    }

    private static class NullStageQueueStatsCollector
    extends StageQueueStatsCollector {
        private final String name;
        private final String trimmedName;

        public NullStageQueueStatsCollector(String stage) {
            this.trimmedName = stage.trim();
            this.name = this.makeWidth(stage, 40);
        }

        @Override
        public String getDetails() {
            return this.name + " : Not Monitored";
        }

        @Override
        public void contextAdded() {
        }

        @Override
        public void contextRemoved() {
        }

        @Override
        public void reset() {
        }

        @Override
        public String getName() {
            return this.trimmedName;
        }

        @Override
        public int getDepth() {
            return -1;
        }
    }

    private static abstract class StageQueueStatsCollector
    implements StageQueueStats {
        private StageQueueStatsCollector() {
        }

        @Override
        public void logDetails(TCLogger statsLogger) {
            statsLogger.info(this.getDetails());
        }

        public abstract void contextAdded();

        public abstract void reset();

        public abstract void contextRemoved();

        protected String makeWidth(String name, int width) {
            int len = name.length();
            if (len == width) {
                return name;
            }
            if (len > width) {
                return name.substring(0, width);
            }
            StringBuffer buf = new StringBuffer(name);
            for (int i = len; i < width; ++i) {
                buf.append(' ');
            }
            return buf.toString();
        }
    }

    private final class SourceQueueImpl<W>
    implements Source<W> {
        private final BlockingQueue<W> queue;
        private final int sourceIndex;
        private volatile StageQueueStatsCollector statsCollector;

        public SourceQueueImpl(BlockingQueue<W> queue, int sourceIndex, StageQueueStatsCollector statsCollector) {
            this.queue = queue;
            this.sourceIndex = sourceIndex;
            this.statsCollector = statsCollector;
        }

        public StageQueueStatsCollector getStatsCollector() {
            return this.statsCollector;
        }

        public void setStatsCollector(StageQueueStatsCollector collector) {
            this.statsCollector = collector;
        }

        public int clear() {
            int cleared = 0;
            try {
                while (this.poll(0L) != null) {
                    ++cleared;
                }
                return cleared;
            }
            catch (InterruptedException e) {
                throw new TCRuntimeException(e);
            }
        }

        @Override
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        @Override
        public W poll(long timeout) throws InterruptedException {
            W rv = this.queue.poll(timeout, TimeUnit.MILLISECONDS);
            if (rv != null) {
                this.statsCollector.contextRemoved();
                if (this.queue.isEmpty()) {
                    StageQueueImpl.this.fcheck = this.sourceIndex;
                }
            } else {
                StageQueueImpl.this.fcheck = this.sourceIndex;
            }
            return rv;
        }

        public void put(W context) throws InterruptedException {
            this.queue.put(context);
            this.statsCollector.contextAdded();
        }

        public int size() {
            return this.queue.size();
        }

        @Override
        public String getSourceName() {
            return Integer.toString(this.sourceIndex);
        }
    }
}

