/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.RateLimiter;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HintedHandOffManager
implements HintedHandOffManagerMBean {
    /** Process-wide instance; created eagerly when the class is initialised. */
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
    // NOTE(review): PAGE_SIZE and LARGE_NUMBER are not referenced by name in this
    // decompiled source; the literals 128 and 65536 appear in
    // deliverHintsToEndpointInternal and getHintsSlice, presumably inlined by the
    // compiler - confirm against the original source.
    private static final int PAGE_SIZE = 128;
    private static final int LARGE_NUMBER = 65536;
    /**
     * Composite (UUID, Int32) comparator. deliverHintsToEndpointInternal splits hint
     * column names with it and reads the second component as the serialization
     * version of the stored mutation.
     */
    static final CompositeType comparator = CompositeType.getInstance(Arrays.asList(UUIDType.instance, Int32Type.instance));
    /**
     * Endpoints that currently have a delivery queued or running: added in
     * scheduleHintDelivery, removed in the finally block of deliverHintsToEndpoint.
     */
    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet();
    /** Executor that runs the delivery tasks; its size comes from DatabaseDescriptor.getMaxHintsThread(). */
    private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("HintedHandoff", 1), "HintedHandoff");

    /**
     * Registers this manager as a JMX MBean and schedules a recurring call to
     * {@code scheduleAllDeliveries()} on {@code StorageService.optionalTasks}
     * (initial delay 10 minutes, then a 10 minute delay between runs).
     *
     * @throws RuntimeException wrapping any exception raised while building the
     *         object name or registering the MBean
     */
    public void start() {
        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager");
            platformServer.registerMBean(this, mbeanName);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.debug("Created HHOM instance, registered MBean.");
        StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                HintedHandOffManager.this.scheduleAllDeliveries();
            }
        }, 10L, 10L, TimeUnit.MINUTES);
    }

    /**
     * Sends {@code message} to {@code endpoint} and blocks on the response handler.
     *
     * @param endpoint node the message is sent to
     * @param message  message to send
     * @throws WriteTimeoutException propagated from the response handler's {@code get()}
     */
    private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws WriteTimeoutException {
        IWriteResponseHandler handler = WriteResponseHandler.create(endpoint);
        MessagingService messaging = MessagingService.instance();
        messaging.sendRR(message, endpoint, handler);
        // Block until the handler reports completion (or throws).
        handler.get();
    }

    /**
     * Deletes a single hint column from the "hints" column family of the "system"
     * keyspace by applying a delete mutation via {@code applyUnsafe()}.
     *
     * @param tokenBytes row key of the hint row
     * @param columnName name of the hint column to delete
     * @param timestamp  timestamp passed to the delete
     * @throws IOException declared by the signature; propagated from the mutation calls
     */
    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException {
        RowMutation deletion = new RowMutation("system", tokenBytes);
        QueryPath hintColumn = new QueryPath("hints", null, columnName);
        deletion.delete(hintColumn, timestamp);
        deletion.applyUnsafe();
    }

    /**
     * Resolves {@code ipOrHostname} to an address and deletes the hints stored for it.
     *
     * @param ipOrHostname IP address or host name of the node whose hints are dropped
     * @throws RuntimeException wrapping the {@link UnknownHostException} when the
     *         name cannot be resolved
     */
    @Override
    public void deleteHintsForEndpoint(String ipOrHostname) {
        try {
            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
            this.deleteHintsForEndpoint(endpoint);
        }
        catch (UnknownHostException e) {
            // Route the stack trace through the logger rather than stderr.
            logger.warn("Unable to find " + ipOrHostname + ", not a hostname or ipaddr of a node?:", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes all hints stored for {@code endpoint}.
     *
     * <p>Returns without doing anything when the endpoint is not a member according
     * to the token metadata. Otherwise a row-level delete of the endpoint's hint row
     * (keyed by its host ID) is built immediately, stamped with
     * {@code System.currentTimeMillis()}, and then applied - followed by a
     * compaction request - on {@code StorageService.optionalTasks}. Failures of that
     * background task are logged and not propagated.
     *
     * @param endpoint node whose hints are dropped
     */
    public void deleteHintsForEndpoint(final InetAddress endpoint) {
        if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) {
            return;
        }
        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
        ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
        final RowMutation rm = new RowMutation("system", hostIdBytes);
        rm.delete(new QueryPath("hints"), System.currentTimeMillis());
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    logger.info("Deleting any stored hints for " + endpoint);
                    rm.apply();
                    HintedHandOffManager.this.compact();
                }
                catch (Exception e) {
                    // Keep the original message but attach the throwable so the
                    // stack trace of the background failure is not lost.
                    logger.warn("Could not delete hints for " + endpoint + ": " + e, (Throwable)e);
                }
            }
        };
        StorageService.optionalTasks.execute(runnable);
    }

    /**
     * Flushes the hints column family and submits a user-defined compaction over
     * every sstable it currently has.
     *
     * @return the future returned by {@code CompactionManager.submitUserDefined}
     * @throws ExecutionException   declared by the signature; propagated from the flush
     * @throws InterruptedException declared by the signature; propagated from the flush
     */
    private Future<?> compact() throws ExecutionException, InterruptedException {
        Table systemTable = Table.open("system");
        ColumnFamilyStore hints = systemTable.getColumnFamilyStore("hints");
        // Flush first so the sstable list below is taken after the memtable is written out.
        hints.forceBlockingFlush();
        List<Descriptor> allDescriptors = new ArrayList<Descriptor>();
        for (SSTableReader reader : hints.getSSTables()) {
            allDescriptors.add(reader.descriptor);
        }
        return CompactionManager.instance.submitUserDefined(hints, allDescriptors, Integer.MAX_VALUE);
    }

    /**
     * Decides whether hint paging has reached the end.
     *
     * @param hintColumnFamily page that was just read, possibly {@code null}
     * @param startColumn      column name the page query started from
     * @return {@code true} when the page is {@code null}, or when it holds exactly
     *         one column and that column is {@code startColumn}
     */
    private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn) {
        if (hintColumnFamily == null) {
            return true;
        }
        if (hintColumnFamily.getSortedColumns().size() != 1) {
            return false;
        }
        // A one-column page containing only the start column yields nothing new.
        return hintColumnFamily.getColumn(startColumn) != null;
    }

    /**
     * Blocks until {@code endpoint} has gossiped a schema version and that version
     * equals the local node's, polling once per second.
     *
     * <p>Each of the two phases (schema present, schema equal) may wait up to
     * {@code 2 * StorageService.RING_DELAY} milliseconds.
     *
     * @param endpoint remote node to compare schema versions with
     * @return milliseconds spent waiting in the second (agreement) phase only; the
     *         counter is reset between phases
     * @throws TimeoutException if either phase exceeds its limit, or if the gossiper
     *         no longer has any state for {@code endpoint}
     */
    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException {
        Gossiper gossiper = Gossiper.instance;
        int waited = 0;
        // Phase 1: wait for the remote node to advertise any schema version.
        while (true) {
            HintedHandOffManager.ensureKnownToGossip(gossiper, endpoint);
            if (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) != null) {
                break;
            }
            HintedHandOffManager.sleepBetweenSchemaChecks();
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Didn't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        waited = 0;
        // Phase 2: wait for the remote version to equal the local one.
        while (true) {
            HintedHandOffManager.ensureKnownToGossip(gossiper, endpoint);
            if (gossiper.getEndpointStateForEndpoint((InetAddress)endpoint).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value.equals(gossiper.getEndpointStateForEndpoint((InetAddress)FBUtilities.getBroadcastAddress()).getApplicationState((ApplicationState)ApplicationState.SCHEMA).value)) {
                break;
            }
            HintedHandOffManager.sleepBetweenSchemaChecks();
            if ((waited += 1000) <= 2 * StorageService.RING_DELAY) continue;
            throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
        }
        logger.debug("schema for {} matches local schema", (Object)endpoint);
        return waited;
    }

    /**
     * Fails with a TimeoutException (which the caller already handles) instead of a
     * NullPointerException when the gossiper has no state for the endpoint.
     * NOTE(review): the state is looked up again by the caller, so a very narrow
     * window remains in which it could disappear between the two lookups.
     */
    private static void ensureKnownToGossip(Gossiper gossiper, InetAddress endpoint) throws TimeoutException {
        if (gossiper.getEndpointStateForEndpoint(endpoint) == null) {
            throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
        }
    }

    /** Sleeps one second between schema polls; interruption is surfaced as an AssertionError, as before. */
    private static void sleepBetweenSchemaChecks() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs hint delivery for {@code endpoint} and, whether or not it succeeds,
     * removes the endpoint from {@code queuedDeliveries} afterwards.
     *
     * @param endpoint node to deliver hints to
     * @throws IOException              propagated from the delivery
     * @throws DigestMismatchException  declared by the delivery method
     * @throws InvalidRequestException  declared by the delivery method
     * @throws InterruptedException     declared by the delivery method
     */
    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException {
        try {
            deliverHintsToEndpointInternal(endpoint);
        }
        finally {
            // Clear the marker even on failure so scheduleHintDelivery can queue this endpoint again.
            queuedDeliveries.remove(endpoint);
        }
    }

    /**
     * Replays the hints stored for {@code endpoint}, page by page, deleting each hint
     * once it has been sent (or found to target an unknown column family).
     *
     * <p>The method returns early, without delivering anything, when the hints store
     * is empty, when schema agreement with the endpoint is not reached, or when the
     * failure detector reports the endpoint as down. Delivery stops at the first
     * write timeout. If at least one hint was replayed, the hints column family is
     * compacted and the call blocks until that compaction completes.
     *
     * @param endpoint node to deliver hints to
     * @throws IOException              propagated from hint deserialization or deletion
     * @throws DigestMismatchException  declared by the signature
     * @throws InvalidRequestException  declared by the signature
     * @throws InterruptedException     declared by the signature
     * @throws RuntimeException         wrapping any failure of the final compaction
     */
    private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException {
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore("hints");
        if (hintStore.isEmpty()) {
            return;
        }
        logger.debug("Checking remote({}) schema before delivering hints", (Object)endpoint);
        try {
            this.waitForSchemaAgreement(endpoint);
        }
        catch (TimeoutException e) {
            // Previously swallowed silently; record why delivery was skipped.
            logger.debug("Skipping hint delivery to {}: {}", (Object)endpoint, (Object)e.getMessage());
            return;
        }
        if (!FailureDetector.instance.isAlive(endpoint)) {
            logger.debug("Endpoint {} died before hint delivery, aborting", (Object)endpoint);
            return;
        }
        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
        logger.info("Started hinted handoff for host: {} with IP: {}", (Object)hostId, (Object)endpoint);
        ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
        DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);

        // Upper bound on columns per page; same value as the class's PAGE_SIZE constant.
        final int maxPageSize = 128;
        int pageSize = maxPageSize;
        // Read once so the guard and the divisor cannot disagree.
        int meanColumns = hintStore.getMeanColumns();
        if (meanColumns > 0) {
            int averageColumnSize = (int)(hintStore.getMeanRowSize() / (long)meanColumns);
            // averageColumnSize is 0 when the mean row size is smaller than the mean
            // column count; keep the default page size rather than dividing by zero.
            if (averageColumnSize > 0) {
                pageSize = Math.min(maxPageSize, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
            }
            // pagingFinished treats a one-column page holding only the start column
            // as the end, so a page needs at least two columns to make progress.
            pageSize = Math.max(2, pageSize);
            logger.debug("average hinted-row column size is {}; using pageSize of {}", (Object)averageColumnSize, (Object)pageSize);
        }

        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
        // A throttle of 0 means effectively unlimited. Widen before multiplying so
        // large settings do not overflow int arithmetic.
        double bytesPerSecond = throttleInKB == 0 ? Double.MAX_VALUE : (double)throttleInKB * 1024.0;
        RateLimiter rateLimiter = RateLimiter.create(bytesPerSecond);

        int rowsReplayed = 0;
        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        delivery:
        while (true) {
            QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath("hints"), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000L));
            if (HintedHandOffManager.pagingFinished(hintsPage, startColumn)) {
                break;
            }
            for (IColumn hint : hintsPage.getSortedColumns()) {
                // Skip columns that are no longer live (already deleted hints).
                if (!hint.isLive()) continue;
                startColumn = hint.name();
                ByteBuffer[] components = comparator.split(hint.name());
                // Second component of the column name is the serialization version of the stored mutation.
                int version = Int32Type.instance.compose(components[1]);
                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
                RowMutation rm;
                try {
                    rm = RowMutation.serializer.deserialize(in, version);
                }
                catch (UnknownColumnFamilyException e) {
                    logger.debug("Skipping delivery of hint for deleted columnfamily", (Throwable)e);
                    rm = null;
                }
                try {
                    if (rm != null) {
                        MessageOut<RowMutation> message = rm.createMessage();
                        // NOTE(review): 5 is presumably a messaging version constant inlined by the decompiler - confirm.
                        rateLimiter.acquire(message.serializedSize(5));
                        HintedHandOffManager.sendMutation(endpoint, message);
                        ++rowsReplayed;
                    }
                    // Undeliverable hints (rm == null) are deleted too.
                    HintedHandOffManager.deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                }
                catch (WriteTimeoutException e) {
                    logger.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint));
                    break delivery;
                }
            }
        }
        if (rowsReplayed > 0) {
            try {
                this.compact().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        logger.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
    }

    /**
     * Scans the local hints column family and queues a delivery for every row whose
     * key, read as a host ID, maps to a known endpoint. Rows whose host ID has no
     * endpoint in the token metadata are skipped.
     */
    private void scheduleAllDeliveries() {
        logger.debug("Started scheduleAllDeliveries");
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore("hints");
        IPartitioner partitioner = StorageService.getPartitioner();
        // A range from the minimum key bound back to itself.
        Token.KeyBound ringStart = ((Token)partitioner.getMinimumToken()).minKeyBound();
        Range<RowPosition> fullRing = new Range<RowPosition>(ringStart, ringStart, partitioner);
        // Empty set of column names: only the row keys are used below.
        NamesQueryFilter noColumns = new NamesQueryFilter((SortedSet<ByteBuffer>)ImmutableSortedSet.of());
        for (Row hintRow : hintStore.getRangeSlice(null, fullRing, Integer.MAX_VALUE, noColumns, null)) {
            UUID hostId = UUIDGen.getUUID(hintRow.key.key);
            InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
            if (target != null) {
                this.scheduleHintDelivery(target);
            }
        }
        logger.debug("Finished scheduleAllDeliveries");
    }

    /**
     * Queues an asynchronous hint delivery to {@code to}, unless one is already
     * queued or running for that endpoint.
     *
     * @param to endpoint to deliver hints to
     */
    public void scheduleHintDelivery(final InetAddress to) {
        if (queuedDeliveries.contains(to)) {
            return;
        }
        // add() returns false if the endpoint was inserted after the check above.
        if (!queuedDeliveries.add(to)) {
            return;
        }
        logger.debug("Scheduling delivery of Hints to {}", (Object)to);
        executor.execute(new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                HintedHandOffManager.this.deliverHintsToEndpoint(to);
            }
        });
    }

    /**
     * Resolves {@code to} and queues a hint delivery to the resulting address.
     *
     * @param to IP address or host name of the target node
     * @throws UnknownHostException if {@code to} cannot be resolved
     */
    @Override
    public void scheduleHintDelivery(String to) throws UnknownHostException {
        InetAddress target = InetAddress.getByName(to);
        this.scheduleHintDelivery(target);
    }

    /**
     * Lists the hint rows that still exist, one entry per row, in reverse order of
     * the underlying slice result.
     *
     * <p>Row keys are binary, so they are no longer decoded as text with the platform
     * charset. A 16-byte key is rendered as a UUID string (scheduleAllDeliveries reads
     * the same key as a host ID); any other key is rendered as lower-case hex.
     *
     * @return printable identifiers of the rows that still have a column family
     */
    @Override
    public List<String> listEndpointsPendingHints() {
        List<Row> rows = this.getHintsSlice(1);
        LinkedList<String> result = new LinkedList<String>();
        for (Row r : rows) {
            // Rows without a column family have been removed.
            if (r.cf == null) continue;
            result.addFirst(HintedHandOffManager.describeHintKey(r.key.key));
        }
        return result;
    }

    /** Renders a hint row key as a UUID string (16-byte keys) or as hex, honouring the buffer's position and limit. */
    private static String describeHintKey(ByteBuffer rawKey) {
        // duplicate() so reading does not move the caller's position.
        ByteBuffer key = rawKey.duplicate();
        if (key.remaining() == 16) {
            long mostSignificant = key.getLong();
            long leastSignificant = key.getLong();
            return new UUID(mostSignificant, leastSignificant).toString();
        }
        StringBuilder hex = new StringBuilder(2 * key.remaining());
        while (key.hasRemaining()) {
            hex.append(String.format("%02x", key.get() & 0xFF));
        }
        return hex.toString();
    }

    /**
     * Counts the columns in each hint row.
     *
     * @return map from the string form of each row key's token to that row's column
     *         count; rows with no column family or no columns are left out
     */
    @Override
    public Map<String, Integer> countPendingHints() {
        List<Row> hintRows = this.getHintsSlice(Integer.MAX_VALUE);
        Map<String, Integer> pending = new HashMap<String, Integer>();
        Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
        for (Row hintRow : hintRows) {
            if (hintRow.cf == null) {
                continue;
            }
            int columns = hintRow.cf.getColumnCount();
            if (columns > 0) {
                pending.put(tokenFactory.toString(hintRow.key.token), columns);
            }
        }
        return pending;
    }

    /**
     * Reads hint rows through {@code StorageProxy.getRangeSlice} at consistency level
     * ONE, over a range from the minimum key bound back to itself.
     *
     * @param columnCount maximum number of columns to fetch per row
     * @return the rows returned by the range slice
     * @throws RuntimeException wrapping any exception thrown by the range slice
     */
    private List<Row> getHintsSlice(int columnCount) {
        ColumnParent hintsParent = new ColumnParent("hints");
        SliceQueryFilter columnSlice = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, columnCount);
        IPartitioner partitioner = StorageService.getPartitioner();
        Token.KeyBound ringStart = ((Token)partitioner.getMinimumToken()).minKeyBound();
        Range<RowPosition> fullRing = new Range<RowPosition>(ringStart, ringStart);
        try {
            // 65536 is the same value as the class's LARGE_NUMBER constant.
            RangeSliceCommand command = new RangeSliceCommand("system", hintsParent, columnSlice, fullRing, null, 65536);
            return StorageProxy.getRangeSlice(command, ConsistencyLevel.ONE);
        }
        catch (Exception e) {
            logger.info("HintsCF getEPPendingHints timed out.");
            throw new RuntimeException(e);
        }
    }
}

