/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.dht;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.streaming.IStreamCallback;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeStreamer {
    private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
    private final TokenMetadata metadata;
    private final InetAddress address;
    private final OperationType opType;
    private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
    private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
    protected CountDownLatch latch;
    private Set<Range<Token>> completed = Collections.newSetFromMap(new ConcurrentHashMap());

    public RangeStreamer(TokenMetadata metadata, InetAddress address, OperationType opType) {
        this.metadata = metadata;
        this.address = address;
        this.opType = opType;
    }

    public void addSourceFilter(ISourceFilter filter) {
        this.sourceFilters.add(filter);
    }

    public void addRanges(String table, Collection<Range<Token>> ranges) {
        Multimap<Range<Token>, InetAddress> rangesForTable = this.getAllRangesWithSourcesFor(table, ranges);
        if (logger.isDebugEnabled()) {
            for (Map.Entry entry : rangesForTable.entries()) {
                logger.debug(String.format("%s: range %s exists on %s", new Object[]{this.opType, entry.getKey(), entry.getValue()}));
            }
        }
        for (Map.Entry entry : RangeStreamer.getRangeFetchMap(rangesForTable, this.sourceFilters).asMap().entrySet()) {
            if (logger.isDebugEnabled()) {
                for (Range r : (Collection)entry.getValue()) {
                    logger.debug(String.format("%s: range %s from source %s for table %s", new Object[]{this.opType, r, entry.getKey(), table}));
                }
            }
            this.toFetch.put((Object)table, (Object)entry);
        }
    }

    private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges) {
        AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(this.metadata.cloneOnlyTokenMap());
        ArrayListMultimap rangeSources = ArrayListMultimap.create();
        for (Range<Token> desiredRange : desiredRanges) {
            for (Range range : rangeAddresses.keySet()) {
                if (!range.contains(desiredRange)) continue;
                List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(this.address, rangeAddresses.get((Object)range));
                rangeSources.putAll(desiredRange, preferred);
                break;
            }
            if (rangeSources.keySet().contains(desiredRange)) continue;
            throw new IllegalStateException("No sources found for " + desiredRange);
        }
        return rangeSources;
    }

    private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, Collection<ISourceFilter> sourceFilters) {
        HashMultimap rangeFetchMapMap = HashMultimap.create();
        for (Range range : rangesWithSources.keySet()) {
            boolean foundSource = false;
            block1: for (InetAddress address : rangesWithSources.get((Object)range)) {
                if (address.equals(FBUtilities.getBroadcastAddress())) {
                    foundSource = true;
                    continue;
                }
                for (ISourceFilter filter : sourceFilters) {
                    if (filter.shouldInclude(address)) continue;
                    continue block1;
                }
                rangeFetchMapMap.put((Object)address, (Object)range);
                foundSource = true;
                break;
            }
            if (foundSource) continue;
            throw new IllegalStateException("unable to find sufficient sources for streaming range " + range);
        }
        return rangeFetchMapMap;
    }

    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget) {
        return RangeStreamer.getRangeFetchMap(rangesWithSourceTarget, Collections.singleton(new FailureDetectorSourceFilter(FailureDetector.instance)));
    }

    Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch() {
        return this.toFetch;
    }

    public void fetch() {
        this.latch = new CountDownLatch(this.toFetch.entries().size());
        for (Map.Entry entry : this.toFetch.entries()) {
            final String table = (String)entry.getKey();
            final InetAddress source = (InetAddress)((Map.Entry)entry.getValue()).getKey();
            final Collection ranges = (Collection)((Map.Entry)entry.getValue()).getValue();
            IStreamCallback callback = new IStreamCallback(){

                @Override
                public void onSuccess() {
                    RangeStreamer.this.completed.addAll(ranges);
                    RangeStreamer.this.latch.countDown();
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s", new Object[]{source, table, RangeStreamer.this.opType, RangeStreamer.this.latch.getCount()}));
                    }
                }

                @Override
                public void onFailure() {
                    RangeStreamer.this.latch.countDown();
                    logger.warn("Streaming from " + source + " failed");
                }
            };
            if (logger.isDebugEnabled()) {
                logger.debug("" + (Object)((Object)this.opType) + "ing from " + source + " ranges " + StringUtils.join((Collection)ranges, (String)", "));
            }
            StreamIn.requestRanges(source, table, ranges, callback, this.opType);
        }
        try {
            this.latch.await();
            for (Map.Entry entry : this.toFetch.entries()) {
                if (this.completed.containsAll((Collection)((Map.Entry)entry.getValue()).getValue())) continue;
                throw new RuntimeException(String.format("Unable to fetch range %s for keyspace %s from any hosts", ((Map.Entry)entry.getValue()).getValue(), entry.getKey()));
            }
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
    }

    public static class SingleDatacenterFilter
    implements ISourceFilter {
        private final String sourceDc;
        private final IEndpointSnitch snitch;

        public SingleDatacenterFilter(IEndpointSnitch snitch, String sourceDc) {
            this.sourceDc = sourceDc;
            this.snitch = snitch;
        }

        @Override
        public boolean shouldInclude(InetAddress endpoint) {
            return this.snitch.getDatacenter(endpoint).equals(this.sourceDc);
        }
    }

    public static class FailureDetectorSourceFilter
    implements ISourceFilter {
        private final IFailureDetector fd;

        public FailureDetectorSourceFilter(IFailureDetector fd) {
            this.fd = fd;
        }

        @Override
        public boolean shouldInclude(InetAddress endpoint) {
            return this.fd.isAlive(endpoint);
        }
    }

    public static interface ISourceFilter {
        public boolean shouldInclude(InetAddress var1);
    }
}

