/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.collect.ImmutableSortedSet;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HintedHandOffManager
implements HintedHandOffManagerMBean {
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    public static final String HINTS_CF = "HintsColumnFamily";
    private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
    private static final int PAGE_SIZE = 128;
    private static final int LARGE_NUMBER = 65536;
    private static final String SEPARATOR_08 = "-";
    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet();
    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", 1);

    public void start() {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger_.debug("Created HHOM instance, registered MBean.");
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                HintedHandOffManager.this.scheduleAllDeliveries();
            }
        };
        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10L, 10L, TimeUnit.MINUTES);
    }

    private static void sendMutation(InetAddress endpoint, RowMutation mutation) throws TimeoutException {
        IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
        MessagingService.instance().sendRR(mutation, endpoint, responseHandler);
        responseHandler.get();
        try {
            Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
    }

    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, long timestamp) throws IOException {
        RowMutation rm = new RowMutation("system", tokenBytes);
        rm.delete(new QueryPath(HINTS_CF, hintId), timestamp);
        rm.applyUnsafe();
    }

    @Override
    public void deleteHintsForEndpoint(String ipOrHostname) {
        try {
            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
            this.deleteHintsForEndpoint(endpoint);
        }
        catch (UnknownHostException e) {
            logger_.warn("Unable to find " + ipOrHostname + ", not a hostname or ipaddr of a node?:");
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public void deleteHintsForEndpoint(final InetAddress endpoint) {
        if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) {
            return;
        }
        Token token = StorageService.instance.getTokenMetadata().getToken(endpoint);
        ByteBuffer tokenBytes = StorageService.getPartitioner().getTokenFactory().toByteArray(token);
        final RowMutation rm = new RowMutation("system", tokenBytes);
        rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    logger_.info("Deleting any stored hints for " + endpoint);
                    rm.apply();
                    HintedHandOffManager.this.compact();
                }
                catch (Exception e) {
                    logger_.warn("Could not delete hints for " + endpoint + ": " + e);
                }
            }
        };
        StorageService.optionalTasks.execute(runnable);
    }

    private Future<?> compact() throws ExecutionException, InterruptedException {
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        hintStore.forceBlockingFlush();
        ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
        for (SSTableReader sstable : hintStore.getSSTables()) {
            descriptors.add(sstable.descriptor);
        }
        return CompactionManager.instance.submitUserDefined(hintStore, descriptors, Integer.MAX_VALUE);
    }

    private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn) {
        return hintColumnFamily == null || hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null;
    }

    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException {
        Gossiper gossiper = Gossiper.instance;
        int waited = 0;
        while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        waited = 0;
        while (!gossiper.getEndpointStateForEndpoint((InetAddress)endpoint).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value.equals(gossiper.getEndpointStateForEndpoint((InetAddress)FBUtilities.getBroadcastAddress()).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value)) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        logger_.debug("schema for {} matches local schema", (Object)endpoint);
        return waited;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException {
        try {
            this.deliverHintsToEndpointInternal(endpoint);
        }
        finally {
            this.queuedDeliveries.remove((Object)endpoint);
        }
    }

    private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException {
        QueryFilter filter;
        ColumnFamily hintsPage;
        int waited;
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        if (hintStore.isEmpty()) {
            return;
        }
        logger_.debug("Checking remote({}) schema before delivering hints", (Object)endpoint);
        try {
            waited = this.waitForSchemaAgreement(endpoint);
        }
        catch (TimeoutException e) {
            return;
        }
        if (waited == 0) {
            int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
            logger_.debug("Sleeping {}ms to stagger hint delivery", (Object)sleep);
            Thread.sleep(sleep);
        }
        if (!FailureDetector.instance.isAlive(endpoint)) {
            logger_.debug("Endpoint {} died before hint delivery, aborting", (Object)endpoint);
            return;
        }
        Token token = StorageService.instance.getTokenMetadata().getToken(endpoint);
        logger_.info("Started hinted handoff for token: {} with IP: {}", (Object)token, (Object)endpoint);
        ByteBuffer tokenBytes = StorageService.getPartitioner().getTokenFactory().toByteArray(token);
        DecoratedKey epkey = StorageService.getPartitioner().decorateKey(tokenBytes);
        int rowsReplayed = 0;
        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        int pageSize = 128;
        if (hintStore.getMeanColumns() > 0) {
            int averageColumnSize = (int)(hintStore.getMeanRowSize() / (long)hintStore.getMeanColumns());
            pageSize = Math.min(128, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
            pageSize = Math.max(2, pageSize);
            logger_.debug("average hinted-row column size is {}; using pageSize of {}", (Object)averageColumnSize, (Object)pageSize);
        }
        block8: while (!HintedHandOffManager.pagingFinished(hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize)), Integer.MAX_VALUE), startColumn)) {
            block9: for (IColumn hint : hintsPage.getSortedColumns()) {
                RowMutation rm;
                startColumn = hint.name();
                for (IColumn subColumn : hint.getSubColumns()) {
                    if (!ByteBufferUtil.string(subColumn.name()).contains(SEPARATOR_08)) continue;
                    logger_.debug("0.8-style hint found.  This should have been taken care of by purgeIncompatibleHints");
                    HintedHandOffManager.deleteHint(tokenBytes, hint.name(), hint.maxTimestamp());
                    continue block9;
                }
                IColumn versionColumn = hint.getSubColumn(ByteBufferUtil.bytes("version"));
                IColumn tableColumn = hint.getSubColumn(ByteBufferUtil.bytes("table"));
                IColumn keyColumn = hint.getSubColumn(ByteBufferUtil.bytes("key"));
                IColumn mutationColumn = hint.getSubColumn(ByteBufferUtil.bytes("mutation"));
                assert (versionColumn != null);
                assert (tableColumn != null);
                assert (keyColumn != null);
                assert (mutationColumn != null);
                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(mutationColumn.value()));
                try {
                    rm = RowMutation.serializer().deserialize(in, ByteBufferUtil.toInt(versionColumn.value()));
                }
                catch (UnknownColumnFamilyException e) {
                    logger_.debug("Skipping delivery of hint for deleted columnfamily", (Throwable)e);
                    rm = null;
                }
                try {
                    if (rm != null) {
                        HintedHandOffManager.sendMutation(endpoint, rm);
                        ++rowsReplayed;
                    }
                    HintedHandOffManager.deleteHint(tokenBytes, hint.name(), hint.maxTimestamp());
                }
                catch (TimeoutException e) {
                    logger_.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint));
                    break block8;
                }
            }
        }
        if (rowsReplayed > 0) {
            try {
                this.compact().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
    }

    private void scheduleAllDeliveries() {
        if (logger_.isDebugEnabled()) {
            logger_.debug("Started scheduleAllDeliveries");
        }
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        IPartitioner p = StorageService.getPartitioner();
        Range range = new Range((Token)p.getMinimumToken(), (Token)p.getMinimumToken(), p);
        NamesQueryFilter filter = new NamesQueryFilter((SortedSet<ByteBuffer>)ImmutableSortedSet.of());
        List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter);
        for (Row row : rows) {
            Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(row.key.key);
            InetAddress target = StorageService.instance.getTokenMetadata().getEndpoint(token);
            if (target == null) continue;
            this.scheduleHintDelivery(target);
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug("Finished scheduleAllDeliveries");
        }
    }

    public void scheduleHintDelivery(final InetAddress to) {
        logger_.debug("deliverHints to {}", (Object)to);
        if (!this.queuedDeliveries.add((Object)to)) {
            return;
        }
        WrappedRunnable r = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                HintedHandOffManager.this.deliverHintsToEndpoint(to);
            }
        };
        this.executor_.execute(r);
    }

    @Override
    public void scheduleHintDelivery(String to) throws UnknownHostException {
        this.scheduleHintDelivery(InetAddress.getByName(to));
    }

    @Override
    public List<String> listEndpointsPendingHints() {
        List<Row> rows = this.getHintsSlice(1);
        LinkedList<String> result = new LinkedList<String>();
        for (Row r : rows) {
            if (r.cf == null) continue;
            result.addFirst(new String(r.key.key.array()));
        }
        return result;
    }

    @Override
    public Map<String, Integer> countPendingHints() {
        List<Row> rows = this.getHintsSlice(Integer.MAX_VALUE);
        HashMap<String, Integer> result = new HashMap<String, Integer>();
        for (Row r : rows) {
            if (r.cf == null) continue;
            result.put(new String(r.key.key.array()), r.cf.getColumnCount());
        }
        return result;
    }

    private List<Row> getHintsSlice(int column_count) {
        List<Row> rows;
        ColumnParent parent = new ColumnParent(HINTS_CF);
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
        sliceRange.setCount(column_count);
        predicate.setSlice_range(sliceRange);
        IPartitioner partitioner = StorageService.getPartitioner();
        ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        Range range = new Range((Token)partitioner.getToken(empty), (Token)partitioner.getToken(empty));
        try {
            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, 65536), ConsistencyLevel.ONE);
        }
        catch (Exception e) {
            logger_.info("HintsCF getEPPendingHints timed out.");
            throw new RuntimeException(e);
        }
        return rows;
    }
}

