/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.transaction;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.MembershipArithmetic;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener
public class StaleTransactionCleanupService {
    private static Log log = LogFactory.getLog(StaleTransactionCleanupService.class);
    private TransactionTable transactionTable;
    private ScheduledExecutorService executorService;

    public StaleTransactionCleanupService(TransactionTable transactionTable) {
        this.transactionTable = transactionTable;
    }

    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> tce) {
        List<Address> leavers;
        ConsistentHash consistentHashAtStart;
        if (tce.isPre() && (consistentHashAtStart = tce.getConsistentHashAtStart()) != null && !(leavers = MembershipArithmetic.getMembersLeft(consistentHashAtStart.getMembers(), tce.getConsistentHashAtEnd().getMembers())).isEmpty()) {
            log.tracef("Saw %d leavers - kicking off a lock breaking task", leavers.size());
            this.cleanTxForWhichTheOwnerLeft(leavers);
        }
    }

    private void cleanTxForWhichTheOwnerLeft(final Collection<Address> leavers) {
        try {
            this.executorService.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        StaleTransactionCleanupService.this.transactionTable.updateStateOnNodesLeaving(leavers);
                    }
                    catch (Exception e) {
                        log.error("Exception whilst updating state", e);
                    }
                }
            });
        }
        catch (RejectedExecutionException ree) {
            log.error("Unable to submit task to executor", ree);
        }
    }

    public void start(final String cacheName, final RpcManager rpcManager, Configuration configuration) {
        ThreadFactory tf = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                String address = rpcManager != null ? rpcManager.getTransport().getAddress().toString() : "local";
                Thread th = new Thread(r, "LockBreakingService," + cacheName + "," + address);
                th.setDaemon(true);
                return th;
            }
        };
        this.executorService = Executors.newSingleThreadScheduledExecutor(tf);
        long interval = configuration.transaction().reaperWakeUpInterval();
        this.executorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                StaleTransactionCleanupService.this.transactionTable.cleanupCompletedTransactions();
            }
        }, interval, interval, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }
}

