/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.dht;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RangeFetchMapCalculator;
import org.apache.cassandra.dht.StreamStateStore;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeStreamer {
    private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
    public static com.google.common.base.Predicate<Replica> ALIVE_PREDICATE = replica -> (!Gossiper.instance.isEnabled() || Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()) == null || Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()).isAlive()) && FailureDetector.instance.isAlive(replica.endpoint());
    private final Collection<Token> tokens;
    private final TokenMetadata metadata;
    private final InetAddressAndPort address;
    private final String description;
    private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<String, Multimap<InetAddressAndPort, FetchReplica>>();
    private final List<SourceFilter> sourceFilters = new ArrayList<SourceFilter>();
    private final StreamPlan streamPlan;
    private final boolean useStrictConsistency;
    private final IEndpointSnitch snitch;
    private final StreamStateStore stateStore;

    public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddressAndPort address, StreamOperation streamOperation, boolean useStrictConsistency, IEndpointSnitch snitch, StreamStateStore stateStore, boolean connectSequentially, int connectionsPerHost) {
        this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore, FailureDetector.instance, connectSequentially, connectionsPerHost);
    }

    RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddressAndPort address, StreamOperation streamOperation, boolean useStrictConsistency, IEndpointSnitch snitch, StreamStateStore stateStore, IFailureDetector failureDetector, boolean connectSequentially, int connectionsPerHost) {
        Preconditions.checkArgument((streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD ? 1 : 0) != 0, (Object)((Object)streamOperation));
        this.metadata = metadata;
        this.tokens = tokens;
        this.address = address;
        this.description = streamOperation.getDescription();
        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, connectSequentially, null, PreviewKind.NONE);
        this.useStrictConsistency = useStrictConsistency;
        this.snitch = snitch;
        this.stateStore = stateStore;
        this.streamPlan.listeners(this.stateStore, new StreamEventHandler[0]);
        this.addSourceFilter(new FailureDetectorSourceFilter(failureDetector));
        this.addSourceFilter(new ExcludeLocalNodeFilter());
    }

    public void addSourceFilter(SourceFilter filter) {
        this.sourceFilters.add(filter);
    }

    private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas) {
        StringBuilder failureMessage = new StringBuilder();
        block0: for (Replica r : replicas) {
            for (SourceFilter filter : sourceFilters) {
                if (filter.apply(r)) continue;
                failureMessage.append(filter.message(r));
                continue block0;
            }
        }
        return failureMessage.toString();
    }

    public void addRanges(String keyspaceName, ReplicaCollection<?> replicas) {
        Keyspace keyspace = Keyspace.open(keyspaceName);
        AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
        if (strat instanceof LocalStrategy) {
            logger.info("Not adding ranges for Local Strategy keyspace={}", (Object)keyspaceName);
            return;
        }
        boolean useStrictSource = this.useStrictSourcesForRanges(strat);
        EndpointsByReplica fetchMap = this.calculateRangesToFetchWithPreferredEndpoints(replicas, keyspace, useStrictSource);
        for (Map.Entry entry : fetchMap.flattenEntries()) {
            logger.info("{}: range {} exists on {} for keyspace {}", new Object[]{this.description, entry.getKey(), entry.getValue(), keyspaceName});
        }
        Multimap<InetAddressAndPort, FetchReplica> workMap = useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas() ? RangeStreamer.convertPreferredEndpointsToWorkMap(fetchMap) : RangeStreamer.getOptimizedWorkMap(fetchMap, this.sourceFilters, keyspaceName);
        if (this.toFetch.put(keyspaceName, workMap) != null) {
            throw new IllegalArgumentException("Keyspace is already added to fetch map");
        }
        if (logger.isTraceEnabled()) {
            for (Map.Entry entry : workMap.asMap().entrySet()) {
                for (FetchReplica r : (Collection)entry.getValue()) {
                    logger.trace("{}: range source {} local range {} for keyspace {}", new Object[]{this.description, r.remote, r.local, keyspaceName});
                }
            }
        }
    }

    private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat) {
        return this.useStrictConsistency && this.tokens != null && this.metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas;
    }

    private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency) {
        AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
        TokenMetadata tmd = this.metadata.cloneOnlyTokenMap();
        TokenMetadata tmdAfter = null;
        if (this.tokens != null) {
            tmdAfter = tmd.cloneOnlyTokenMap();
            tmdAfter.updateNormalTokens(this.tokens, this.address);
        } else if (useStrictConsistency) {
            throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
        }
        return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(this.snitch::sortedByProximity, strat, fetchRanges, useStrictConsistency, tmd, tmdAfter, keyspace.getName(), this.sourceFilters);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public static EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity, AbstractReplicationStrategy strat, ReplicaCollection<?> fetchRanges, boolean useStrictConsistency, TokenMetadata tmdBefore, TokenMetadata tmdAfter, String keyspace, Collection<SourceFilter> sourceFilters) {
        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
        logger.debug("Keyspace: {}", (Object)keyspace);
        logger.debug("To fetch RN: {}", fetchRanges);
        logger.debug("Fetch ranges: {}", (Object)rangeAddresses);
        com.google.common.base.Predicate testSourceFilters = Predicates.and(sourceFilters);
        Function<EndpointsForRange, EndpointsForRange> sorted = endpoints -> (EndpointsForRange)snitchGetSortedListByProximity.apply(localAddress, (EndpointsForRange)endpoints);
        EndpointsByReplica.Builder rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Builder();
        for (Replica toFetch : fetchRanges) {
            com.google.common.base.Predicate isSufficient = r -> toFetch.isTransient() || r.isFull();
            logger.debug("To fetch {}", (Object)toFetch);
            for (Range range : rangeAddresses.keySet()) {
                EndpointsForRange sources;
                if (!range.contains(toFetch.range())) continue;
                EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range));
                if (useStrictConsistency) {
                    EndpointsForRange strictEndpoints;
                    if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) {
                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas((Token)toFetch.range().right, tmdAfter);
                        logger.debug("Old endpoints {}", (Object)oldEndpoints);
                        logger.debug("New endpoints {}", (Object)newEndpoints);
                        strictEndpoints = (EndpointsForRange)oldEndpoints.without(newEndpoints.endpoints());
                        if (strictEndpoints.size() > 1) {
                            throw new AssertionError((Object)("Expected <= 1 endpoint but found " + strictEndpoints));
                        }
                        if (!Iterables.all((Iterable)strictEndpoints, (com.google.common.base.Predicate)testSourceFilters)) {
                            throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + RangeStreamer.buildErrorMessage(sourceFilters, strictEndpoints));
                        }
                        if (strictEndpoints.isEmpty() && toFetch.isTransient()) {
                            throw new AssertionError((Object)("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch));
                        }
                        if (!Iterables.any((Iterable)strictEndpoints, (com.google.common.base.Predicate)isSufficient)) {
                            Optional fullReplica = Iterables.tryFind((Iterable)oldEndpoints, (com.google.common.base.Predicate)Predicates.and((com.google.common.base.Predicate)isSufficient, (com.google.common.base.Predicate)testSourceFilters)).toJavaUtil();
                            if (!fullReplica.isPresent()) throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + RangeStreamer.buildErrorMessage(sourceFilters, oldEndpoints));
                            strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of((Replica)fullReplica.get()));
                        }
                    } else {
                        strictEndpoints = sorted.apply((EndpointsForRange)oldEndpoints.filter((Predicate)Predicates.and((com.google.common.base.Predicate)isSufficient, (com.google.common.base.Predicate)testSourceFilters)));
                    }
                    sources = strictEndpoints;
                } else {
                    sources = sorted.apply((EndpointsForRange)oldEndpoints.filter((Predicate)Predicates.and((com.google.common.base.Predicate)isSufficient, (com.google.common.base.Predicate)testSourceFilters)));
                    sources = sources.size() > 0 ? (EndpointsForRange)sources.subList(0, 1) : sources;
                }
                rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, ReplicaCollection.Builder.Conflict.NONE);
                logger.debug("Endpoints to fetch for {} are {}", (Object)toFetch, (Object)sources);
            }
            EndpointsForRange addressList = (EndpointsForRange)rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
            if (addressList == null) {
                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
            }
            if (useStrictConsistency && addressList.size() > 1 && (((EndpointsForRange)addressList.filter(Replica::isFull)).size() > 1 || ((EndpointsForRange)addressList.filter(Replica::isTransient)).size() > 1)) {
                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
            }
            if (Iterables.any((Iterable)addressList, (com.google.common.base.Predicate)isSufficient)) continue;
            if (strat.getReplicationFactor().allReplicas == 1) {
                if (useStrictConsistency) {
                    logger.warn("A node required to move the data consistently is down");
                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. Ensure this keyspace contains replicas in the source datacenter.");
                }
                logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. Keyspace might be missing data.", (Object)toFetch, (Object)keyspace);
                continue;
            }
            if (!useStrictConsistency) throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
            logger.warn("A node required to move the data consistently is down");
            throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
        }
        return rangesToFetchWithPreferredEndpoints.build();
    }

    public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints) {
        HashMultimap workMap = HashMultimap.create();
        for (Map.Entry e : preferredEndpoints.entrySet()) {
            for (Replica source : (EndpointsForRange)e.getValue()) {
                assert (((Replica)e.getKey()).isSelf());
                assert (!source.isSelf());
                workMap.put((Object)source.endpoint(), (Object)new FetchReplica((Replica)e.getKey(), source));
            }
        }
        logger.debug("Work map {}", (Object)workMap);
        return workMap;
    }

    private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources, Collection<SourceFilter> sourceFilters, String keyspace) {
        EndpointsByRange.Builder unwrapped = new EndpointsByRange.Builder();
        for (Map.Entry entry : rangesWithSources.flattenEntries()) {
            Replicas.temporaryAssertFull(entry.getValue());
            unwrapped.put(((Replica)entry.getKey()).range(), entry.getValue());
        }
        EndpointsByRange unwrappedView = unwrapped.build();
        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace);
        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
        logger.info("Output from RangeFetchMapCalculator for keyspace {}", (Object)keyspace);
        RangeStreamer.validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace);
        HashMultimap wrapped = HashMultimap.create();
        for (Map.Entry entry : rangeFetchMapMap.entries()) {
            Replica toFetch = null;
            for (Replica r : rangesWithSources.keySet()) {
                if (!r.range().equals(entry.getValue())) continue;
                if (toFetch != null) {
                    throw new AssertionError((Object)String.format("There shouldn't be multiple replicas for range %s, replica %s and %s here", r.range(), r, toFetch));
                }
                toFetch = r;
            }
            if (toFetch == null) {
                throw new AssertionError((Object)"Shouldn't be possible for the Replica we fetch to be null here");
            }
            wrapped.put(entry.getKey(), (Object)new FetchReplica(toFetch, Replica.fullReplica((InetAddressAndPort)entry.getKey(), (Range)entry.getValue())));
        }
        return wrapped;
    }

    private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace) {
        for (Map.Entry entry : rangeFetchMapMap.entries()) {
            if (((InetAddressAndPort)entry.getKey()).equals(FBUtilities.getBroadcastAddressAndPort())) {
                throw new IllegalStateException("Trying to stream locally. Range: " + entry.getValue() + " in keyspace " + keyspace);
            }
            if (!rangesWithSources.get((Range)entry.getValue()).endpoints().contains(entry.getKey())) {
                throw new IllegalStateException("Trying to stream from wrong endpoint. Range: " + entry.getValue() + " in keyspace " + keyspace + " from endpoint: " + entry.getKey());
            }
            logger.info("Streaming range {} from endpoint {} for keyspace {}", new Object[]{entry.getValue(), entry.getKey(), keyspace});
        }
    }

    @VisibleForTesting
    Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch() {
        return this.toFetch;
    }

    public StreamResultFuture fetchAsync() {
        this.toFetch.forEach((keyspace, sources) -> {
            logger.debug("Keyspace {} Sources {}", keyspace, sources);
            sources.asMap().forEach((source, fetchReplicas) -> {
                SystemKeyspace.AvailableRanges available = this.stateStore.getAvailableRanges((String)keyspace, this.metadata.partitioner);
                com.google.common.base.Predicate isAvailable = fetch -> {
                    boolean isInFull = available.full.contains(fetch.local.range());
                    boolean isInTrans = available.trans.contains(fetch.local.range());
                    if (!isInFull && !isInTrans) {
                        return false;
                    }
                    if (fetch.local.isFull()) {
                        return isInFull == fetch.remote.isFull();
                    }
                    return true;
                };
                List remaining = fetchReplicas.stream().filter(Predicates.not((com.google.common.base.Predicate)isAvailable)).collect(Collectors.toList());
                if (remaining.size() < available.full.size() + available.trans.size()) {
                    List skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
                    logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}", new Object[]{fetchReplicas, skipped, available.full, available.trans});
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("{}ing from {} ranges {}", new Object[]{this.description, source, StringUtils.join(remaining, (String)", ")});
                }
                InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
                RangesAtEndpoint full = remaining.stream().filter(pair -> pair.remote.isFull()).map(pair -> pair.local).collect(RangesAtEndpoint.collector(self));
                RangesAtEndpoint transientReplicas = remaining.stream().filter(pair -> pair.remote.isTransient()).map(pair -> pair.local).collect(RangesAtEndpoint.collector(self));
                logger.debug("Source and our replicas {}", fetchReplicas);
                logger.debug("Source {} Keyspace {}  streaming full {} transient {}", new Object[]{source, keyspace, full, transientReplicas});
                this.streamPlan.requestRanges((InetAddressAndPort)source, (String)keyspace, full, transientReplicas);
            });
        });
        return this.streamPlan.execute();
    }

    public static class WhitelistedSourcesFilter
    implements SourceFilter {
        private final Set<InetAddressAndPort> whitelistedSources;

        public WhitelistedSourcesFilter(Set<InetAddressAndPort> whitelistedSources) {
            this.whitelistedSources = whitelistedSources;
        }

        @Override
        public boolean apply(Replica replica) {
            return this.whitelistedSources.contains(replica.endpoint());
        }

        @Override
        public String message(Replica replica) {
            return "Filtered " + replica + " out because it was not whitelisted, whitelisted sources: " + this.whitelistedSources;
        }
    }

    public static class ExcludeLocalNodeFilter
    implements SourceFilter {
        @Override
        public boolean apply(Replica replica) {
            return !replica.isSelf();
        }

        @Override
        public String message(Replica replica) {
            return "Filtered " + replica + " out because it is local";
        }
    }

    public static class SingleDatacenterFilter
    implements SourceFilter {
        private final String sourceDc;
        private final IEndpointSnitch snitch;

        public SingleDatacenterFilter(IEndpointSnitch snitch, String sourceDc) {
            this.sourceDc = sourceDc;
            this.snitch = snitch;
        }

        @Override
        public boolean apply(Replica replica) {
            return this.snitch.getDatacenter(replica).equals(this.sourceDc);
        }

        @Override
        public String message(Replica replica) {
            return "Filtered " + replica + " out because it does not belong to " + this.sourceDc + " datacenter";
        }
    }

    public static class FailureDetectorSourceFilter
    implements SourceFilter {
        private final IFailureDetector fd;

        public FailureDetectorSourceFilter(IFailureDetector fd) {
            this.fd = fd;
        }

        @Override
        public boolean apply(Replica replica) {
            return this.fd.isAlive(replica.endpoint());
        }

        @Override
        public String message(Replica replica) {
            return "Filtered " + replica + " out because it was down";
        }
    }

    public static interface SourceFilter
    extends com.google.common.base.Predicate<Replica> {
        public boolean apply(Replica var1);

        public String message(Replica var1);
    }

    public static class FetchReplica {
        public final Replica local;
        public final Replica remote;

        public FetchReplica(Replica local, Replica remote) {
            Preconditions.checkNotNull((Object)local);
            Preconditions.checkNotNull((Object)remote);
            assert (local.isSelf() && !remote.isSelf());
            this.local = local;
            this.remote = remote;
        }

        public String toString() {
            return "FetchReplica{local=" + this.local + ", remote=" + this.remote + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FetchReplica that = (FetchReplica)o;
            if (!this.local.equals(that.local)) {
                return false;
            }
            return this.remote.equals(that.remote);
        }

        public int hashCode() {
            int result = this.local.hashCode();
            result = 31 * result + this.remote.hashCode();
            return result;
        }
    }
}

