/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.consistent.CoordinatorSessions;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairServiceMBean;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveRepairService
implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
ActiveRepairServiceMBean {
    public final ConsistentSessions consistent = new ConsistentSessions();
    private boolean registeredForEndpointChanges = false;
    public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
    public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
    public static final long UNREPAIRED_SSTABLE = 0L;
    public static final UUID NO_PENDING_REPAIR = null;
    private final ConcurrentMap<UUID, RepairSession> sessions = new ConcurrentHashMap<UUID, RepairSession>();
    private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<UUID, ParentRepairSession>();
    public static final ExecutorService repairCommandExecutor;
    private final IFailureDetector failureDetector;
    private final Gossiper gossiper;
    private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;

    public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper) {
        this.failureDetector = failureDetector;
        this.gossiper = gossiper;
        this.repairStatusByCmd = CacheBuilder.newBuilder().expireAfterWrite(Long.getLong("cassandra.parent_repair_status_expiry_seconds", TimeUnit.SECONDS.convert(1L, TimeUnit.DAYS)).longValue(), TimeUnit.SECONDS).maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100000L).longValue()).build();
        MBeanWrapper.instance.registerMBean((Object)this, "org.apache.cassandra.db:type=RepairService");
    }

    public void start() {
        this.consistent.local.start();
        ScheduledExecutors.optionalTasks.scheduleAtFixedRate(this.consistent.local::cleanup, 0L, LocalSessions.CLEANUP_INTERVAL, TimeUnit.SECONDS);
    }

    @Override
    public List<Map<String, String>> getSessions(boolean all) {
        return this.consistent.local.sessionInfo(all);
    }

    @Override
    public void failSession(String session, boolean force) {
        UUID sessionID = UUID.fromString(session);
        this.consistent.local.cancelSession(sessionID, force);
    }

    @Override
    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes) {
        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(sizeInMegabytes);
    }

    @Override
    public int getRepairSessionSpaceInMegabytes() {
        return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
    }

    public RepairSession submitRepairSession(UUID parentRepairSession, CommonRange range, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, boolean force, PreviewKind previewKind, boolean optimiseStreams, ListeningExecutorService executor, String ... cfnames) {
        if (range.endpoints.isEmpty()) {
            return null;
        }
        if (cfnames.length == 0) {
            return null;
        }
        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
        this.sessions.put(session.getId(), session);
        this.registerOnFdAndGossip(session);
        session.addListener(new Runnable(){

            @Override
            public void run() {
                ActiveRepairService.this.sessions.remove(session.getId());
            }
        }, MoreExecutors.directExecutor());
        session.start(executor);
        return session;
    }

    @Override
    public boolean getUseOffheapMerkleTrees() {
        return DatabaseDescriptor.useOffheapMerkleTrees();
    }

    @Override
    public void setUseOffheapMerkleTrees(boolean value) {
        DatabaseDescriptor.useOffheapMerkleTrees(value);
    }

    private <T extends AbstractFuture & IFailureDetectionEventListener> void registerOnFdAndGossip(final T task) {
        this.gossiper.register((IEndpointStateChangeSubscriber)task);
        this.failureDetector.registerFailureDetectionEventListener(task);
        task.addListener(new Runnable(){

            @Override
            public void run() {
                ActiveRepairService.this.failureDetector.unregisterFailureDetectionEventListener((IFailureDetectionEventListener)task);
                ActiveRepairService.this.gossiper.unregister((IEndpointStateChangeSubscriber)task);
            }
        }, MoreExecutors.directExecutor());
    }

    public synchronized void terminateSessions() {
        IOException cause = new IOException("Terminate session is called");
        for (RepairSession session : this.sessions.values()) {
            session.forceShutdown(cause);
        }
        this.parentRepairSessions.clear();
    }

    public void recordRepairStatus(int cmd, ParentRepairStatus parentRepairStatus, List<String> messages) {
        this.repairStatusByCmd.put((Object)cmd, Pair.create(parentRepairStatus, messages));
    }

    Pair<ParentRepairStatus, List<String>> getRepairStatus(Integer cmd) {
        return (Pair)this.repairStatusByCmd.getIfPresent((Object)cmd);
    }

    public static EndpointsForRange getNeighbors(String keyspaceName, Iterable<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts) {
        StorageService ss = StorageService.instance;
        EndpointsByRange replicaSets = ss.getRangeToAddressMap(keyspaceName);
        Range<Token> rangeSuperSet = null;
        for (Range<Token> range : keyspaceLocalRanges) {
            if (range.contains((Token)((Object)toRepair))) {
                rangeSuperSet = range;
                break;
            }
            if (!range.intersects(toRepair)) continue;
            throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) but is not fully contained in one; this would lead to imprecise repair. keyspace: %s", toRepair.toString(), range.toString(), keyspaceName));
        }
        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) {
            return EndpointsForRange.empty(toRepair);
        }
        EndpointsForRange neighbors = (EndpointsForRange)replicaSets.get(rangeSuperSet).withoutSelf();
        if (dataCenters != null && !dataCenters.isEmpty()) {
            TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
            Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
            Iterable dcEndpoints = Iterables.concat((Iterable)Iterables.transform(dataCenters, arg_0 -> dcEndpointsMap.get(arg_0)));
            return (EndpointsForRange)neighbors.select(dcEndpoints, true);
        }
        if (hosts != null && !hosts.isEmpty()) {
            HashSet<InetAddressAndPort> specifiedHost = new HashSet<InetAddressAndPort>();
            for (String host : hosts) {
                try {
                    InetAddressAndPort endpoint = InetAddressAndPort.getByName(host.trim());
                    if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && !neighbors.endpoints().contains(endpoint)) continue;
                    specifiedHost.add(endpoint);
                }
                catch (UnknownHostException e) {
                    throw new IllegalArgumentException("Unknown host specified " + host, e);
                }
            }
            if (!specifiedHost.contains(FBUtilities.getBroadcastAddressAndPort())) {
                throw new IllegalArgumentException("The current host must be part of the repair");
            }
            if (specifiedHost.size() <= 1) {
                String msg = "Specified hosts %s do not share range %s needed for repair. Either restrict repair ranges with -st/-et options, or specify one of the neighbors that share this range with this node: %s.";
                throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors));
            }
            specifiedHost.remove(FBUtilities.getBroadcastAddressAndPort());
            return (EndpointsForRange)neighbors.keep(specifiedHost);
        }
        return neighbors;
    }

    static long getRepairedAt(RepairOption options, boolean force) {
        if (options.isIncremental() && options.isGlobal() && !force) {
            return System.currentTimeMillis();
        }
        return 0L;
    }

    public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores) {
        long repairedAt = ActiveRepairService.getRepairedAt(options, isForcedRepair);
        this.registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
        final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
        final AtomicBoolean status = new AtomicBoolean(true);
        final Set failedNodes = Collections.synchronizedSet(new HashSet());
        RequestCallback callback = new RequestCallback(){

            public void onResponse(Message msg) {
                prepareLatch.countDown();
            }

            @Override
            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) {
                status.set(false);
                failedNodes.add(from.toString());
                prepareLatch.countDown();
            }

            @Override
            public boolean invokeOnFailure() {
                return true;
            }
        };
        ArrayList<TableId> tableIds = new ArrayList<TableId>(columnFamilyStores.size());
        for (ColumnFamilyStore cfs : columnFamilyStores) {
            tableIds.add(cfs.metadata.id);
        }
        for (InetAddressAndPort neighbour : endpoints) {
            if (FailureDetector.instance.isAlive(neighbour)) {
                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
                Message<PrepareMessage> msg = Message.out(Verb.REPAIR_REQ, message);
                MessagingService.instance().sendWithCallback(msg, neighbour, callback);
                continue;
            }
            if (isForcedRepair && !options.isIncremental()) {
                prepareLatch.countDown();
                continue;
            }
            this.failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
        }
        try {
            if (!prepareLatch.await(1L, TimeUnit.HOURS)) {
                this.failRepair(parentRepairSession, "Did not get replies from all endpoints.");
            }
        }
        catch (InterruptedException e) {
            this.failRepair(parentRepairSession, "Interrupted while waiting for prepare repair response.");
        }
        if (!status.get()) {
            this.failRepair(parentRepairSession, "Got negative replies from endpoints " + failedNodes);
        }
        return parentRepairSession;
    }

    private void failRepair(UUID parentRepairSession, String errorMsg) {
        this.removeParentRepairSession(parentRepairSession);
        throw new RuntimeException(errorMsg);
    }

    public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) {
        assert (isIncremental || repairedAt == 0L);
        if (!this.registeredForEndpointChanges) {
            Gossiper.instance.register(this);
            FailureDetector.instance.registerFailureDetectionEventListener(this);
            this.registeredForEndpointChanges = true;
        }
        if (!this.parentRepairSessions.containsKey(parentRepairSession)) {
            this.parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal, previewKind));
        }
    }

    public ParentRepairSession getParentRepairSession(UUID parentSessionId) {
        ParentRepairSession session = (ParentRepairSession)this.parentRepairSessions.get(parentSessionId);
        if (session == null) {
            throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed.");
        }
        return session;
    }

    public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId) {
        String snapshotName = parentSessionId.toString();
        for (ColumnFamilyStore cfs : this.getParentRepairSession(parentSessionId).columnFamilyStores.values()) {
            if (!cfs.snapshotExists(snapshotName)) continue;
            cfs.clearSnapshot(snapshotName);
        }
        return (ParentRepairSession)this.parentRepairSessions.remove(parentSessionId);
    }

    public void handleMessage(InetAddressAndPort endpoint, RepairMessage message) {
        RepairJobDesc desc = message.desc;
        RepairSession session = (RepairSession)this.sessions.get(desc.sessionId);
        if (session == null) {
            return;
        }
        switch (message.messageType) {
            case VALIDATION_COMPLETE: {
                ValidationComplete validation = (ValidationComplete)message;
                session.validationComplete(desc, endpoint, validation.trees);
                break;
            }
            case SYNC_COMPLETE: {
                SyncComplete sync = (SyncComplete)message;
                session.syncComplete(desc, sync.nodes, sync.success, sync.summaries);
                break;
            }
        }
    }

    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
    }

    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState state) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void convict(InetAddressAndPort ep, double phi) {
        if (phi < 2.0 * DatabaseDescriptor.getPhiConvictThreshold() || this.parentRepairSessions.isEmpty()) {
            return;
        }
        HashSet toRemove = new HashSet();
        for (Map.Entry repairSessionEntry : this.parentRepairSessions.entrySet()) {
            if (!((ParentRepairSession)repairSessionEntry.getValue()).coordinator.equals(ep)) continue;
            toRemove.add(repairSessionEntry.getKey());
        }
        if (!toRemove.isEmpty()) {
            logger.debug("Removing {} in parent repair sessions", toRemove);
            for (UUID id : toRemove) {
                this.removeParentRepairSession(id);
            }
        }
    }

    static {
        Config.RepairCommandPoolFullStrategy strategy = DatabaseDescriptor.getRepairCommandPoolFullStrategy();
        AbstractQueue queue = strategy == Config.RepairCommandPoolFullStrategy.reject ? new SynchronousQueue() : new LinkedBlockingQueue();
        repairCommandExecutor = new JMXEnabledThreadPoolExecutor(1, DatabaseDescriptor.getRepairCommandPoolSize(), 1L, TimeUnit.HOURS, (BlockingQueue<Runnable>)((Object)queue), new NamedThreadFactory("Repair-Task"), "internal", new ThreadPoolExecutor.AbortPolicy());
    }

    public static class ParentRepairSession {
        private final Keyspace keyspace;
        private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<TableId, ColumnFamilyStore>();
        private final Collection<Range<Token>> ranges;
        public final boolean isIncremental;
        public final boolean isGlobal;
        public final long repairedAt;
        public final InetAddressAndPort coordinator;
        public final PreviewKind previewKind;

        public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) {
            this.coordinator = coordinator;
            HashSet<Keyspace> keyspaces = new HashSet<Keyspace>();
            for (ColumnFamilyStore cfs : columnFamilyStores) {
                keyspaces.add(cfs.keyspace);
                this.columnFamilyStores.put(cfs.metadata.id, cfs);
            }
            Preconditions.checkArgument((keyspaces.size() == 1 ? 1 : 0) != 0, (Object)"repair sessions cannot operate on multiple keyspaces");
            this.keyspace = (Keyspace)Iterables.getOnlyElement(keyspaces);
            this.ranges = ranges;
            this.repairedAt = repairedAt;
            this.isIncremental = isIncremental;
            this.isGlobal = isGlobal;
            this.previewKind = previewKind;
        }

        public boolean isPreview() {
            return this.previewKind != PreviewKind.NONE;
        }

        public Collection<ColumnFamilyStore> getColumnFamilyStores() {
            return ImmutableSet.builder().addAll(this.columnFamilyStores.values()).build();
        }

        public Keyspace getKeyspace() {
            return this.keyspace;
        }

        public Set<TableId> getTableIds() {
            return ImmutableSet.copyOf((Iterable)Iterables.transform(this.getColumnFamilyStores(), cfs -> cfs.metadata.id));
        }

        public Set<Range<Token>> getRanges() {
            return ImmutableSet.copyOf(this.ranges);
        }

        public String toString() {
            return "ParentRepairSession{columnFamilyStores=" + this.columnFamilyStores + ", ranges=" + this.ranges + ", repairedAt=" + this.repairedAt + '}';
        }
    }

    public static class ConsistentSessions {
        public final LocalSessions local = new LocalSessions();
        public final CoordinatorSessions coordinated = new CoordinatorSessions();
    }

    public static enum ParentRepairStatus {
        IN_PROGRESS,
        COMPLETED,
        FAILED;

    }
}

