/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.RangeCommand;
import org.apache.cassandra.db.RangeReply;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.InvalidRequestException;
import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.ReadResponseResolver;
import org.apache.cassandra.service.StorageProxyMBean;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.UnavailableException;
import org.apache.cassandra.service.WriteResponseResolver;
import org.apache.cassandra.utils.TimedStatsDeque;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/**
 * Static entry points that route writes, reads and key-range scans to the storage
 * endpoints reported by {@link StorageService}, and record how long each operation took.
 *
 * <p>The recorded timings are exposed through {@link StorageProxyMBean}; a single instance
 * is registered with the platform MBean server when the class is initialised.
 */
public class StorageProxy
implements StorageProxyMBean {
    private static final Logger logger = Logger.getLogger(StorageProxy.class);
    // NOTE(review): 60000L is presumably a retention window in milliseconds -- confirm against TimedStatsDeque.
    private static final TimedStatsDeque readStats = new TimedStatsDeque(60000L);
    private static final TimedStatsDeque rangeStats = new TimedStatsDeque(60000L);
    private static final TimedStatsDeque writeStats = new TimedStatsDeque(60000L);

    /** Only instantiated by the static initialiser, for MBean registration. */
    private StorageProxy() {
    }

    /**
     * Builds the message to send to each target endpoint for a write.
     *
     * @param rm          the mutation to serialise
     * @param endpointMap maps each endpoint a message is sent to (key) onto the endpoint it is
     *                    standing in for (value); when the two are equal no hint is attached
     * @return one message per key of {@code endpointMap}; targets that equal their value share a
     *         single plain message, the others get their own message carrying a "HINT" header
     * @throws IOException if the mutation cannot be turned into a message
     */
    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException {
        Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
        // One un-hinted message is reused for every target that is written to directly.
        Message message = rm.makeRowMutationMessage();
        for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet()) {
            EndPoint target = entry.getKey();
            EndPoint hint = entry.getValue();
            if (target.equals(hint)) {
                messageMap.put(target, message);
                continue;
            }
            // Each hinted target needs its own message because the header differs per hint.
            Message hintedMessage = rm.makeRowMutationMessage();
            hintedMessage.addHeader("HINT", EndPoint.toBytes(hint));
            if (logger.isDebugEnabled()) {
                logger.debug("Sending the hint of " + hint.getHost() + " to " + target.getHost());
            }
            messageMap.put(target, hintedMessage);
        }
        return messageMap;
    }

    /**
     * Sends the mutation one-way (no response is awaited) to every endpoint returned by
     * {@code getHintedStorageEndpointMap} for the mutation's key.
     *
     * <p>The elapsed time is always recorded in the write statistics, even on failure.
     *
     * @param rm the mutation to write
     * @throws RuntimeException wrapping any {@link IOException} raised while building or sending
     */
    public static void insert(RowMutation rm) {
        long startTime = System.currentTimeMillis();
        try {
            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
            Map<EndPoint, Message> messageMap = StorageProxy.createWriteMessages(rm, endpointMap);
            for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet()) {
                Message message = entry.getValue();
                EndPoint endpoint = entry.getKey();
                if (logger.isDebugEnabled()) {
                    logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
                }
                MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("error inserting key " + rm.key(), e);
        }
        finally {
            writeStats.add(System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Writes the mutation and waits until the number of un-hinted endpoints required by
     * {@code consistency_level} have responded.
     *
     * <p>Once the blocking part succeeded, the same message is additionally sent one-way to every
     * endpoint whose map entry differs from its value (the hinted targets).
     *
     * @param rm                the mutation to write
     * @param consistency_level level understood by {@link #determineBlockFor(int)}
     * @throws UnavailableException if fewer un-hinted endpoints than required are available, if the
     *                              response handler reports failure, or if any other exception occurs
     *                              while writing (the cause is logged, not chained)
     * @throws RuntimeException     wrapping an {@link IOException} raised while building the message
     */
    public static void insertBlocking(RowMutation rm, int consistency_level) throws UnavailableException {
        long startTime = System.currentTimeMillis();
        Message message;
        try {
            message = rm.makeRowMutationMessage();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
            int blockFor = StorageProxy.determineBlockFor(consistency_level);
            List<EndPoint> primaryNodes = StorageProxy.getUnhintedNodes(endpointMap);
            if (primaryNodes.size() < blockFor) {
                throw new UnavailableException();
            }
            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
            if (logger.isDebugEnabled()) {
                logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
            }
            MessagingService.getMessagingInstance().sendRR(message, primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
            if (!quorumResponseHandler.get().booleanValue()) {
                throw new UnavailableException();
            }
            // Only walk the map again when at least one entry is a hinted target.
            if (primaryNodes.size() < endpointMap.size()) {
                for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet()) {
                    // Value equality, consistent with createWriteMessages and getUnhintedNodes.
                    if (entry.getKey().equals(entry.getValue())) {
                        continue;
                    }
                    // NOTE(review): unlike createWriteMessages, no "HINT" header is attached here --
                    // confirm whether the receiving node needs it to store the write as a hint.
                    MessagingService.getMessagingInstance().sendOneWay(message, entry.getKey());
                }
            }
        }
        catch (Exception e) {
            // Also reached for the UnavailableExceptions thrown above; callers only ever see
            // UnavailableException from this block.
            logger.error("error writing key " + rm.key(), e);
            throw new UnavailableException();
        }
        finally {
            writeStats.add(System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Returns the keys of {@code endpointMap} that map to themselves, i.e. the targets that are
     * not standing in for another endpoint.
     *
     * @param endpointMap target endpoint to hinted-for endpoint mapping
     * @return a new list containing every key that equals its value
     */
    private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> endpointMap) {
        List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(endpointMap.size());
        for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet()) {
            // Value equality rather than reference identity, matching createWriteMessages.
            if (e.getKey().equals(e.getValue())) {
                liveEndPoints.add(e.getKey());
            }
        }
        return liveEndPoints;
    }

    /**
     * Translates a numeric consistency level into the number of responses to wait for.
     *
     * @param consistency_level 1 (one response), 2 (majority of the replication factor) or
     *                          3 (the full replication factor)
     * @return the number of responses to block for
     * @throws UnsupportedOperationException for any other level
     */
    private static int determineBlockFor(int consistency_level) {
        switch (consistency_level) {
            case 1:
                return 1;
            case 2:
                return DatabaseDescriptor.getReplicationFactor() / 2 + 1;
            case 3:
                return DatabaseDescriptor.getReplicationFactor();
            default:
                throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
        }
    }

    /**
     * Convenience overload of {@link #insertBlocking(RowMutation, int)} using consistency level 2.
     *
     * @param rm the mutation to write
     * @throws UnavailableException see {@link #insertBlocking(RowMutation, int)}
     */
    public static void insertBlocking(RowMutation rm) throws UnavailableException {
        StorageProxy.insertBlocking(rm, 2);
    }

    /**
     * Sends every command to the endpoint chosen by {@code findSuitableEndPoint} and then collects
     * the responses in the order the requests were sent.
     *
     * @param commands the reads to perform remotely
     * @return the non-null rows from the responses; commands whose response has no row are skipped
     * @throws IOException          if a message cannot be built or a response cannot be deserialised
     * @throws UnavailableException declared for callers; not thrown directly by this method
     * @throws RuntimeException     wrapping a {@link TimeoutException} when a response does not
     *                              arrive within the configured RPC timeout
     */
    private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException {
        if (logger.isDebugEnabled()) {
            logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
        }
        List<Row> rows = new ArrayList<Row>();
        List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
        // Fire all requests first so the waits below overlap.
        for (ReadCommand command : commands) {
            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
            Message message = command.makeReadMessage();
            if (logger.isDebugEnabled()) {
                logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
            }
            message.addHeader("READ-REPAIR", "READ-REPAIR".getBytes());
            iars.add(MessagingService.getMessagingInstance().sendRR(message, endPoint));
        }
        // commandIndex tracks which command a result belongs to, purely for the error message.
        int commandIndex = 0;
        for (IAsyncResult iar : iars) {
            byte[] body;
            try {
                body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new RuntimeException("error reading key " + commands.get(commandIndex).key, e);
            }
            DataInputBuffer bufIn = new DataInputBuffer();
            bufIn.reset(body, body.length);
            ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
            if (response.row() != null) {
                rows.add(response.row());
            }
            ++commandIndex;
        }
        return rows;
    }

    /**
     * Executes the given reads at the requested consistency level.
     *
     * <p>At level 1 each command is read locally when the local endpoint is among the key's read
     * endpoints and this node is not in bootstrap mode; otherwise it is read remotely. Rows from
     * local reads come first in the result, followed by rows from remote reads. Any other level
     * is asserted to be 2 and handled by a quorum read.
     *
     * <p>The elapsed time is recorded in the read statistics only when the call completes
     * normally.
     *
     * @param commands          the reads to perform
     * @param consistency_level 1 for a weak read, 2 for a quorum read
     * @return the rows that were found
     */
    public static List<Row> readProtocol(List<ReadCommand> commands, int consistency_level) throws IOException, TimeoutException, InvalidRequestException, UnavailableException {
        long startTime = System.currentTimeMillis();
        List<Row> rows = new ArrayList<Row>();
        if (consistency_level == 1) {
            List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
            List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
            for (ReadCommand command : commands) {
                EndPoint[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
                boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
                if (foundLocal && !StorageService.instance().isBootstrapMode()) {
                    localCommands.add(command);
                } else {
                    remoteCommands.add(command);
                }
            }
            if (localCommands.size() > 0) {
                rows.addAll(StorageProxy.weakReadLocal(localCommands));
            }
            if (remoteCommands.size() > 0) {
                rows.addAll(StorageProxy.weakReadRemote(remoteCommands));
            }
        } else {
            assert (consistency_level == 2);
            rows = StorageProxy.strongRead(commands);
        }
        readStats.add(System.currentTimeMillis() - startTime);
        return rows;
    }

    /**
     * Performs a quorum read for every command.
     *
     * <p>For each command a full read is sent to the endpoint chosen by
     * {@code findSuitableEndPoint} and a digest-only copy of the command is sent to the remaining
     * read endpoints. All requests are dispatched before any response is awaited. When resolving
     * a command raises {@link DigestMismatchException} and consistency checking is enabled, the
     * full read is re-sent to all of that command's endpoints and resolved again; when
     * consistency checking is disabled the command contributes no row.
     *
     * @param commands the reads to perform; none may already be a digest query
     * @return the non-null rows resolved for the commands, in command order
     * @throws RuntimeException if the repair read also reports a digest mismatch
     */
    private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException, UnavailableException {
        List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
        List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>();
        List<Row> rows = new ArrayList<Row>();

        // Phase 1: dispatch one data read plus digest reads per command.
        for (ReadCommand readCommand : commands) {
            assert (!readCommand.isDigestQuery());
            ReadCommand readMessageDigestOnly = readCommand.copy();
            readMessageDigestOnly.setDigestQuery(true);
            Message message = readCommand.makeReadMessage();
            Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
            ReadResponseResolver readResponseResolver = new ReadResponseResolver();
            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), readResponseResolver);
            EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(readCommand.key);
            List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(readCommand.key)));
            // The data endpoint goes in slot 0; make sure it is not also asked for a digest.
            endpointList.remove(dataPoint);
            EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
            Message[] messages = new Message[endpointList.size() + 1];
            endPoints[0] = dataPoint;
            messages[0] = message;
            if (logger.isDebugEnabled()) {
                logger.debug("strongread reading data for " + readCommand + " from " + message.getMessageId() + "@" + dataPoint);
            }
            for (int i = 1; i < endPoints.length; ++i) {
                EndPoint digestPoint = endpointList.get(i - 1);
                endPoints[i] = digestPoint;
                messages[i] = messageDigestOnly;
                if (logger.isDebugEnabled()) {
                    logger.debug("strongread reading digest for " + readCommand + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
                }
            }
            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
            quorumResponseHandlers.add(quorumResponseHandler);
            commandEndPoints.add(endPoints);
        }

        // Phase 2: resolve each command; handlers and endpoint arrays are index-aligned with commands.
        int commandIndex = 0;
        for (QuorumResponseHandler<Row> quorumResponseHandler : quorumResponseHandlers) {
            ReadCommand command = commands.get(commandIndex);
            try {
                long startTime2 = System.currentTimeMillis();
                Row row = quorumResponseHandler.get();
                if (row != null) {
                    rows.add(row);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
                }
            }
            catch (DigestMismatchException ex) {
                if (DatabaseDescriptor.getConsistencyCheck()) {
                    // Re-read the full data from every endpoint of this command and resolve again.
                    ReadResponseResolver readResponseResolverRepair = new ReadResponseResolver();
                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), readResponseResolverRepair);
                    logger.info("DigestMismatchException: " + command.key);
                    Message messageRepair = command.makeReadMessage();
                    MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
                    try {
                        Row repairedRow = quorumResponseHandlerRepair.get();
                        if (repairedRow != null) {
                            rows.add(repairedRow);
                        }
                    }
                    catch (DigestMismatchException e) {
                        throw new RuntimeException("digest mismatch reading key " + command.key, e);
                    }
                }
            }
            ++commandIndex;
        }
        return rows;
    }

    /**
     * Builds, for every key, an array of read messages sized to the replication factor.
     *
     * <p>Slot 0 is created from the first command of the key's array and every further slot from
     * the second command, so each array in {@code readMessages} must hold at least two commands
     * whenever the replication factor is greater than one.
     * NOTE(review): presumably element 0 is the data read and element 1 the digest read -- confirm
     * with callers (none are visible in this file).
     *
     * @param readMessages commands per key
     * @return the messages per key
     * @throws IOException if a command cannot be turned into a message
     */
    private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException {
        Map<String, Message[]> messages = new HashMap<String, Message[]>();
        for (Map.Entry<String, ReadCommand[]> entry : readMessages.entrySet()) {
            Message[] msg = new Message[DatabaseDescriptor.getReplicationFactor()];
            ReadCommand[] readParameters = entry.getValue();
            msg[0] = readParameters[0].makeReadMessage();
            for (int i = 1; i < msg.length; ++i) {
                msg[i] = readParameters[1].makeReadMessage();
            }
            // Previously the array was built and then dropped, so the result was always empty.
            messages.put(entry.getKey(), msg);
        }
        return messages;
    }

    /**
     * Reads every command from the local table and, when consistency checking is enabled and
     * other live read endpoints exist for the key, hands the result to
     * {@code StorageService.doConsistencyCheck}.
     *
     * @param commands the reads to perform locally
     * @return the non-null rows that were read
     * @throws IOException if the local read fails
     */
    private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException {
        List<Row> rows = new ArrayList<Row>();
        for (ReadCommand command : commands) {
            List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
            // Only the other replicas are relevant for the consistency check below.
            endpoints.remove(StorageService.getLocalStorageEndPoint());
            if (logger.isDebugEnabled()) {
                logger.debug("weakreadlocal reading " + command);
            }
            Table table = Table.open(command.table);
            Row row = command.getRow(table);
            if (row != null) {
                rows.add(row);
            }
            if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck()) {
                // NOTE(review): row may be null here -- confirm doConsistencyCheck tolerates that.
                StorageService.instance().doConsistencyCheck(row, endpoints, command);
            }
        }
        return rows;
    }

    /**
     * Collects keys for a range command by querying endpoints one after another, starting at the
     * endpoint chosen for {@code startWith} and moving to the next endpoint until enough keys
     * were gathered, an endpoint reports the range as completed locally, or the walk arrives back
     * at the starting endpoint.
     *
     * <p>Replies are merged into one list kept in the order defined by the partitioner's
     * decorated-key comparator; a key shared at the seam between the list and a reply is dropped
     * once. The elapsed time is recorded in the range statistics only on normal completion.
     *
     * @param rawCommand the range request; its {@code maxResults} caps the result size
     * @return at most {@code rawCommand.maxResults} keys
     * @throws IOException          if a reply cannot be read
     * @throws UnavailableException declared for callers; not thrown directly by this method
     * @throws RuntimeException     wrapping a {@link TimeoutException} when an endpoint does not
     *                              answer within the configured RPC timeout
     */
    static List<String> getKeyRange(RangeCommand rawCommand) throws IOException, UnavailableException {
        long startTime = System.currentTimeMillis();
        Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
        TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
        List<String> allKeys = new ArrayList<String>();
        RangeCommand command = rawCommand;
        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
        EndPoint startEndpoint = endPoint;
        EndPoint wrapEndpoint = tokenMetadata.getFirstEndpoint();
        do {
            Message message = command.getMessage();
            if (logger.isDebugEnabled()) {
                logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint);
            }
            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
            byte[] responseBody;
            try {
                responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
            RangeReply rangeReply = RangeReply.read(responseBody);
            List<String> rangeKeys = rangeReply.keys;
            if (rangeKeys.size() > 0) {
                if (allKeys.size() > 0) {
                    if (comparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0)) <= 0) {
                        // The reply sorts entirely before what we have: prepend it.
                        if (rangeKeys.get(rangeKeys.size() - 1).equals(allKeys.get(0))) {
                            rangeKeys.remove(rangeKeys.size() - 1);
                        }
                        rangeKeys.addAll(allKeys);
                        allKeys = rangeKeys;
                    } else if (comparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0)) <= 0) {
                        // The reply sorts entirely after what we have: append it.
                        if (allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0))) {
                            allKeys.remove(allKeys.size() - 1);
                        }
                        allKeys.addAll(rangeKeys);
                    } else {
                        // The two lists interleave: de-duplicate and re-sort with the same
                        // comparator the branches above rely on, so the ordering stays consistent.
                        Set<String> keys = new HashSet<String>(allKeys);
                        keys.addAll(rangeKeys);
                        allKeys = new ArrayList<String>(keys);
                        Collections.sort(allKeys, comparator);
                    }
                } else {
                    allKeys = rangeKeys;
                }
            }
            if (allKeys.size() >= rawCommand.maxResults || rangeReply.rangeCompletedLocally) {
                break;
            }
            endPoint = tokenMetadata.getNextEndpoint(endPoint);
            // NOTE(review): the first endpoint is asked for the full maxResults, presumably because
            // its keys can overlap both ends of what was collected so far -- confirm.
            int maxResults = endPoint.equals(wrapEndpoint) ? rawCommand.maxResults : rawCommand.maxResults - allKeys.size();
            command = new RangeCommand(command.table, command.columnFamily, command.startWith, command.stopAt, maxResults);
        } while (!endPoint.equals(startEndpoint));
        rangeStats.add(System.currentTimeMillis() - startTime);
        return allKeys.size() > rawCommand.maxResults ? allKeys.subList(0, rawCommand.maxResults) : allKeys;
    }

    /** @return the mean of the recorded read timings */
    @Override
    public double getReadLatency() {
        return readStats.mean();
    }

    /** @return the mean of the recorded range-scan timings */
    @Override
    public double getRangeLatency() {
        return rangeStats.mean();
    }

    /** @return the mean of the recorded write timings */
    @Override
    public double getWriteLatency() {
        return writeStats.mean();
    }

    /** @return the number of read timings currently held */
    @Override
    public int getReadOperations() {
        return readStats.size();
    }

    /** @return the number of range-scan timings currently held */
    @Override
    public int getRangeOperations() {
        return rangeStats.size();
    }

    /** @return the number of write timings currently held */
    @Override
    public int getWriteOperations() {
        return writeStats.size();
    }

    // Registers one instance with the platform MBean server when the class is loaded; a
    // registration failure aborts class initialisation.
    static {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(new StorageProxy(), new ObjectName("org.apache.cassandra.service:type=StorageProxy"));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

