/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutboundTransferTask;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.TransactionInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener
public class StateProviderImpl
implements StateProvider {
    private static final Log log = LogFactory.getLog(StateProviderImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    private CommandsFactory commandsFactory;
    private CacheNotifier cacheNotifier;
    private TransactionTable transactionTable;
    private DataContainer dataContainer;
    private CacheLoaderManager cacheLoaderManager;
    private ExecutorService executorService;
    private StateTransferLock stateTransferLock;
    private long timeout;
    private int chunkSize;
    private volatile int topologyId;
    private volatile ConsistentHash readCh;
    private final Map<Address, List<OutboundTransferTask>> transfersByDestination = new HashMap<Address, List<OutboundTransferTask>>();

    @Inject
    public void init(Cache cache, @ComponentName(value="org.infinispan.executors.transport") ExecutorService executorService, Configuration configuration, RpcManager rpcManager, CommandsFactory commandsFactory, CacheNotifier cacheNotifier, CacheLoaderManager cacheLoaderManager, DataContainer dataContainer, TransactionTable transactionTable, StateTransferLock stateTransferLock) {
        this.cacheName = cache.getName();
        this.executorService = executorService;
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.cacheNotifier = cacheNotifier;
        this.cacheLoaderManager = cacheLoaderManager;
        this.dataContainer = dataContainer;
        this.transactionTable = transactionTable;
        this.stateTransferLock = stateTransferLock;
        this.timeout = configuration.clustering().stateTransfer().timeout();
        int chunkSize = configuration.clustering().stateTransfer().chunkSize();
        this.chunkSize = chunkSize > 0 ? chunkSize : Integer.MAX_VALUE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isStateTransferInProgress() {
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            return !this.transfersByDestination.isEmpty();
        }
    }

    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> tce) {
        if (tce.isPre()) {
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        this.readCh = cacheTopology.getReadConsistentHash();
        this.topologyId = cacheTopology.getTopologyId();
        HashSet<Address> members = new HashSet<Address>(cacheTopology.getWriteConsistentHash().getMembers());
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            Iterator<Address> it = this.transfersByDestination.keySet().iterator();
            while (it.hasNext()) {
                Address destination = it.next();
                if (members.contains(destination)) continue;
                List<OutboundTransferTask> transfers = this.transfersByDestination.get(destination);
                it.remove();
                for (OutboundTransferTask outboundTransfer : transfers) {
                    outboundTransfer.cancel();
                }
            }
        }
    }

    @Override
    @Start(priority=60)
    public void start() {
        this.cacheNotifier.addListener(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Stop(priority=20)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateProvider of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
        try {
            Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
            synchronized (map) {
                Iterator<List<OutboundTransferTask>> it = this.transfersByDestination.values().iterator();
                while (it.hasNext()) {
                    List<OutboundTransferTask> transfers = it.next();
                    it.remove();
                    for (OutboundTransferTask outboundTransfer : transfers) {
                        outboundTransfer.cancel();
                    }
                }
            }
        }
        catch (Throwable t) {
            log.errorf(t, "Failed to stop StateProvider of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
    }

    @Override
    public List<TransactionInfo> getTransactionsForSegments(Address destination, int requestTopologyId, Set<Integer> segments) throws InterruptedException {
        if (trace) {
            log.tracef("Received request for transactions from node %s for segments %s with topology id %d", destination, segments, requestTopologyId);
        }
        if (this.readCh == null) {
            throw new IllegalStateException("No cache topology received yet");
        }
        if (requestTopologyId < this.topologyId) {
            log.warnf("Transactions were requested by node %s with topology %d, smaller than the local topology (%d)", destination, requestTopologyId, this.topologyId);
        } else if (requestTopologyId > this.topologyId) {
            log.tracef("Transactions were requested by node %s with topology %d, greater than the local topology (%d). Waiting for topology %d to be installed locally.", new Object[]{destination, requestTopologyId, this.topologyId, requestTopologyId});
            this.stateTransferLock.waitForTopology(requestTopologyId);
        }
        Set<Integer> ownedSegments = this.readCh.getSegmentsForOwner(this.rpcManager.getAddress());
        if (!ownedSegments.containsAll(segments)) {
            segments.removeAll(ownedSegments);
            throw new IllegalArgumentException("Segments " + segments + " are not owned by " + this.rpcManager.getAddress());
        }
        ArrayList<TransactionInfo> transactions = new ArrayList<TransactionInfo>();
        if (this.configuration.transaction().transactionMode().isTransactional()) {
            this.collectTransactionsToTransfer(transactions, this.transactionTable.getRemoteTransactions(), segments);
            this.collectTransactionsToTransfer(transactions, this.transactionTable.getLocalTransactions(), segments);
            if (trace) {
                log.tracef("Found %d transaction(s) to transfer", transactions.size());
            }
        }
        return transactions;
    }

    private void collectTransactionsToTransfer(List<TransactionInfo> transactionsToTransfer, Collection<? extends CacheTransaction> transactions, Set<Integer> segments) {
        for (CacheTransaction cacheTransaction : transactions) {
            HashSet<Object> lockedKeys = new HashSet<Object>();
            for (Object key : cacheTransaction.getLockedKeys()) {
                if (!segments.contains(this.readCh.getSegment(key))) continue;
                lockedKeys.add(key);
            }
            for (Object key : cacheTransaction.getBackupLockedKeys()) {
                if (!segments.contains(this.readCh.getSegment(key))) continue;
                lockedKeys.add(key);
            }
            if (lockedKeys.isEmpty()) continue;
            List<WriteCommand> txModifications = cacheTransaction.getModifications();
            WriteCommand[] modifications = null;
            if (txModifications != null) {
                modifications = txModifications.toArray(new WriteCommand[txModifications.size()]);
            }
            transactionsToTransfer.add(new TransactionInfo(cacheTransaction.getGlobalTransaction(), cacheTransaction.getTopologyId(), modifications, lockedKeys));
        }
    }

    @Override
    public void startOutboundTransfer(Address destination, int requestTopologyId, Set<Integer> segments) throws InterruptedException {
        log.tracef("Starting outbound transfer of segments %s to node %s with topology id %d", segments, destination, requestTopologyId);
        if (requestTopologyId < this.topologyId) {
            log.warnf("Segments were requested by node %s with topology %d, smaller than the local topology (%d)", destination, requestTopologyId, this.topologyId);
        } else if (requestTopologyId > this.topologyId) {
            log.tracef("Segments were requested by node %s with topology %d, greater than the local topology (%d). Waiting for topology %d to be installed locally.", new Object[]{destination, requestTopologyId, this.topologyId, requestTopologyId});
            this.stateTransferLock.waitForTopology(requestTopologyId);
        }
        OutboundTransferTask outboundTransfer = new OutboundTransferTask(destination, segments, this.chunkSize, requestTopologyId, this.readCh, this, this.dataContainer, this.cacheLoaderManager, this.rpcManager, this.configuration, this.commandsFactory, this.timeout);
        this.addTransfer(outboundTransfer);
        outboundTransfer.execute(this.executorService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addTransfer(OutboundTransferTask transferTask) {
        if (trace) {
            log.tracef("Adding outbound transfer of segments %s to %s", transferTask.getSegments(), transferTask.getDestination());
        }
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transfers = this.transfersByDestination.get(transferTask.getDestination());
            if (transfers == null) {
                transfers = new ArrayList<OutboundTransferTask>();
                this.transfersByDestination.put(transferTask.getDestination(), transfers);
            }
            transfers.add(transferTask);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelOutboundTransfer(Address destination, int topologyId, Set<Integer> segments) {
        if (trace) {
            log.tracef("Cancelling outbound transfer of segments %s to node %s with topology id %d", segments, destination, topologyId);
        }
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transferTasks = this.transfersByDestination.get(destination);
            if (transferTasks != null) {
                OutboundTransferTask[] tasks;
                for (OutboundTransferTask transferTask : tasks = transferTasks.toArray(new OutboundTransferTask[transferTasks.size()])) {
                    transferTask.cancelSegments(segments);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeTransfer(OutboundTransferTask transferTask) {
        Map<Address, List<OutboundTransferTask>> map = this.transfersByDestination;
        synchronized (map) {
            List<OutboundTransferTask> transferTasks = this.transfersByDestination.get(transferTask.getDestination());
            if (transferTasks != null) {
                transferTasks.remove(transferTask);
                if (transferTasks.isEmpty()) {
                    this.transfersByDestination.remove(transferTask.getDestination());
                }
            }
        }
    }

    void onTaskCompletion(OutboundTransferTask transferTask) {
        if (trace) {
            log.tracef("Removing %s outbound transfer of segments %s to %s", transferTask.isCancelled() ? "cancelled" : "completed", transferTask.getSegments(), transferTask.getDestination());
        }
        this.removeTransfer(transferTask);
    }
}

