/*
 * Decompiled with CFR 0.152.
 */
package com.vesoft.nebula.client.storage;

import com.facebook.thrift.TException;
import com.facebook.thrift.protocol.TCompactProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransport;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.vesoft.nebula.AbstractClient;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.Pair;
import com.vesoft.nebula.client.meta.MetaClientImpl;
import com.vesoft.nebula.client.storage.StorageClient;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.storage.EntryId;
import com.vesoft.nebula.storage.ExecResponse;
import com.vesoft.nebula.storage.GeneralResponse;
import com.vesoft.nebula.storage.GetRequest;
import com.vesoft.nebula.storage.PropDef;
import com.vesoft.nebula.storage.PutRequest;
import com.vesoft.nebula.storage.RemoveRequest;
import com.vesoft.nebula.storage.ResultCode;
import com.vesoft.nebula.storage.ScanEdgeRequest;
import com.vesoft.nebula.storage.ScanEdgeResponse;
import com.vesoft.nebula.storage.ScanVertexRequest;
import com.vesoft.nebula.storage.ScanVertexResponse;
import com.vesoft.nebula.storage.StorageService;
import com.vesoft.nebula.utils.AddressUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import org.apache.commons.codec.digest.MurmurHash2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Storage client that routes key/value and scan requests to the storage host
 * currently believed to lead each partition.
 *
 * <p>Partition ids are derived from a MurmurHash2 of the key, leaders are cached
 * per space and partition, and batch operations are split into one request per
 * leader and executed in parallel on an internal thread pool.
 *
 * <p>Every socket opened by this client is tracked and closed either when the
 * connection is dropped after a failure or when {@link #close()} is called.
 */
public class StorageClientImpl extends AbstractClient implements StorageClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageClientImpl.class);

    /** Page size used by the scan overloads that do not take an explicit limit. */
    private static final int DEFAULT_SCAN_LIMIT = 1000;

    /**
     * Result code on which the server-reported leader is adopted.
     * NOTE(review): presumably the storage service's "leader changed" error code - confirm.
     */
    private static final int LEADER_CHANGED_CODE = -11;

    /** NOTE(review): presumably PropOwner.EDGE in the storage IDL - confirm. */
    private static final int PROP_OWNER_EDGE = 3;

    /** NOTE(review): presumably PropOwner.SOURCE in the storage IDL - confirm. */
    private static final int PROP_OWNER_VERTEX = 1;

    /** Open connections, keyed by storage host. */
    private final Map<HostAndPort, Connection> connections = new ConcurrentHashMap<>();

    private final MetaClientImpl metaClient;

    /** space name -> (partition id -> cached leader); touched from pool threads, hence concurrent. */
    private final Map<String, Map<Integer, HostAndPort>> leaders = new ConcurrentHashMap<>();

    private final Map<String, Map<Integer, List<HostAndPort>>> partsAlloc;

    /** Runs the per-leader requests of the batch operations. */
    private final ExecutorService threadPool;

    /** Shared generator for picking an initial leader candidate. */
    private final Random random = new Random();

    /**
     * Creates a storage client backed by the given meta client.
     *
     * @param client meta client whose caches supply space ids, partition allocation and schemas
     */
    public StorageClientImpl(MetaClientImpl client) {
        this.metaClient = client;
        this.partsAlloc = this.metaClient.getPartsAllocFromCache();
        // The pool was never created before, so every batch call failed with a NullPointerException.
        this.threadPool = Executors.newCachedThreadPool();
    }

    /**
     * Opens a connection to every given storage host and caches it.
     *
     * @param addresses storage hosts to connect to
     * @return always 0
     * @throws TException if any connection cannot be opened
     */
    @Override
    public int doConnect(List<HostAndPort> addresses) throws TException {
        for (HostAndPort address : addresses) {
            Connection previous = this.connections.put(address, this.open(address));
            if (previous != null) {
                previous.close();
            }
        }
        return 0;
    }

    /** Opens a socket to {@code address} and wraps it in a compact-protocol storage client. */
    private Connection open(HostAndPort address) throws TException {
        TTransport socket = new TSocket(address.getHostText(), address.getPort(), this.timeout);
        socket.open();
        return new Connection(new StorageService.Client(new TCompactProtocol(socket)), socket);
    }

    /**
     * Stores a single key/value pair.
     *
     * @return {@code true} if the leader acknowledged the write; {@code false} if no leader
     *         is known for the key's partition or the request failed
     */
    @Override
    public boolean put(String spaceName, String key, String value) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        int part = this.keyToPartId(spaceName, key);
        HostAndPort leader = this.getLeader(spaceName, part);
        if (leader == null) {
            return false;
        }
        Map<Integer, List<Pair>> parts = Maps.newHashMap();
        parts.put(part, Lists.newArrayList(new Pair(key, value)));
        PutRequest request = new PutRequest();
        request.setSpace_id(spaceId);
        request.setParts(parts);
        LOGGER.debug(String.format("Put Request: %s", request.toString()));
        return this.doPut(spaceName, leader, request);
    }

    /**
     * Stores many key/value pairs, sending one request per partition leader in parallel.
     *
     * @return {@code true} only if every per-leader request succeeded; {@code false} if any
     *         partition has no known leader, any request failed, or the wait was interrupted
     */
    @Override
    public boolean put(String spaceName, Map<String, String> kvs) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        Map<Integer, List<Pair>> groups = Maps.newHashMap();
        for (Map.Entry<String, String> kv : kvs.entrySet()) {
            int part = this.keyToPartId(spaceName, kv.getKey());
            groups.computeIfAbsent(part, unused -> new ArrayList<>())
                    .add(new Pair(kv.getKey(), kv.getValue()));
        }

        Map<HostAndPort, PutRequest> requests = Maps.newHashMap();
        for (Map.Entry<Integer, List<Pair>> group : groups.entrySet()) {
            HostAndPort leader = this.getLeader(spaceName, group.getKey());
            if (leader == null) {
                LOGGER.error("No leader found for part " + group.getKey() + " in space " + spaceName);
                return false;
            }
            PutRequest request = requests.get(leader);
            if (request == null) {
                request = new PutRequest();
                request.setSpace_id(spaceId);
                request.setParts(Maps.<Integer, List<Pair>>newHashMap());
                requests.put(leader, request);
            }
            request.parts.put(group.getKey(), group.getValue());
        }

        try {
            List<Boolean> outcomes = this.dispatch("Put", requests,
                    (host, req) -> this.doPut(spaceName, host, req), Boolean.FALSE);
            return !outcomes.contains(Boolean.FALSE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Put interrupted");
            return false;
        }
    }

    /**
     * Sends a put request to {@code leader}, retrying up to {@code connectionRetry} times
     * while the server reports failed partitions.
     *
     * NOTE(review): retries go to the same host; a reported leader change only updates the
     * leader cache used by later calls.
     */
    private boolean doPut(String space, HostAndPort leader, PutRequest request) {
        StorageService.Client client = this.connect(leader);
        if (Objects.isNull(client)) {
            this.disconnect(leader);
            return false;
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                ExecResponse response = client.put(request);
                if (this.isSuccessfully(response)) {
                    return true;
                }
                this.handleResultCodes(response.result.failed_codes, space);
            } catch (TException e) {
                this.invalidLeaders(space, request.parts.keySet());
                this.disconnect(leader);
                LOGGER.error(String.format("Put Failed: %s", e.getMessage()), e);
                return false;
            }
        }
        return false;
    }

    /**
     * Reads the value stored under {@code key}.
     *
     * @return the value, or absent if no leader is known, the request failed,
     *         or the response does not contain the key
     */
    @Override
    public Optional<String> get(String spaceName, String key) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        int part = this.keyToPartId(spaceName, key);
        HostAndPort leader = this.getLeader(spaceName, part);
        if (leader == null) {
            return Optional.absent();
        }
        Map<Integer, List<String>> parts = Maps.newHashMap();
        parts.put(part, Arrays.asList(key));
        GetRequest request = new GetRequest();
        request.setSpace_id(spaceId);
        request.setParts(parts);
        LOGGER.debug(String.format("Get Request: %s", request.toString()));
        Optional<Map<String, String>> result = this.doGet(spaceName, leader, request);
        if (!result.isPresent() || !result.get().containsKey(key)) {
            return Optional.absent();
        }
        return Optional.of(result.get().get(key));
    }

    /**
     * Reads many keys, sending one request per partition leader in parallel.
     *
     * @return the merged values of every request that succeeded (failed requests and
     *         partitions without a known leader contribute nothing), or absent if the
     *         wait was interrupted
     */
    @Override
    public Optional<Map<String, String>> get(String spaceName, List<String> keys) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        Map<HostAndPort, GetRequest> requests = Maps.newHashMap();
        for (Map.Entry<Integer, List<String>> group : this.groupKeysByPart(spaceName, keys).entrySet()) {
            HostAndPort leader = this.getLeader(spaceName, group.getKey());
            if (leader == null) {
                // Failed requests are already tolerated below, so an unroutable part is skipped too.
                LOGGER.error("No leader found for part " + group.getKey() + " in space " + spaceName);
                continue;
            }
            GetRequest request = requests.get(leader);
            if (request == null) {
                request = new GetRequest();
                request.setSpace_id(spaceId);
                request.setParts(Maps.<Integer, List<String>>newHashMap());
                requests.put(leader, request);
            }
            request.parts.put(group.getKey(), group.getValue());
        }

        List<Optional<Map<String, String>>> responses;
        try {
            responses = this.dispatch("Get", requests,
                    (host, req) -> this.doGet(spaceName, host, req),
                    Optional.<Map<String, String>>absent());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Get interrupted");
            return Optional.absent();
        }

        Map<String, String> merged = Maps.newHashMap();
        for (Optional<Map<String, String>> response : responses) {
            if (response.isPresent()) {
                merged.putAll(response.get());
            }
        }
        return Optional.of(merged);
    }

    /**
     * Sends a get request to {@code leader}, retrying up to {@code connectionRetry} times
     * while the server reports failed partitions.
     */
    private Optional<Map<String, String>> doGet(String space, HostAndPort leader, GetRequest request) {
        StorageService.Client client = this.connect(leader);
        if (Objects.isNull(client)) {
            this.disconnect(leader);
            return Optional.absent();
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                GeneralResponse response = client.get(request);
                if (this.isSuccessfully(response)) {
                    return Optional.of(response.values);
                }
                this.handleResultCodes(response.result.failed_codes, space);
            } catch (TException e) {
                this.invalidLeaders(space, request.parts.keySet());
                this.disconnect(leader);
                LOGGER.error(String.format("Get Failed: %s", e.getMessage()), e);
                return Optional.absent();
            }
        }
        return Optional.absent();
    }

    /**
     * Removes a single key.
     *
     * @return {@code true} if the leader acknowledged the removal; {@code false} if no leader
     *         is known for the key's partition or the request failed
     */
    @Override
    public boolean remove(String spaceName, String key) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        int part = this.keyToPartId(spaceName, key);
        HostAndPort leader = this.getLeader(spaceName, part);
        if (leader == null) {
            return false;
        }
        Map<Integer, List<String>> parts = Maps.newHashMap();
        parts.put(part, Arrays.asList(key));
        RemoveRequest request = new RemoveRequest();
        request.setSpace_id(spaceId);
        request.setParts(parts);
        LOGGER.debug(String.format("Remove Request: %s", request.toString()));
        return this.doRemove(spaceName, leader, request);
    }

    /**
     * Removes many keys, sending one request per partition leader in parallel.
     *
     * @return {@code true} only if every per-leader request succeeded; {@code false} if any
     *         partition has no known leader, any request failed, or the wait was interrupted
     */
    @Override
    public boolean remove(String spaceName, List<String> keys) {
        int spaceId = this.metaClient.getSpaceIdFromCache(spaceName);
        Map<HostAndPort, RemoveRequest> requests = Maps.newHashMap();
        for (Map.Entry<Integer, List<String>> group : this.groupKeysByPart(spaceName, keys).entrySet()) {
            HostAndPort leader = this.getLeader(spaceName, group.getKey());
            if (leader == null) {
                LOGGER.error("No leader found for part " + group.getKey() + " in space " + spaceName);
                return false;
            }
            RemoveRequest request = requests.get(leader);
            if (request == null) {
                request = new RemoveRequest();
                request.setSpace_id(spaceId);
                request.setParts(Maps.<Integer, List<String>>newHashMap());
                requests.put(leader, request);
            }
            request.parts.put(group.getKey(), group.getValue());
        }

        try {
            List<Boolean> outcomes = this.dispatch("Remove", requests,
                    (host, req) -> this.doRemove(spaceName, host, req), Boolean.FALSE);
            return !outcomes.contains(Boolean.FALSE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Remove interrupted");
            return false;
        }
    }

    /**
     * Sends a remove request to {@code leader}, retrying up to {@code connectionRetry} times
     * while the server reports failed partitions.
     */
    private boolean doRemove(String spaceName, HostAndPort leader, RemoveRequest request) {
        StorageService.Client client = this.connect(leader);
        if (Objects.isNull(client)) {
            this.disconnect(leader);
            return false;
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                ExecResponse response = client.remove(request);
                if (this.isSuccessfully(response)) {
                    return true;
                }
                this.handleResultCodes(response.result.failed_codes, spaceName);
            } catch (TException e) {
                this.invalidLeaders(spaceName, request.parts.keySet());
                this.disconnect(leader);
                LOGGER.error(String.format("Remove Failed: %s", e.getMessage()), e);
                return false;
            }
        }
        return false;
    }

    /** Buckets {@code keys} by the partition id each key hashes to. */
    private Map<Integer, List<String>> groupKeysByPart(String spaceName, List<String> keys) {
        Map<Integer, List<String>> groups = Maps.newHashMap();
        for (String key : keys) {
            groups.computeIfAbsent(this.keyToPartId(spaceName, key), unused -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    /**
     * Runs one call per leader on the thread pool and waits for all of them.
     *
     * <p>The latch is sized by the number of submitted tasks and counted down in a
     * {@code finally}, so neither leaders serving several parts nor a failing task can
     * leave the caller blocked.
     *
     * @param operation label used in the debug and error logs ("Put", "Get", "Remove")
     * @param requests  one request per leader
     * @param call      performs the request against its leader
     * @param onFailure value recorded when {@code call} throws
     * @return one result per request, in completion order
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private <Q, R> List<R> dispatch(String operation, Map<HostAndPort, Q> requests,
            BiFunction<HostAndPort, Q, R> call, R onFailure) throws InterruptedException {
        CountDownLatch pending = new CountDownLatch(requests.size());
        List<R> responses = Collections.synchronizedList(new ArrayList<R>(requests.size()));
        for (Map.Entry<HostAndPort, Q> entry : requests.entrySet()) {
            LOGGER.debug(String.format("%s Request: %s", operation, entry.getValue()));
            this.threadPool.submit(() -> {
                R outcome = onFailure;
                try {
                    outcome = call.apply(entry.getKey(), entry.getValue());
                } catch (RuntimeException e) {
                    LOGGER.error(String.format("%s Failed: %s", operation, e.getMessage()), e);
                } finally {
                    responses.add(outcome);
                    pending.countDown();
                }
            });
        }
        pending.await();
        return responses;
    }

    /** Scans the requested edge properties across all parts of {@code space} with default paging. */
    @Override
    public Iterator<ScanEdgeResponse> scanEdge(String space, Map<String, List<String>> returnCols) throws IOException {
        return this.scanEdge(space, returnCols, false, DEFAULT_SCAN_LIMIT, 0L, Long.MAX_VALUE);
    }

    /**
     * Scans edges across all parts of {@code space}, one part after another.
     *
     * @throws IOException if the space has no parts in the meta cache
     */
    @Override
    public Iterator<ScanEdgeResponse> scanEdge(String space, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) throws IOException {
        return this.scanEdge(space, this.partIdsOf(space), returnCols, allCols, limit, startTime, endTime);
    }

    /** Scans the requested edge properties of a single part with default paging. */
    @Override
    public Iterator<ScanEdgeResponse> scanEdge(String space, int part, Map<String, List<String>> returnCols) throws IOException {
        return this.scanEdge(space, part, returnCols, false, DEFAULT_SCAN_LIMIT, 0L, Long.MAX_VALUE);
    }

    /**
     * Scans edges of a single part.
     *
     * @throws IllegalArgumentException if the part, the space or a requested edge is unknown
     * @throws IOException if the part's leader cannot be reached
     */
    @Override
    public Iterator<ScanEdgeResponse> scanEdge(String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) throws IOException {
        HostAndPort leader = this.requireLeader(space, part);
        ScanEdgeRequest request = this.newScanEdgeRequest(space, part, returnCols, allCols, limit, startTime, endTime);
        return this.doScanEdge(space, leader, request);
    }

    /** Chains the per-part edge scans of {@code parts} into a single iterator. */
    private Iterator<ScanEdgeResponse> scanEdge(final String space, final Iterator<Integer> parts, final Map<String, List<String>> returnCols, final boolean allCols, final int limit, final long startTime, final long endTime) {
        return new Iterator<ScanEdgeResponse>() {
            /** Scan over the part currently being read; null before the first part. */
            private Iterator<ScanEdgeResponse> current;

            @Override
            public boolean hasNext() {
                return parts.hasNext() || (this.current != null && this.current.hasNext());
            }

            @Override
            public ScanEdgeResponse next() {
                if (this.current == null || !this.current.hasNext()) {
                    int part = parts.next();
                    // Never fall back to the exhausted scan of the previous part.
                    this.current = null;
                    HostAndPort leader = StorageClientImpl.this.requireLeader(space, part);
                    ScanEdgeRequest request = StorageClientImpl.this.newScanEdgeRequest(
                            space, part, returnCols, allCols, limit, startTime, endTime);
                    try {
                        this.current = StorageClientImpl.this.doScanEdge(space, leader, request);
                    } catch (IOException e) {
                        throw new IllegalStateException(
                                "Failed to scan part " + part + " of space " + space, e);
                    }
                }
                return this.current.next();
            }
        };
    }

    /** Builds the edge scan request for one part; rejects spaces missing from the meta cache. */
    private ScanEdgeRequest newScanEdgeRequest(String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) {
        int spaceId = this.metaClient.getSpaceIdFromCache(space);
        if (spaceId == -1) {
            throw new IllegalArgumentException("Space " + space + " not found");
        }
        Map<Integer, List<PropDef>> columns = this.getEdgeReturnCols(space, returnCols);
        ScanEdgeRequest request = new ScanEdgeRequest();
        request.setSpace_id(spaceId).setPart_id(part).setReturn_columns(columns).setAll_columns(allCols).setLimit(limit).setStart_time(startTime).setEnd_time(endTime);
        return request;
    }

    /**
     * Translates edge name -> property names into edge type -> property definitions.
     *
     * @throws IllegalArgumentException if an edge name is not in the meta cache
     */
    private Map<Integer, List<PropDef>> getEdgeReturnCols(String space, Map<String, List<String>> returnCols) {
        Map<Integer, List<PropDef>> columns = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : returnCols.entrySet()) {
            String edgeName = entry.getKey();
            EdgeItem edgeItem = this.metaClient.getEdgeItemFromCache(space, edgeName);
            if (Objects.isNull(edgeItem)) {
                throw new IllegalArgumentException("Edge " + edgeName + " not found in space " + space);
            }
            int edgeType = edgeItem.edge_type;
            EntryId id = EntryId.edge_type(edgeType);
            List<PropDef> propDefs = new ArrayList<>();
            for (String propName : entry.getValue()) {
                PropDef propDef = new PropDef();
                propDef.setOwner(PROP_OWNER_EDGE).setId(id).setName(propName);
                propDefs.add(propDef);
            }
            columns.put(edgeType, propDefs);
        }
        return columns;
    }

    /**
     * Returns an iterator that fetches one page of edges from {@code leader} per call to
     * {@code next()}, carrying the server cursor between pages.
     *
     * <p>{@code next()} returns {@code null} and ends the iteration when the call fails
     * or the server reports failed partitions.
     *
     * @throws IOException if no connection to {@code leader} can be established
     */
    private Iterator<ScanEdgeResponse> doScanEdge(final String space, final HostAndPort leader, final ScanEdgeRequest request) throws IOException {
        final StorageService.Client client = this.connect(leader);
        if (Objects.isNull(client)) {
            this.disconnect(leader);
            throw new IOException("Failed to connect " + leader);
        }
        return new Iterator<ScanEdgeResponse>() {
            private byte[] cursor = null;
            private boolean haveNext = true;

            @Override
            public boolean hasNext() {
                return this.haveNext;
            }

            @Override
            public ScanEdgeResponse next() {
                request.setCursor(this.cursor);
                if (StorageClientImpl.this.executionRetry == 0) {
                    // Nothing will ever be fetched; end the iteration instead of yielding null forever.
                    this.haveNext = false;
                    return null;
                }
                ScanEdgeResponse response;
                try {
                    response = client.scanEdge(request);
                    this.cursor = response.next_cursor;
                    this.haveNext = response.has_next;
                } catch (TException e) {
                    LOGGER.error(e.getMessage(), e);
                    this.haveNext = false;
                    return null;
                }
                if (!response.result.failed_codes.isEmpty()) {
                    StorageClientImpl.this.handleResultCodes(response.result.failed_codes, space);
                    this.haveNext = false;
                    return null;
                }
                return response;
            }
        };
    }

    /** Scans the requested tag properties across all parts of {@code space} with default paging. */
    @Override
    public Iterator<ScanVertexResponse> scanVertex(String space, Map<String, List<String>> returnCols) throws IOException {
        return this.scanVertex(space, returnCols, false, DEFAULT_SCAN_LIMIT, 0L, Long.MAX_VALUE);
    }

    /**
     * Scans vertices across all parts of {@code space}, one part after another.
     *
     * @throws IOException if the space has no parts in the meta cache
     */
    @Override
    public Iterator<ScanVertexResponse> scanVertex(String space, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) throws IOException {
        return this.scanVertex(space, this.partIdsOf(space), returnCols, allCols, limit, startTime, endTime);
    }

    /** Scans the requested tag properties of a single part with default paging. */
    @Override
    public Iterator<ScanVertexResponse> scanVertex(String space, int part, Map<String, List<String>> returnCols) throws IOException {
        return this.scanVertex(space, part, returnCols, false, DEFAULT_SCAN_LIMIT, 0L, Long.MAX_VALUE);
    }

    /**
     * Scans vertices of a single part.
     *
     * @throws IllegalArgumentException if the part, the space or a requested tag is unknown
     * @throws IOException if the part's leader cannot be reached
     */
    @Override
    public Iterator<ScanVertexResponse> scanVertex(String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) throws IOException {
        HostAndPort leader = this.requireLeader(space, part);
        ScanVertexRequest request = this.newScanVertexRequest(space, part, returnCols, allCols, limit, startTime, endTime);
        return this.doScanVertex(space, leader, request);
    }

    /** Chains the per-part vertex scans of {@code parts} into a single iterator. */
    private Iterator<ScanVertexResponse> scanVertex(final String space, final Iterator<Integer> parts, final Map<String, List<String>> returnCols, final boolean allCols, final int limit, final long startTime, final long endTime) {
        return new Iterator<ScanVertexResponse>() {
            /** Scan over the part currently being read; null before the first part. */
            private Iterator<ScanVertexResponse> current;

            @Override
            public boolean hasNext() {
                return parts.hasNext() || (this.current != null && this.current.hasNext());
            }

            @Override
            public ScanVertexResponse next() {
                if (this.current == null || !this.current.hasNext()) {
                    int part = parts.next();
                    // Never fall back to the exhausted scan of the previous part.
                    this.current = null;
                    HostAndPort leader = StorageClientImpl.this.requireLeader(space, part);
                    ScanVertexRequest request = StorageClientImpl.this.newScanVertexRequest(
                            space, part, returnCols, allCols, limit, startTime, endTime);
                    try {
                        this.current = StorageClientImpl.this.doScanVertex(space, leader, request);
                    } catch (IOException e) {
                        throw new IllegalStateException(
                                "Failed to scan part " + part + " of space " + space, e);
                    }
                }
                return this.current.next();
            }
        };
    }

    /** Builds the vertex scan request for one part; rejects spaces missing from the meta cache. */
    private ScanVertexRequest newScanVertexRequest(String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime) {
        int spaceId = this.metaClient.getSpaceIdFromCache(space);
        if (spaceId == -1) {
            throw new IllegalArgumentException("Space " + space + " not found");
        }
        Map<Integer, List<PropDef>> columns = this.getVertexReturnCols(space, returnCols);
        ScanVertexRequest request = new ScanVertexRequest();
        request.setSpace_id(spaceId).setPart_id(part).setReturn_columns(columns).setAll_columns(allCols).setLimit(limit).setStart_time(startTime).setEnd_time(endTime);
        return request;
    }

    /**
     * Translates tag name -> property names into tag id -> property definitions.
     *
     * @throws IllegalArgumentException if a tag name is not in the meta cache
     */
    private Map<Integer, List<PropDef>> getVertexReturnCols(String space, Map<String, List<String>> returnCols) {
        Map<Integer, List<PropDef>> columns = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : returnCols.entrySet()) {
            String tagName = entry.getKey();
            TagItem tagItem = this.metaClient.getTagItemFromCache(space, tagName);
            if (Objects.isNull(tagItem)) {
                throw new IllegalArgumentException("Tag " + tagName + " not found in space " + space);
            }
            int tagId = tagItem.tag_id;
            EntryId id = EntryId.tag_id(tagId);
            List<PropDef> propDefs = new ArrayList<>();
            for (String propName : entry.getValue()) {
                PropDef propDef = new PropDef();
                propDef.setOwner(PROP_OWNER_VERTEX).setId(id).setName(propName);
                propDefs.add(propDef);
            }
            columns.put(tagId, propDefs);
        }
        return columns;
    }

    /**
     * Returns an iterator that fetches one page of vertices from {@code leader} per call to
     * {@code next()}, carrying the server cursor between pages.
     *
     * <p>{@code next()} returns {@code null} and ends the iteration when the call fails
     * or the server reports failed partitions.
     *
     * @throws IOException if no connection to {@code leader} can be established
     */
    private Iterator<ScanVertexResponse> doScanVertex(final String spaceName, final HostAndPort leader, final ScanVertexRequest request) throws IOException {
        final StorageService.Client client = this.connect(leader);
        if (Objects.isNull(client)) {
            this.disconnect(leader);
            throw new IOException("Failed to connect " + leader);
        }
        return new Iterator<ScanVertexResponse>() {
            private byte[] cursor = null;
            private boolean haveNext = true;

            @Override
            public boolean hasNext() {
                return this.haveNext;
            }

            @Override
            public ScanVertexResponse next() {
                request.setCursor(this.cursor);
                if (StorageClientImpl.this.executionRetry == 0) {
                    // Nothing will ever be fetched; end the iteration instead of yielding null forever.
                    this.haveNext = false;
                    return null;
                }
                ScanVertexResponse response;
                try {
                    response = client.scanVertex(request);
                    this.cursor = response.next_cursor;
                    this.haveNext = response.has_next;
                } catch (TException e) {
                    LOGGER.error(e.getMessage(), e);
                    this.haveNext = false;
                    return null;
                }
                if (!response.result.failed_codes.isEmpty()) {
                    StorageClientImpl.this.handleResultCodes(response.result.failed_codes, spaceName);
                    this.haveNext = false;
                    return null;
                }
                return response;
            }
        };
    }

    /**
     * Returns a snapshot iterator over the part ids of {@code space}.
     *
     * @throws IOException if the meta cache has no parts for the space
     */
    private Iterator<Integer> partIdsOf(String space) throws IOException {
        Map<Integer, List<HostAndPort>> alloc = this.metaClient.getPartsAllocFromCache().get(space);
        if (alloc == null || alloc.isEmpty()) {
            throw new IOException("No valid part in space " + space);
        }
        return new ArrayList<>(alloc.keySet()).iterator();
    }

    /**
     * Resolves the leader of a part.
     *
     * @throws IllegalArgumentException if no host is known for the part
     */
    private HostAndPort requireLeader(String space, int part) {
        HostAndPort leader = this.getLeader(space, part);
        if (Objects.isNull(leader)) {
            throw new IllegalArgumentException("Part " + part + " not found in space " + space);
        }
        return leader;
    }

    private boolean isSuccessfully(ExecResponse response) {
        return response.result.failed_codes.isEmpty();
    }

    private boolean isSuccessfully(GeneralResponse response) {
        return response.result.failed_codes.isEmpty();
    }

    /** Returns the leader cache of {@code spaceName}, creating it on first use. */
    private Map<Integer, HostAndPort> leadersOf(String spaceName) {
        return this.leaders.computeIfAbsent(spaceName, unused -> Maps.<Integer, HostAndPort>newConcurrentMap());
    }

    private void updateLeader(String spaceName, int partId, HostAndPort address) {
        LOGGER.debug("Update leader for space " + spaceName + ", " + partId + " to " + address);
        this.leadersOf(spaceName).put(partId, address);
    }

    private void invalidLeader(String spaceName, int partId) {
        LOGGER.debug("Invalid leader for space " + spaceName + ", " + partId);
        this.leadersOf(spaceName).remove(partId);
    }

    /** Drops the cached leader of every given part so the next call re-resolves it. */
    private void invalidLeaders(String spaceName, Collection<Integer> partIds) {
        for (Integer partId : partIds) {
            this.invalidLeader(spaceName, partId);
        }
    }

    /**
     * Returns the cached leader of a part, or picks a random host of the part from the
     * meta cache and remembers it.
     *
     * @return the leader, or {@code null} if the meta cache knows no host for the part
     */
    private HostAndPort getLeader(String spaceName, int part) {
        Map<Integer, HostAndPort> spaceLeaders = this.leadersOf(spaceName);
        HostAndPort cached = spaceLeaders.get(part);
        if (cached != null) {
            return cached;
        }
        List<HostAndPort> candidates = this.metaClient.getPartFromCache(spaceName, part);
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        HostAndPort leader = candidates.get(this.random.nextInt(candidates.size()));
        spaceLeaders.put(part, leader);
        return leader;
    }

    /**
     * Records the new leader the server reports for each failed part whose code is
     * {@link #LEADER_CHANGED_CODE}; other codes and empty leader addresses are ignored.
     */
    private void handleResultCodes(List<ResultCode> failedCodes, String space) {
        for (ResultCode code : failedCodes) {
            if (code.getCode() != LEADER_CHANGED_CODE) {
                continue;
            }
            HostAddr addr = code.getLeader();
            if (addr == null || addr.getIp() == 0 || addr.getPort() == 0) {
                continue;
            }
            HostAndPort newLeader = HostAndPort.fromParts(AddressUtil.intToIPv4(addr.getIp()), addr.getPort());
            this.updateLeader(space, code.getPart_id(), newLeader);
        }
    }

    /**
     * Returns the cached client for {@code address}, opening a connection if there is none.
     *
     * @return the client, or {@code null} if {@code address} is null or cannot be reached
     */
    private StorageService.Client connect(HostAndPort address) {
        if (address == null) {
            return null;
        }
        Connection cached = this.connections.get(address);
        if (cached != null) {
            return cached.client;
        }
        try {
            Connection fresh = this.open(address);
            Connection raced = this.connections.putIfAbsent(address, fresh);
            if (raced != null) {
                // Another thread connected first; keep its connection and release ours.
                fresh.close();
                return raced.client;
            }
            return fresh.client;
        } catch (TException e) {
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }

    /** Forgets the connection to {@code address} and closes its socket. */
    private void disconnect(HostAndPort address) {
        if (address == null) {
            return;
        }
        Connection dropped = this.connections.remove(address);
        if (dropped != null) {
            dropped.close();
        }
    }

    private long hash(String key) {
        return MurmurHash2.hash64(key);
    }

    private long hash(long key) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putLong(key);
        return MurmurHash2.hash64(buffer.array(), 8);
    }

    private int keyToPartId(String spaceName, long vertexId) {
        return this.partIdForHash(spaceName, this.hash(vertexId));
    }

    private int keyToPartId(String spaceName, String key) {
        return this.partIdForHash(spaceName, this.hash(key));
    }

    /**
     * Maps a hash onto the 1-based part ids of a space.
     *
     * NOTE(review): floorMod treats the hash as signed; if the server reduces the hash as an
     * unsigned 64-bit value, negative hashes map to a different part - confirm.
     *
     * @return the part id, or -1 if the space is unknown or has no parts
     */
    private int partIdForHash(String spaceName, long hashValue) {
        Map<Integer, List<HostAndPort>> parts = this.partsAlloc.get(spaceName);
        if (parts == null) {
            LOGGER.error("Invalid part of " + spaceName);
            return -1;
        }
        int partNum = parts.size();
        if (partNum <= 0) {
            return -1;
        }
        return (int) (Math.floorMod(hashValue, (long) partNum) + 1L);
    }

    /** Stops the worker pool and closes every socket this client opened. */
    @Override
    public void close() {
        this.threadPool.shutdownNow();
        for (Connection connection : this.connections.values()) {
            connection.close();
        }
        this.connections.clear();
        // The inherited transport is never assigned by this class, so it may be unset.
        if (this.transport != null) {
            this.transport.close();
        }
    }

    /** A storage client paired with the transport it talks over, so the socket can be closed. */
    private static final class Connection {
        private final StorageService.Client client;
        private final TTransport transport;

        Connection(StorageService.Client client, TTransport transport) {
            this.client = client;
            this.transport = transport;
        }

        void close() {
            this.transport.close();
        }
    }
}

