/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadOrderGroup;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.BTreeBackedRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HintedHandOffManager
implements HintedHandOffManagerMBean {
    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=HintedHandoffManager";
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
    private static final int MAX_SIMULTANEOUSLY_REPLAYED_HINTS = 128;
    private static final int LARGE_NUMBER = 65536;
    public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
    private volatile boolean hintedHandOffPaused = false;
    static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet();
    private final JMXEnabledScheduledThreadPoolExecutor executor = new JMXEnabledScheduledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(), new NamedThreadFactory("HintedHandoff", 1), "internal");
    private final ColumnFamilyStore hintStore = Keyspace.open("system").getColumnFamilyStore("hints");
    private static final ColumnDefinition hintColumn = SystemKeyspace.Hints.compactValueColumn();

    public Mutation hintFor(Mutation mutation, long now, int ttl, UUID targetId) {
        assert (ttl > 0);
        InetAddress endpoint = StorageService.instance.getTokenMetadata().getEndpointForHostId(targetId);
        if (endpoint != null) {
            this.metrics.incrCreatedHints(endpoint);
        } else {
            logger.warn("Unable to find matching endpoint for target {} when storing a hint", (Object)targetId);
        }
        UUID hintId = UUIDGen.getTimeUUID();
        DecoratedKey key = StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId));
        Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, 10);
        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, 10));
        BufferCell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeBackedRow.singleCellRow(clustering, cell)));
    }

    public static int calculateHintTTL(Mutation mutation) {
        int ttl = maxHintTTL;
        for (PartitionUpdate upd : mutation.getPartitionUpdates()) {
            ttl = Math.min(ttl, upd.metadata().getGcGraceSeconds());
        }
        return ttl;
    }

    public void start() {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.debug("Created HHOM instance, registered MBean.");
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                HintedHandOffManager.this.scheduleAllDeliveries();
                HintedHandOffManager.this.metrics.log();
            }
        };
        this.executor.scheduleWithFixedDelay(runnable, 10L, 10L, TimeUnit.MINUTES);
    }

    private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) {
        DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes);
        BufferCell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell));
        new Mutation(upd).applyUnsafe();
    }

    @Override
    public void deleteHintsForEndpoint(String ipOrHostname) {
        try {
            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
            this.deleteHintsForEndpoint(endpoint);
        }
        catch (UnknownHostException e) {
            logger.warn("Unable to find {}, not a hostname or ipaddr of a node", (Object)ipOrHostname);
            throw new RuntimeException(e);
        }
    }

    public void deleteHintsForEndpoint(final InetAddress endpoint) {
        if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) {
            return;
        }
        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
        DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId)));
        final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    logger.info("Deleting any stored hints for {}", (Object)endpoint);
                    mutation.apply();
                    HintedHandOffManager.this.hintStore.forceBlockingFlush();
                    HintedHandOffManager.this.compact();
                }
                catch (Exception e) {
                    JVMStabilityInspector.inspectThrowable(e);
                    logger.warn("Could not delete hints for {}: {}", (Object)endpoint, (Object)e);
                }
            }
        };
        this.executor.submit(runnable);
    }

    @Override
    public void truncateAllHints() throws ExecutionException, InterruptedException {
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    logger.info("Truncating all stored hints.");
                    Keyspace.open("system").getColumnFamilyStore("hints").truncateBlocking();
                }
                catch (Exception e) {
                    logger.warn("Could not truncate all hints.", (Throwable)e);
                }
            }
        };
        this.executor.submit(runnable).get();
    }

    @VisibleForTesting
    protected synchronized void compact() {
        ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
        for (SSTable sSTable : this.hintStore.getTracker().getUncompacting()) {
            descriptors.add(sSTable.descriptor);
        }
        if (descriptors.isEmpty()) {
            return;
        }
        try {
            CompactionManager.instance.submitUserDefined(this.hintStore, descriptors, (int)(System.currentTimeMillis() / 1000L)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException {
        Gossiper gossiper = Gossiper.instance;
        int waited = 0;
        while (gossiper.getEndpointStateForEndpoint(endpoint) != null && gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
            Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        if (gossiper.getEndpointStateForEndpoint(endpoint) == null) {
            throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
        }
        waited = 0;
        while (gossiper.getEndpointStateForEndpoint(endpoint) != null && !gossiper.getEndpointStateForEndpoint((InetAddress)endpoint).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value.equals(gossiper.getEndpointStateForEndpoint((InetAddress)FBUtilities.getBroadcastAddress()).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value)) {
            Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        if (gossiper.getEndpointStateForEndpoint(endpoint) == null) {
            throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
        }
        logger.debug("schema for {} matches local schema", (Object)endpoint);
        return waited;
    }

    private void deliverHintsToEndpoint(InetAddress endpoint) {
        if (this.hintStore.isEmpty()) {
            return;
        }
        if (this.hintedHandOffPaused) {
            logger.debug("Hints delivery process is paused, aborting");
            return;
        }
        logger.debug("Checking remote({}) schema before delivering hints", (Object)endpoint);
        try {
            this.waitForSchemaAgreement(endpoint);
        }
        catch (TimeoutException e) {
            return;
        }
        if (!FailureDetector.instance.isAlive(endpoint)) {
            logger.debug("Endpoint {} died before hint delivery, aborting", (Object)endpoint);
            return;
        }
        this.doDeliverHintsToEndpoint(endpoint);
        this.hintStore.forceBlockingFlush();
    }

    private boolean checkDelivered(InetAddress endpoint, List<WriteResponseHandler<Mutation>> handlers, AtomicInteger rowsReplayed) {
        for (WriteResponseHandler<Mutation> handler : handlers) {
            try {
                handler.get();
            }
            catch (WriteTimeoutException e) {
                logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}", new Object[]{endpoint, rowsReplayed, e.getMessage()});
                return false;
            }
        }
        return true;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void doDeliverHintsToEndpoint(InetAddress endpoint) {
        UUID hostId = Gossiper.instance.getHostId(endpoint);
        logger.info("Started hinted handoff for host: {} with IP: {}", (Object)hostId, (Object)endpoint);
        final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
        DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
        final AtomicInteger rowsReplayed = new AtomicInteger(0);
        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
        RateLimiter rateLimiter = RateLimiter.create((double)(throttleInKB == 0 ? Double.MAX_VALUE : (double)(throttleInKB * 1024)));
        int nowInSec = FBUtilities.nowInSeconds();
        try (OpOrder.Group op = this.hintStore.readOrdering.start();
             RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(this.hintStore, op), nowInSec);){
            ArrayList responseHandlers = Lists.newArrayList();
            while (iter.hasNext()) {
                Mutation mutation;
                if (!FailureDetector.instance.isAlive(endpoint)) {
                    logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", (Object)endpoint, (Object)rowsReplayed);
                    return;
                }
                if (this.hintedHandOffPaused) {
                    logger.debug("Hints delivery process is paused, aborting");
                    return;
                }
                if (responseHandlers.size() > 128 && !this.checkDelivered(endpoint, responseHandlers, rowsReplayed)) {
                    return;
                }
                final Row hint = (Row)iter.next();
                int version = (Integer)Int32Type.instance.compose(hint.clustering().get(1));
                Cell cell = hint.getCell(hintColumn);
                final long timestamp = cell.timestamp();
                DataInputBuffer in = new DataInputBuffer(cell.value(), true);
                try {
                    mutation = Mutation.serializer.deserialize(in, version);
                }
                catch (UnknownColumnFamilyException e) {
                    logger.debug("Skipping delivery of hint for deleted table", (Throwable)e);
                    HintedHandOffManager.deleteHint(hostIdBytes, hint.clustering(), timestamp);
                    continue;
                }
                catch (IOException e) {
                    throw new AssertionError((Object)e);
                }
                for (UUID cfId : mutation.getColumnFamilyIds()) {
                    if (timestamp > SystemKeyspace.getTruncatedAt(cfId)) continue;
                    logger.debug("Skipping delivery of hint for truncated table {}", (Object)cfId);
                    mutation = mutation.without(cfId);
                }
                if (mutation.isEmpty()) {
                    HintedHandOffManager.deleteHint(hostIdBytes, hint.clustering(), timestamp);
                    continue;
                }
                MessageOut<Mutation> message = mutation.createMessage();
                rateLimiter.acquire(message.serializedSize(10));
                Runnable callback = new Runnable(){

                    @Override
                    public void run() {
                        rowsReplayed.incrementAndGet();
                        HintedHandOffManager.deleteHint(hostIdBytes, hint.clustering(), timestamp);
                    }
                };
                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
                MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
                responseHandlers.add(responseHandler);
            }
            if (!this.checkDelivered(endpoint, responseHandlers, rowsReplayed)) return;
            logger.info("Finished hinted handoff of {} rows to endpoint {}", (Object)rowsReplayed, (Object)endpoint);
            return;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void scheduleAllDeliveries() {
        logger.debug("Started scheduleAllDeliveries");
        this.compact();
        PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(this.hintStore.metadata, FBUtilities.nowInSeconds(), ColumnFilter.all(this.hintStore.metadata), RowFilter.NONE, DataLimits.cqlLimits(Integer.MAX_VALUE, 1), DataRange.allData(StorageService.getPartitioner()));
        try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
             UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup);){
            while (iter.hasNext()) {
                UnfilteredRowIterator partition;
                block35: {
                    partition = (UnfilteredRowIterator)iter.next();
                    Throwable throwable = null;
                    try {
                        UUID hostId = UUIDGen.getUUID(partition.partitionKey().getKey());
                        InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
                        if (target != null) {
                            this.scheduleHintDelivery(target, false);
                        }
                        if (partition == null) continue;
                        if (throwable == null) break block35;
                    }
                    catch (Throwable throwable2) {
                        try {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        catch (Throwable throwable3) {
                            if (partition == null) throw throwable3;
                            if (throwable == null) {
                                partition.close();
                                throw throwable3;
                            }
                            try {
                                partition.close();
                                throw throwable3;
                            }
                            catch (Throwable throwable4) {
                                throwable.addSuppressed(throwable4);
                                throw throwable3;
                            }
                        }
                    }
                    try {
                        partition.close();
                        continue;
                    }
                    catch (Throwable throwable5) {
                        throwable.addSuppressed(throwable5);
                        continue;
                    }
                }
                partition.close();
            }
        }
        logger.debug("Finished scheduleAllDeliveries");
    }

    public void scheduleHintDelivery(final InetAddress to, final boolean precompact) {
        if (!this.queuedDeliveries.add((Object)to)) {
            return;
        }
        logger.debug("Scheduling delivery of Hints to {}", (Object)to);
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    if (precompact) {
                        HintedHandOffManager.this.compact();
                    }
                    HintedHandOffManager.this.deliverHintsToEndpoint(to);
                }
                finally {
                    HintedHandOffManager.this.queuedDeliveries.remove((Object)to);
                }
            }
        });
    }

    @Override
    public void scheduleHintDelivery(String to) throws UnknownHostException {
        this.scheduleHintDelivery(InetAddress.getByName(to), true);
    }

    @Override
    public void pauseHintsDelivery(boolean b) {
        this.hintedHandOffPaused = b;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public List<String> listEndpointsPendingHints() {
        Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
        LinkedList<String> result = new LinkedList<String>();
        PartitionRangeReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
        try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
             UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup);){
            while (iter.hasNext()) {
                UnfilteredRowIterator partition;
                block35: {
                    partition = (UnfilteredRowIterator)iter.next();
                    Throwable throwable = null;
                    try {
                        if (partition.hasNext()) {
                            result.addFirst(tokenFactory.toString(partition.partitionKey().getToken()));
                        }
                        if (partition == null) continue;
                        if (throwable == null) break block35;
                    }
                    catch (Throwable throwable2) {
                        try {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        catch (Throwable throwable3) {
                            if (partition == null) throw throwable3;
                            if (throwable != null) {
                                try {
                                    partition.close();
                                    throw throwable3;
                                }
                                catch (Throwable throwable4) {
                                    throwable.addSuppressed(throwable4);
                                    throw throwable3;
                                }
                            }
                            partition.close();
                            throw throwable3;
                        }
                    }
                    try {
                        partition.close();
                        continue;
                    }
                    catch (Throwable throwable5) {
                        throwable.addSuppressed(throwable5);
                        continue;
                    }
                }
                partition.close();
            }
            return result;
        }
    }
}

