/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.concurrent.executor.internal;

import com.oracle.coherence.concurrent.executor.ClusteredTaskManager;
import com.oracle.coherence.concurrent.executor.PortableAbstractProcessor;
import com.oracle.coherence.concurrent.executor.internal.ExecutorTrace;
import com.oracle.coherence.concurrent.executor.options.Debugging;
import com.oracle.coherence.concurrent.executor.util.Caches;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.net.events.Event;
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.partition.TransactionEvent;
import com.tangosol.net.events.partition.TransferEvent;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.DaemonThreadFactory;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.LongArray;
import com.tangosol.util.SparseArray;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;

public class ClusteredTaskInterceptor
implements EventInterceptor {
    protected final AtomicReference<PartitionSet> f_atomicPartsPending = new AtomicReference();
    protected final AtomicLongArray f_sequences;
    protected final int f_cMaxBatch;
    protected final int f_cMaxAllowedTasks;
    protected final AtomicInteger f_cOrchestratedTasks;
    protected final AtomicBoolean f_fPendingTasks;
    protected final ExecutorService f_executorService;
    protected final DistributedCacheService f_cacheService;
    protected final ConcurrentHashMap<Integer, LongArray<String>> f_mapPendingTasks;
    protected int m_nLastPartitionId;

    /**
     * Creates an interceptor bound to the named cluster service.
     *
     * <p>The service is looked up through {@code CacheFactory.getCluster()} and
     * cast to {@link DistributedCacheService}; its partition count sizes the
     * per-partition sequence array and the pending {@link PartitionSet}.
     *
     * @param sServiceName  name of the service to look up in the cluster
     */
    public ClusteredTaskInterceptor(String sServiceName) {
        f_cacheService = (DistributedCacheService) CacheFactory.getCluster().getService(sServiceName);

        // fixed limits: batch size used when promoting pending tasks, and the
        // orchestrated-task threshold above which new tasks are parked
        f_cMaxBatch = 20;
        f_cMaxAllowedTasks = 100;

        f_cOrchestratedTasks = new AtomicInteger(0);
        f_fPendingTasks = new AtomicBoolean(false);
        f_executorService = Executors.newSingleThreadExecutor(
                (ThreadFactory) new DaemonThreadFactory("TaskInterceptorThread-"));

        final int cPartitions = f_cacheService.getPartitionCount();

        // AtomicLongArray elements start at zero, so no explicit reset is needed
        f_sequences = new AtomicLongArray(cPartitions);
        f_atomicPartsPending.set(new PartitionSet(cPartitions));
        f_mapPendingTasks = new ConcurrentHashMap<>();
    }

    /**
     * Stores a task key under the given sequence number while holding this
     * interceptor's monitor.
     *
     * @param keyMap    the sequence-to-key array to update
     * @param sequence  the index at which the key is stored
     * @param key       the task key to store
     */
    protected synchronized void addKey(LongArray<String> keyMap, long sequence, String key) {
        keyMap.set(sequence, key);
    }

    /**
     * Marks a partition as having pending tasks.
     *
     * <p>The pending set is replaced copy-on-write through a compare-and-set
     * loop. The replacement set is derived from the value read in the same
     * iteration, so an update made by another thread between the read and the
     * swap causes a retry instead of being overwritten by a stale copy.
     *
     * @param nPid  the partition id to add to the pending set
     */
    public void addPending(int nPid) {
        final AtomicReference<PartitionSet> atomicSetPending = f_atomicPartsPending;
        PartitionSet partsCurrent;
        PartitionSet partsNew;
        do {
            partsCurrent = atomicSetPending.get();
            assert (partsCurrent != null);
            if (partsCurrent.contains(nPid)) {
                // already marked; nothing to publish
                return;
            }
            // build the replacement from the snapshot this iteration will CAS against
            partsNew = new PartitionSet(partsCurrent);
            partsNew.add(nPid);
        } while (!atomicSetPending.compareAndSet(partsCurrent, partsNew));
    }

    /**
     * Clears a partition from the pending set.
     *
     * <p>Returns immediately when the pending set is {@code null} or empty.
     * Otherwise a copy of the current set without {@code nPid} is swapped in
     * with compare-and-set, retrying until the swap succeeds.
     *
     * @param nPid  the partition id to remove
     *
     * @return {@code true} if the partition was marked in the set that was replaced
     */
    protected boolean removePending(int nPid) {
        final AtomicReference<PartitionSet> refPending = f_atomicPartsPending;

        PartitionSet snapshot = refPending.get();
        if (snapshot == null || snapshot.isEmpty()) {
            return false;
        }

        while (true) {
            snapshot = refPending.get();
            PartitionSet candidate = new PartitionSet(snapshot);
            boolean fRemoved = candidate.remove(nPid);
            if (refPending.compareAndSet(snapshot, candidate)) {
                return fRemoved;
            }
        }
    }

    /**
     * Returns the cache service this interceptor was created for.
     *
     * @return the service resolved in the constructor
     */
    public CacheService getCacheService() {
        return f_cacheService;
    }

    /**
     * Returns the current sequence value for a partition without changing it.
     *
     * @param nPartitionId  index of the partition
     *
     * @return the sequence value currently stored for the partition
     */
    public long getSequence(int nPartitionId) {
        return f_sequences.get(nPartitionId);
    }

    /**
     * Atomically increments a partition's sequence and returns the new value.
     *
     * @param nPartitionId  index of the partition
     *
     * @return the incremented sequence value
     */
    public long getNextSequence(int nPartitionId) {
        return f_sequences.incrementAndGet(nPartitionId);
    }

    /**
     * Atomically resets a partition's sequence to zero.
     *
     * @param nPartitionId  index of the partition
     *
     * @return the sequence value held immediately before the reset
     */
    public long resetSequence(int nPartitionId) {
        return f_sequences.getAndSet(nPartitionId, 0L);
    }

    /**
     * Returns the partition id recorded by the most recent call to
     * {@link #setLastPartitionId(int)}.
     *
     * @return the last recorded partition id
     */
    public int getLastPartitionId() {
        return m_nLastPartitionId;
    }

    /**
     * Records the partition id at which the pending-task scan stopped.
     *
     * @param nPartitionId  the partition id to remember
     */
    public void setLastPartitionId(int nPartitionId) {
        m_nLastPartitionId = nPartitionId;
    }

    /**
     * Returns the batch limit applied when pending tasks are promoted.
     *
     * @return the configured maximum batch size
     */
    public int getBatchMax() {
        return f_cMaxBatch;
    }

    /**
     * Tracks task entries as they are inserted, promoted and removed.
     *
     * <p>For a {@link TransactionEvent} every entry in the transaction is
     * examined; for a {@link TransferEvent} only the entries of the tasks cache
     * are. Each entry is classified as an {@link EntryEvent.Type} and handled:
     * <ul>
     *   <li>{@code INSERTING} - assigns the task its partition id and sequence,
     *       and parks it as {@code PENDING} when too many tasks are orchestrated
     *       or other tasks are already parked;</li>
     *   <li>{@code INSERTED} - counts the task if it is {@code ORCHESTRATED};</li>
     *   <li>{@code REMOVED} - un-counts the task and, once the count drops far
     *       enough below the limit, schedules promotion of parked tasks;</li>
     *   <li>anything else - ignored.</li>
     * </ul>
     * Entries whose value is not a {@link ClusteredTaskManager} are ignored.
     *
     * @param event  the partition-level event to process
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void onEvent(Event event) {
        for (BinaryEntry binaryEntry : entriesOf(event)) {
            Object oValue = binaryEntry.getValue();
            switch (classify(event, binaryEntry, oValue)) {
                case INSERTING:
                    handleInserting(binaryEntry, oValue);
                    break;
                case INSERTED:
                    handleInserted(oValue);
                    break;
                case REMOVED:
                    handleRemoved(binaryEntry.getOriginalValue());
                    break;
                default:
                    // UPDATING, UPDATED and REMOVING need no bookkeeping
                    break;
            }
        }
    }

    /**
     * Extracts the entries an event carries: all entries of a transaction, or
     * the tasks-cache entries of a transfer (empty when there are none).
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Set<BinaryEntry> entriesOf(Event event) {
        if (event instanceof TransactionEvent) {
            return ((TransactionEvent) event).getEntrySet();
        }
        if (event instanceof TransferEvent) {
            Set<BinaryEntry> setEntries =
                    (Set<BinaryEntry>) ((TransferEvent) event).getEntries().get(Caches.TASKS_CACHE_NAME);
            if (setEntries != null) {
                return setEntries;
            }
        }
        return Collections.emptySet();
    }

    /**
     * Maps an event and one of its entries to the entry-level event type it
     * represents; {@code UPDATING} is returned for everything that needs no action.
     */
    @SuppressWarnings("rawtypes")
    private static EntryEvent.Type classify(Event event, BinaryEntry binaryEntry, Object oValue) {
        if (event instanceof TransactionEvent) {
            Object oEventType = event.getType();
            Object oOriginal = binaryEntry.getOriginalValue();

            if (oEventType == TransactionEvent.Type.COMMITTING) {
                if (oOriginal == null) {
                    return EntryEvent.Type.INSERTING;
                }
                return oValue == null ? EntryEvent.Type.REMOVING : EntryEvent.Type.UPDATING;
            }

            if (oEventType == TransactionEvent.Type.COMMITTED) {
                if (oOriginal == null) {
                    return EntryEvent.Type.INSERTED;
                }
                if (oValue == null) {
                    return EntryEvent.Type.REMOVED;
                }
                if (!(oValue instanceof ClusteredTaskManager)) {
                    return EntryEvent.Type.UPDATED;
                }
                // a PENDING -> ORCHESTRATED transition is accounted for like an insert;
                // the instanceof guard avoids a ClassCastException on a foreign original value
                boolean fPromoted = oOriginal instanceof ClusteredTaskManager
                        && ((ClusteredTaskManager) oOriginal).getState() == ClusteredTaskManager.State.PENDING
                        && ((ClusteredTaskManager) oValue).getState() == ClusteredTaskManager.State.ORCHESTRATED;
                return fPromoted ? EntryEvent.Type.INSERTED : EntryEvent.Type.UPDATING;
            }

            return EntryEvent.Type.UPDATING;
        }

        if (event instanceof TransferEvent) {
            Object oEventType = event.getType();
            if (oEventType == TransferEvent.Type.ARRIVED || oEventType == TransferEvent.Type.RECOVERED) {
                return EntryEvent.Type.INSERTED;
            }
            if (oEventType == TransferEvent.Type.DEPARTING) {
                return EntryEvent.Type.REMOVED;
            }
        }

        return EntryEvent.Type.UPDATING;
    }

    /**
     * Handles a task about to be inserted: assigns partition id and sequence,
     * parks the task as PENDING when the orchestration limit is exceeded or
     * tasks are already parked, and writes the updated task back to the entry.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void handleInserting(BinaryEntry binaryEntry, Object oValue) {
        if (!(oValue instanceof ClusteredTaskManager)) {
            return;
        }
        final ClusteredTaskManager manager = (ClusteredTaskManager) oValue;
        final int nPartitionId = binaryEntry.getContext().getKeyPartition(binaryEntry.getBinaryKey());
        final long lSequence = getNextSequence(nPartitionId);
        final int cOrchestrated = f_cOrchestratedTasks.get();

        if (cOrchestrated > f_cMaxAllowedTasks || f_fPendingTasks.get()) {
            manager.setState(ClusteredTaskManager.State.PENDING);
            f_fPendingTasks.compareAndSet(false, true);
            if (!f_atomicPartsPending.get().isFull()) {
                addPending(nPartitionId);
            }
            LongArray<String> keyMap =
                    f_mapPendingTasks.computeIfAbsent(nPartitionId, nPid -> new SparseArray<>());
            // the key arrays are not thread-safe; all access goes through this monitor
            synchronized (this) {
                keyMap.set(lSequence, (String) binaryEntry.getKey());
            }
        }

        manager.setPartitionId(nPartitionId);
        manager.setTaskSequence(lSequence);
        binaryEntry.setValue(manager);

        ExecutorTrace.log(() -> String.format("INSERTING: In Partition [%s], Task Sequence [%s], Task State [%s], Orchestrated Count [%s]", manager.getPartitionId(), manager.getTaskSequence(), manager.getState(), cOrchestrated), traceOptions(manager));
    }

    /**
     * Handles a task that has been inserted (or promoted/transferred in):
     * increments the orchestrated count when the task is ORCHESTRATED.
     */
    private void handleInserted(Object oValue) {
        if (!(oValue instanceof ClusteredTaskManager)) {
            return;
        }
        final ClusteredTaskManager manager = (ClusteredTaskManager) oValue;
        if (manager.getState() == ClusteredTaskManager.State.ORCHESTRATED) {
            f_cOrchestratedTasks.incrementAndGet();
        }

        ExecutorTrace.log(() -> String.format("INSERTED: In Partition [%s], Task Sequence [%s], Task State [%s], Orchestrated Count [%s]", manager.getPartitionId(), manager.getTaskSequence(), manager.getState(), f_cOrchestratedTasks.get()), traceOptions(manager));
    }

    /**
     * Handles a removed task (given its original value): decrements the
     * orchestrated count unless the task was PENDING, and schedules promotion
     * of parked tasks once the count is below (max allowed - batch max).
     */
    private void handleRemoved(Object oOriginal) {
        if (!(oOriginal instanceof ClusteredTaskManager)) {
            return;
        }
        final ClusteredTaskManager manager = (ClusteredTaskManager) oOriginal;
        final int cOrchestrated = manager.getState() != ClusteredTaskManager.State.PENDING
                ? f_cOrchestratedTasks.decrementAndGet()
                : f_cOrchestratedTasks.get();

        ExecutorTrace.log(() -> String.format("REMOVED: Orchestrated Task Count [%s], Partition [%s], Task Sequence [%s]", cOrchestrated, manager.getPartitionId(), manager.getTaskSequence()), traceOptions(manager));

        if (cOrchestrated < f_cMaxAllowedTasks - f_cMaxBatch) {
            f_executorService.submit((Runnable) this::promotePendingTasks);
        }
    }

    /**
     * Chooses the debugging options used for trace output: the task's own
     * options when their log level is at least 7, otherwise default options.
     */
    private static Debugging traceOptions(ClusteredTaskManager manager) {
        Debugging debug = manager.getDebugging();
        return debug.getLogLevel() < 7 ? new Debugging() : debug;
    }

    /**
     * Collects a batch of parked task keys, scanning pending partitions after
     * the last visited one, and flips those tasks from PENDING to ORCHESTRATED.
     * When nothing was collected the pending flag is cleared instead. Runs on
     * the interceptor's single-threaded executor.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void promotePendingTasks() {
        final int nMaxSize = getBatchMax();
        final Set<String> setTasks = new HashSet<>();
        final NamedCache taskCache = Caches.tasks(getCacheService());
        final int nPidLast = m_nLastPartitionId;
        final PartitionSet partsPending = f_atomicPartsPending.get();
        final int cOrchestrated = f_cOrchestratedTasks.get();

        int nBatchSize = 0;
        int nPid = nPidLast < 0 ? partsPending.next(0) : partsPending.next(nPidLast + 1);
        while (nPid >= 0 && (nMaxSize == 0 || nBatchSize < nMaxSize) && cOrchestrated < f_cMaxAllowedTasks) {
            removePending(nPid);
            LongArray<String> keyMap = f_mapPendingTasks.get(nPid);
            if (keyMap != null) {
                nBatchSize += drainPendingKeys(nPid, keyMap, nMaxSize, setTasks);
            }
            nPid = partsPending.next(nPid + 1);
        }

        if (setTasks.isEmpty()) {
            f_fPendingTasks.compareAndSet(true, false);
        } else {
            taskCache.invokeAll(setTasks, (InvocableMap.EntryProcessor) new SetTaskStateProcessor(ClusteredTaskManager.State.PENDING, ClusteredTaskManager.State.ORCHESTRATED));
        }
        setLastPartitionId(nPid);
    }

    /**
     * Moves keys from one partition's key array into {@code setTasks}, stopping
     * once more than {@code nMaxSize} keys have been moved, and re-marks the
     * partition as pending when it held more than {@code nMaxSize} keys.
     *
     * @return the number of keys moved
     */
    private int drainPendingKeys(int nPid, LongArray<String> keyMap, int nMaxSize, Set<String> setTasks) {
        // same monitor as the insert path, so the array is never iterated
        // while another thread is adding to it
        synchronized (this) {
            int cAvailable = keyMap.getSize();
            if (cAvailable <= 0) {
                return 0;
            }
            if (cAvailable > nMaxSize) {
                addPending(nPid);
            }
            int cDrained = 0;
            for (LongArray.Iterator<String> iterTasks = keyMap.iterator(); iterTasks.hasNext(); ) {
                setTasks.add(iterTasks.next());
                iterTasks.remove();
                if (++cDrained > nMaxSize) {
                    break;
                }
            }
            return cDrained;
        }
    }

    /**
     * Entry processor that moves a {@link ClusteredTaskManager} to a desired
     * state, optionally only when it is currently in an expected state.
     */
    public static class SetTaskStateProcessor
    extends PortableAbstractProcessor {
        /** State the task must currently be in, or {@code null} to accept any state. */
        protected ClusteredTaskManager.State m_previous;

        /** State to assign when the precondition holds. */
        protected ClusteredTaskManager.State m_desired;

        /**
         * No-argument constructor; both states are left unset until
         * {@link #readExternal(PofReader)} populates them.
         */
        public SetTaskStateProcessor() {
        }

        /**
         * Creates a processor that sets the state unconditionally.
         *
         * @param desired  the state to assign
         */
        public SetTaskStateProcessor(ClusteredTaskManager.State desired) {
            this(null, desired);
        }

        /**
         * Creates a processor that sets the state only when the task is in
         * the expected state.
         *
         * @param previous  the expected current state, or {@code null} for any
         * @param desired   the state to assign
         */
        public SetTaskStateProcessor(ClusteredTaskManager.State previous, ClusteredTaskManager.State desired) {
            m_previous = previous;
            m_desired = desired;
        }

        /**
         * Applies the state change to the entry's task.
         *
         * @param entry  the entry holding the task
         *
         * @return the state the task had before processing, or {@code null}
         *         when the entry is not present
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                return null;
            }

            ClusteredTaskManager task = (ClusteredTaskManager) entry.getValue();
            ClusteredTaskManager.State existing = task.getState();

            boolean fAnyState = m_previous == null;
            if (fAnyState || (existing != null && existing.equals(m_previous))) {
                task.setState(m_desired);
                entry.setValue(task);
            }
            return existing;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            m_previous = (ClusteredTaskManager.State) in.readObject(0);
            m_desired = (ClusteredTaskManager.State) in.readObject(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, m_previous);
            out.writeObject(1, m_desired);
        }
    }

    public static class SequenceComparator
    implements Comparator<Object>,
    ExternalizableLite,
    PortableObject {
        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof BinaryEntry && o2 instanceof BinaryEntry) {
                ClusteredTaskManager manager1 = (ClusteredTaskManager)((BinaryEntry)o1).getValue();
                ClusteredTaskManager manager2 = (ClusteredTaskManager)((BinaryEntry)o2).getValue();
                return (int)(manager1.getTaskSequence() - manager2.getTaskSequence());
            }
            if (o1 instanceof ClusteredTaskManager && o2 instanceof ClusteredTaskManager) {
                return (int)(((ClusteredTaskManager)o1).getTaskSequence() - ((ClusteredTaskManager)o2).getTaskSequence());
            }
            if (o1 instanceof Map.Entry && o2 instanceof Map.Entry) {
                return (int)(((ClusteredTaskManager)((Map.Entry)o1).getValue()).getTaskSequence() - ((ClusteredTaskManager)((Map.Entry)o2).getValue()).getTaskSequence());
            }
            return o1.hashCode() - o2.hashCode();
        }

        public void readExternal(DataInput in) throws IOException {
        }

        public void writeExternal(DataOutput out) throws IOException {
        }

        public void readExternal(PofReader in) throws IOException {
        }

        public void writeExternal(PofWriter out) throws IOException {
        }
    }
}

