/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.ArrayUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replays "hints" (records of writes that a replica missed) to an endpoint and
 * cleans them up afterwards.
 *
 * <p>As read by this class, hints live in the {@code system} table, column family
 * {@link #HINTS_CF}, laid out as:
 * <ul>
 *   <li>row key: the endpoint's host address string, UTF-8 encoded
 *       (see {@link #hintRowKey(InetAddress)});</li>
 *   <li>super column name: the key of the row that has to be replayed;</li>
 *   <li>sub column name: {@code <table>-<columnFamily>}, built by
 *       {@link #makeCombinedName(String, String)}.</li>
 * </ul>
 *
 * <p>Deliveries are executed on a dedicated executor; {@link #queuedDeliveries}
 * prevents the same endpoint from being queued twice.
 */
public class HintedHandOffManager {
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
    public static final String HINTS_CF = "HintsColumnFamily";
    /** Maximum number of columns fetched per slice query while paging. */
    private static final int PAGE_SIZE = 10000;
    /** Separator between table name and column family name in a combined hint name. */
    private static final String SEPARATOR = "-";
    /** Endpoints that have a delivery queued but not yet started. */
    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());

    /**
     * Builds the row key under which hints for {@code endpoint} are read and deleted.
     * A fresh buffer is returned on every call so callers never share buffer state.
     *
     * @param endpoint the hinted endpoint
     * @return the UTF-8 bytes of the endpoint's host address string
     */
    private static ByteBuffer hintRowKey(InetAddress endpoint) {
        return ByteBuffer.wrap(endpoint.getHostAddress().getBytes(Charsets.UTF_8));
    }

    /**
     * Reads the row {@code key} of {@code tableName}/{@code cfName} page by page and
     * sends each page to {@code endpoint} as a row mutation, waiting for the
     * acknowledgement of each page before reading the next one.
     *
     * @return {@code true} if the hint can be discarded (everything was sent, there was
     *         nothing to send, or the endpoint is unknown to gossip); {@code false} if
     *         the endpoint is down or a write timed out
     * @throws IOException if reading the row or building the message fails
     */
    private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException {
        if (!Gossiper.instance.isKnownEndpoint(endpoint)) {
            logger_.warn("Hints found for endpoint " + endpoint + " which is not part of the gossip network.  discarding.");
            return true;
        }
        if (!FailureDetector.instance.isAlive(endpoint)) {
            return false;
        }
        Table table = Table.open(tableName);
        DecoratedKey dkey = StorageService.getPartitioner().decorateKey(key);
        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
        ByteBuffer startColumn = FBUtilities.EMPTY_BYTE_BUFFER;
        while (true) {
            QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(cfs.getColumnFamilyName()), startColumn, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
            ColumnFamily cf = cfs.getColumnFamily(filter);
            if (HintedHandOffManager.pagingFinished(cf, startColumn)) {
                break;
            }
            if (cf.getColumnNames().isEmpty()) {
                logger_.debug("Nothing to hand off for {}", dkey);
                break;
            }
            // The next page starts at the last column of this one, so that column is
            // fetched again; pagingFinished() recognises a page holding only it.
            startColumn = cf.getColumnNames().last();
            RowMutation rm = new RowMutation(tableName, key);
            rm.add(cf);
            Message message = rm.makeRowMutationMessage();
            IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
            MessagingService.instance.sendRR(message, Arrays.asList(endpoint), (IAsyncCallback)responseHandler);
            try {
                responseHandler.get();
            }
            catch (TimeoutException e) {
                return false;
            }
        }
        return true;
    }

    /**
     * Deletes a single hint: sub column {@code tableCF} of super column {@code key}
     * in the hints row {@code endpointAddress}.
     *
     * @param timestamp timestamp of the deletion
     * @throws IOException if applying the mutation fails
     */
    private static void deleteHintKey(ByteBuffer endpointAddress, ByteBuffer key, ByteBuffer tableCF, long timestamp) throws IOException {
        RowMutation rm = new RowMutation("system", endpointAddress);
        rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
        rm.apply();
    }

    /**
     * Deletes every hint stored for {@code endpoint}, then flushes and major-compacts
     * the hints column family. Failures are logged, not propagated.
     *
     * @param endpoint the endpoint whose hints are to be dropped
     */
    public static void deleteHintsForEndPoint(InetAddress endpoint) {
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        // The row key must be the one deliverHintsToEndpoint() reads from (the host
        // address string), not the raw IP bytes, or the wrong row is deleted.
        RowMutation rm = new RowMutation("system", HintedHandOffManager.hintRowKey(endpoint));
        rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
        try {
            logger_.info("Deleting any stored hints for " + endpoint);
            rm.apply();
            hintStore.forceFlush();
            CompactionManager.instance.submitMajor(hintStore, 0L, Integer.MAX_VALUE).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            logger_.warn("Could not delete hints for " + endpoint + ": " + e, e);
        }
    }

    /**
     * Tells whether a paged read has reached its end: either nothing was returned, or
     * the page holds only the column the page started from (already processed).
     */
    private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn) {
        if (hintColumnFamily == null) {
            return true;
        }
        return hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null;
    }

    /**
     * Builds the hint sub column name {@code <tableName>-<columnFamily>}, UTF-8 encoded.
     *
     * @param tableName    name of the table the hinted write belongs to
     * @param columnFamily name of the column family the hinted write belongs to
     * @return a buffer wrapping the combined name
     */
    public static ByteBuffer makeCombinedName(String tableName, String columnFamily) {
        byte[] withsep = ArrayUtils.addAll(tableName.getBytes(Charsets.UTF_8), SEPARATOR.getBytes(Charsets.UTF_8));
        return ByteBuffer.wrap(ArrayUtils.addAll(withsep, columnFamily.getBytes(Charsets.UTF_8)));
    }

    /**
     * Splits a name produced by {@link #makeCombinedName(String, String)} at its last
     * separator.
     *
     * @param joined array-backed buffer holding the combined name between its position
     *               and limit
     * @return a two element array: table name, column family name
     * @throws RuntimeException if there is no separator or the table name is empty
     */
    private static String[] getTableAndCFNames(ByteBuffer joined) {
        byte[] backing = joined.array();
        int start = joined.arrayOffset() + joined.position();
        int end = joined.arrayOffset() + joined.limit();
        // Search backwards from the last byte that belongs to the buffer (end - 1);
        // starting at 'end' could match a byte outside a buffer that is only a view
        // into a larger array.
        int index = ArrayUtils.lastIndexOf(backing, SEPARATOR.getBytes(Charsets.UTF_8)[0], end - 1);
        if (index == -1 || index < start + 1) {
            throw new RuntimeException("Corrupted hint name " + new String(backing, start, joined.remaining(), Charsets.UTF_8));
        }
        // Decode with the charset makeCombinedName() encoded with.
        String tableName = new String(backing, start, index - start, Charsets.UTF_8);
        String cfName = new String(backing, index + 1, end - (index + 1), Charsets.UTF_8);
        return new String[]{tableName, cfName};
    }

    /**
     * Replays all hints stored for {@code endpoint}. Each successfully replayed hint is
     * deleted; the first failed send aborts the whole delivery. If anything was
     * replayed, the hints column family is flushed and major-compacted afterwards.
     *
     * @throws IOException      if reading hints or applying deletions fails
     * @throws RuntimeException if the final compaction fails
     */
    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException {
        logger_.info("Started hinted handoff for endpoint " + endpoint);
        // From here on a new delivery for this endpoint may be queued again.
        this.queuedDeliveries.remove(endpoint);
        DecoratedKey epkey = StorageService.getPartitioner().decorateKey(HintedHandOffManager.hintRowKey(endpoint));
        int rowsReplayed = 0;
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        ByteBuffer startColumn = FBUtilities.EMPTY_BYTE_BUFFER;
        delivery:
        while (true) {
            QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
            if (HintedHandOffManager.pagingFinished(hintColumnFamily, startColumn)) {
                break;
            }
            Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns();
            for (IColumn keyColumn : keyColumns) {
                // Remember how far we got so the next page resumes from this key.
                startColumn = keyColumn.name();
                Collection<IColumn> tableCFs = keyColumn.getSubColumns();
                for (IColumn tableCF : tableCFs) {
                    String[] parts = HintedHandOffManager.getTableAndCFNames(tableCF.name());
                    if (!HintedHandOffManager.sendMessage(endpoint, parts[0], parts[1], keyColumn.name())) {
                        logger_.info("Could not complete hinted handoff to " + endpoint);
                        break delivery;
                    }
                    HintedHandOffManager.deleteHintKey(HintedHandOffManager.hintRowKey(endpoint), keyColumn.name(), tableCF.name(), tableCF.timestamp());
                    ++rowsReplayed;
                }
            }
        }
        if (rowsReplayed > 0) {
            hintStore.forceFlush();
            try {
                CompactionManager.instance.submitMajor(hintStore, 0L, Integer.MAX_VALUE).get();
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to the executor thread.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(e);
            }
        }
        logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
    }

    /**
     * Moves the hints row keyed by {@code oldTable} to a row keyed by {@code newTable},
     * page by page; with a {@code null} {@code newTable} the old row's columns are only
     * deleted.
     *
     * <p>NOTE(review): this method uses the table name as the hints row key, whereas
     * the delivery path in this class looks hints up by endpoint address - confirm
     * which layout the hint writer actually produces.
     *
     * @param oldTable current table name
     * @param newTable new table name, or {@code null} to drop the hints
     * @throws IOException if reading or applying a mutation fails
     */
    public static void renameHints(String oldTable, String newTable) throws IOException {
        DecoratedKey oldTableKey = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(oldTable.getBytes(Charsets.UTF_8)));
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        ByteBuffer startCol = FBUtilities.EMPTY_BYTE_BUFFER;
        long now = System.currentTimeMillis();
        while (true) {
            QueryFilter filter = QueryFilter.getSliceFilter(oldTableKey, new QueryPath(HINTS_CF), startCol, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
            ColumnFamily cf = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
            if (HintedHandOffManager.pagingFinished(cf, startCol)) {
                break;
            }
            if (newTable != null) {
                // Copy the whole page under the new row key.
                RowMutation insert = new RowMutation("system", ByteBuffer.wrap(newTable.getBytes(Charsets.UTF_8)));
                insert.add(cf);
                insert.apply();
            }
            // Delete the copied columns from the old row and advance the page cursor.
            RowMutation drop = new RowMutation("system", oldTableKey.key);
            for (ByteBuffer key : cf.getColumnNames()) {
                drop.delete(new QueryPath(HINTS_CF, key), now);
                startCol = key;
            }
            drop.apply();
        }
    }

    /**
     * Queues an asynchronous hint delivery to {@code to}. Does nothing if a delivery
     * for that endpoint is already queued and has not started yet.
     *
     * @param to endpoint to deliver hints to
     */
    public void deliverHints(final InetAddress to) {
        if (!this.queuedDeliveries.add(to)) {
            return;
        }
        WrappedRunnable r = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                HintedHandOffManager.this.deliverHintsToEndpoint(to);
            }
        };
        this.executor_.submit(r);
    }

    /**
     * Resolves {@code to} and queues a hint delivery to it.
     *
     * @param to host name or textual IP address of the endpoint
     * @throws UnknownHostException if {@code to} cannot be resolved
     */
    public void deliverHints(String to) throws UnknownHostException {
        this.deliverHints(InetAddress.getByName(to));
    }
}

