/*
 * Decompiled with CFR 0.152.
 */
package com.vesoft.nebula.storage.client;

import com.facebook.thrift.TException;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.Pair;
import com.vesoft.nebula.meta.client.MetaClientImpl;
import com.vesoft.nebula.storage.ExecResponse;
import com.vesoft.nebula.storage.GeneralResponse;
import com.vesoft.nebula.storage.GetRequest;
import com.vesoft.nebula.storage.PutRequest;
import com.vesoft.nebula.storage.RemoveRequest;
import com.vesoft.nebula.storage.ResultCode;
import com.vesoft.nebula.storage.StorageService;
import com.vesoft.nebula.storage.client.StorageClient;
import com.vesoft.nebula.utils.IPv4IntTransformer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.codec.digest.MurmurHash2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageClientImpl
implements StorageClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageClientImpl.class);
    private TTransport transport = null;
    private Map<HostAddr, StorageService.Client> clientMap;
    private final int connectionRetry;
    private final int timeout;
    private MetaClientImpl metaClient;
    private Map<Integer, Map<Integer, HostAddr>> leaders;
    private Map<Integer, Map<Integer, List<HostAddr>>> partsAlloc;
    private ExecutorService threadPool;

    public StorageClientImpl(List<HostAndPort> addresses, int timeout, int connectionRetry) {
        Preconditions.checkArgument((timeout > 0 ? 1 : 0) != 0);
        Preconditions.checkArgument((connectionRetry > 0 ? 1 : 0) != 0);
        this.timeout = timeout;
        this.connectionRetry = connectionRetry;
        this.leaders = Maps.newConcurrentMap();
        this.clientMap = Maps.newConcurrentMap();
        this.threadPool = Executors.newFixedThreadPool(10);
    }

    public StorageClientImpl(MetaClientImpl metaClient) {
        this(Lists.newArrayList(), 1000, 3);
        this.metaClient = metaClient;
        this.metaClient.init();
        this.partsAlloc = this.metaClient.getParts();
    }

    private StorageService.Client connect(HostAddr addr) {
        if (this.clientMap.containsKey(addr)) {
            return this.clientMap.get(addr);
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            String ip = IPv4IntTransformer.intToIPv4(addr.getIp());
            int port = addr.getPort();
            this.transport = new TSocket(ip, port, this.timeout);
            TBinaryProtocol protocol = new TBinaryProtocol(this.transport);
            try {
                this.transport.open();
                StorageService.Client client = new StorageService.Client(protocol);
                this.clientMap.put(addr, client);
                return client;
            }
            catch (TTransportException tte) {
                LOGGER.error("Connect failed: " + tte.getMessage());
            }
            catch (TException te) {
                LOGGER.error("Connect failed: " + te.getMessage());
            }
        }
        return null;
    }

    @Override
    public boolean put(int space, String key, String value) {
        int part = this.keyToPartId(space, key);
        HostAddr leader = this.getLeader(space, part);
        if (leader == null) {
            return false;
        }
        PutRequest request = new PutRequest();
        request.setSpace_id(space);
        HashMap parts = Maps.newHashMap();
        ArrayList pairs = Lists.newArrayList((Object[])new Pair[]{new Pair(key, value)});
        parts.put(part, pairs);
        request.setParts(parts);
        LOGGER.debug(String.format("Put Request: %s", request.toString()));
        return this.doPut(space, leader, request);
    }

    @Override
    public boolean put(final int space, Map<String, String> kvs) {
        HashMap groups = Maps.newHashMap();
        for (Map.Entry<String, String> kv : kvs.entrySet()) {
            int n = this.keyToPartId(space, (String)kv.getKey());
            if (!groups.containsKey(n)) {
                groups.put(n, new ArrayList());
            }
            ((List)groups.get(n)).add(new Pair((String)kv.getKey(), (String)kv.getValue()));
        }
        HashMap requests = Maps.newHashMap();
        for (Map.Entry entry : groups.entrySet()) {
            PutRequest request;
            int part = (Integer)entry.getKey();
            HostAddr leader = this.getLeader(space, part);
            if (!requests.containsKey(leader)) {
                request = new PutRequest();
                request.setSpace_id(space);
                HashMap parts = Maps.newHashMap();
                parts.put(part, entry.getValue());
                request.setParts(parts);
                LOGGER.debug(String.format("Put Request: %s", request.toString()));
                requests.put(leader, request);
                continue;
            }
            request = (PutRequest)requests.get(leader);
            if (!request.parts.containsKey(part)) {
                request.parts.put(part, (List<Pair>)entry.getValue());
                continue;
            }
            request.parts.get(part).addAll((Collection)entry.getValue());
        }
        final CountDownLatch countDownLatch = new CountDownLatch(groups.size());
        final List<Boolean> list = Collections.synchronizedList(new ArrayList(groups.size()));
        for (final Map.Entry entry : requests.entrySet()) {
            this.threadPool.submit(new Runnable(){

                @Override
                public void run() {
                    if (StorageClientImpl.this.doPut(space, (HostAddr)entry.getKey(), (PutRequest)entry.getValue())) {
                        list.add(true);
                    } else {
                        list.add(false);
                    }
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            LOGGER.error("Put interrupted");
            return false;
        }
        for (Boolean ret : list) {
            if (ret.booleanValue()) continue;
            return false;
        }
        return true;
    }

    private boolean doPut(int space, HostAddr leader, PutRequest request) {
        StorageService.Client client = this.connect(leader);
        if (client == null) {
            return false;
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                ExecResponse response = client.put(request);
                if (!this.isSuccessfully(response)) {
                    for (ResultCode code : response.result.getFailed_codes()) {
                        HostAddr addr;
                        if (code.getCode() != -11 || (addr = code.getLeader()) == null || addr.getIp() == 0 || addr.getPort() == 0) continue;
                        HostAddr newLeader = new HostAddr(addr.getIp(), addr.getPort());
                        this.updateLeader(space, code.getPart_id(), newLeader);
                        StorageService.Client newClient = this.connect(newLeader);
                        if (newClient == null) continue;
                        client = newClient;
                    }
                    continue;
                }
                return true;
            }
            catch (TException e) {
                for (Integer part : request.parts.keySet()) {
                    this.invalidLeader(space, part);
                }
                LOGGER.error(String.format("Put Failed: %s", e.getMessage()));
            }
        }
        return false;
    }

    @Override
    public Optional<String> get(int space, String key) {
        int part = this.keyToPartId(space, key);
        HostAddr leader = this.getLeader(space, part);
        if (leader == null) {
            return Optional.absent();
        }
        GetRequest request = new GetRequest();
        request.setSpace_id(space);
        HashMap parts = Maps.newHashMap();
        parts.put(part, Arrays.asList(key));
        request.setParts(parts);
        LOGGER.debug(String.format("Get Request: %s", request.toString()));
        Optional<Map<String, String>> result = this.doGet(space, leader, request);
        if (!result.isPresent() || !((Map)result.get()).containsKey(key)) {
            return Optional.absent();
        }
        return Optional.of(((Map)result.get()).get(key));
    }

    @Override
    public Optional<Map<String, String>> get(final int space, List<String> keys) {
        HashMap groups = Maps.newHashMap();
        for (String string : keys) {
            int n = this.keyToPartId(space, string);
            if (!groups.containsKey(n)) {
                groups.put(n, new ArrayList());
            }
            ((List)groups.get(n)).add(string);
        }
        HashMap requests = Maps.newHashMap();
        for (Map.Entry entry : groups.entrySet()) {
            GetRequest request;
            int part = (Integer)entry.getKey();
            HostAddr leader = this.getLeader(space, part);
            if (!requests.containsKey(leader)) {
                request = new GetRequest();
                request.setSpace_id(space);
                HashMap parts = Maps.newHashMap();
                parts.put(part, entry.getValue());
                request.setParts(parts);
                LOGGER.debug(String.format("Get Request: %s", request.toString()));
                requests.put(leader, request);
                continue;
            }
            request = (GetRequest)requests.get(leader);
            if (!request.parts.containsKey(part)) {
                request.parts.put(part, (List<String>)entry.getValue());
                continue;
            }
            request.parts.get(part).addAll((Collection)entry.getValue());
        }
        final CountDownLatch countDownLatch = new CountDownLatch(groups.size());
        final List<Optional> list = Collections.synchronizedList(new ArrayList(groups.size()));
        for (final Map.Entry entry : requests.entrySet()) {
            this.threadPool.submit(new Runnable(){

                @Override
                public void run() {
                    list.add(StorageClientImpl.this.doGet(space, (HostAddr)entry.getKey(), (GetRequest)entry.getValue()));
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            LOGGER.error("Put interrupted");
            return Optional.absent();
        }
        HashMap result = Maps.newHashMap();
        for (Optional response : list) {
            if (!response.isPresent()) continue;
            result.putAll((Map)response.get());
        }
        return Optional.of((Object)result);
    }

    private Optional<Map<String, String>> doGet(int space, HostAddr leader, GetRequest request) {
        StorageService.Client client = this.connect(leader);
        if (client == null) {
            return Optional.absent();
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                GeneralResponse response = client.get(request);
                if (!this.isSuccessfully(response)) {
                    for (ResultCode code : response.result.getFailed_codes()) {
                        HostAddr addr;
                        if (code.getCode() != -11 || (addr = code.getLeader()) == null || addr.getIp() == 0 || addr.getPort() == 0) continue;
                        HostAddr newLeader = new HostAddr(addr.getIp(), addr.getPort());
                        this.updateLeader(space, code.getPart_id(), newLeader);
                        StorageService.Client newClient = this.connect(newLeader);
                        if (newClient == null) continue;
                        client = newClient;
                    }
                    continue;
                }
                return Optional.of(response.values);
            }
            catch (TException e) {
                for (Integer part : request.parts.keySet()) {
                    this.invalidLeader(space, part);
                }
                LOGGER.error(String.format("Get Failed: %s", e.getMessage()));
                return Optional.absent();
            }
        }
        return Optional.absent();
    }

    @Override
    public boolean remove(int space, String key) {
        int part = this.keyToPartId(space, key);
        HostAddr leader = this.getLeader(space, part);
        if (leader == null) {
            return false;
        }
        RemoveRequest request = new RemoveRequest();
        request.setSpace_id(space);
        HashMap parts = Maps.newHashMap();
        parts.put(part, Arrays.asList(key));
        request.setParts(parts);
        LOGGER.debug(String.format("Remove Request: %s", request.toString()));
        return this.doRemove(space, leader, request);
    }

    @Override
    public boolean remove(final int space, List<String> keys) {
        HashMap groups = Maps.newHashMap();
        for (String string : keys) {
            int n = this.keyToPartId(space, string);
            if (!groups.containsKey(n)) {
                groups.put(n, new ArrayList());
            }
            ((List)groups.get(n)).add(string);
        }
        HashMap requests = Maps.newHashMap();
        for (Map.Entry entry : groups.entrySet()) {
            RemoveRequest request;
            int part = (Integer)entry.getKey();
            HostAddr leader = this.getLeader(space, part);
            if (!requests.containsKey(leader)) {
                request = new RemoveRequest();
                request.setSpace_id(space);
                HashMap parts = Maps.newHashMap();
                parts.put(part, entry.getValue());
                request.setParts(parts);
                LOGGER.debug(String.format("Put Request: %s", request.toString()));
                requests.put(leader, request);
                continue;
            }
            request = (RemoveRequest)requests.get(leader);
            if (!request.parts.containsKey(part)) {
                request.parts.put(part, (List<String>)entry.getValue());
                continue;
            }
            request.parts.get(part).addAll((Collection)entry.getValue());
        }
        final CountDownLatch countDownLatch = new CountDownLatch(groups.size());
        final List<Boolean> list = Collections.synchronizedList(new ArrayList(groups.size()));
        for (final Map.Entry entry : requests.entrySet()) {
            this.threadPool.submit(new Runnable(){

                @Override
                public void run() {
                    if (StorageClientImpl.this.doRemove(space, (HostAddr)entry.getKey(), (RemoveRequest)entry.getValue())) {
                        list.add(true);
                    } else {
                        list.add(false);
                    }
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            LOGGER.error("Put interrupted");
            return false;
        }
        for (Boolean ret : list) {
            if (ret.booleanValue()) continue;
            return false;
        }
        return true;
    }

    private boolean doRemove(int space, HostAddr leader, RemoveRequest request) {
        StorageService.Client client = this.connect(leader);
        if (client == null) {
            return false;
        }
        int retry = this.connectionRetry;
        while (retry-- != 0) {
            try {
                ExecResponse response = client.remove(request);
                if (!this.isSuccessfully(response)) {
                    for (ResultCode code : response.result.getFailed_codes()) {
                        HostAddr addr;
                        if (code.getCode() != -11 || (addr = code.getLeader()) == null || addr.getIp() == 0 || addr.getPort() == 0) continue;
                        HostAddr newLeader = new HostAddr(addr.getIp(), addr.getPort());
                        this.updateLeader(space, code.getPart_id(), newLeader);
                        StorageService.Client newClient = this.connect(newLeader);
                        if (newClient == null) continue;
                        client = newClient;
                    }
                    continue;
                }
                return true;
            }
            catch (TException e) {
                for (Integer part : request.parts.keySet()) {
                    this.invalidLeader(space, part);
                }
                LOGGER.error(String.format("Remove Failed: %s", e.getMessage()));
                return false;
            }
        }
        return false;
    }

    private boolean isSuccessfully(ExecResponse response) {
        return response.result.failed_codes.size() == 0;
    }

    private boolean isSuccessfully(GeneralResponse response) {
        return response.result.failed_codes.size() == 0;
    }

    private void updateLeader(int spaceId, int partId, HostAddr addr) {
        LOGGER.debug("Update leader for space " + spaceId + ", " + partId + " to " + addr);
        if (!this.leaders.containsKey(spaceId)) {
            this.leaders.put(spaceId, Maps.newConcurrentMap());
        }
        this.leaders.get(spaceId).put(partId, addr);
    }

    private void invalidLeader(int spaceId, int partId) {
        LOGGER.debug("Invalid leader for space " + spaceId + ", " + partId);
        if (!this.leaders.containsKey(spaceId)) {
            this.leaders.put(spaceId, Maps.newConcurrentMap());
        }
        this.leaders.get(spaceId).remove(partId);
    }

    private HostAddr getLeader(int space, int part) {
        if (!this.leaders.containsKey(space)) {
            this.leaders.put(space, Maps.newConcurrentMap());
        }
        if (this.leaders.get(space).containsKey(part)) {
            return this.leaders.get(space).get(part);
        }
        List<HostAddr> addrs = this.metaClient.getPart(space, part);
        if (addrs != null) {
            Random random = new Random(System.currentTimeMillis());
            int position = random.nextInt(addrs.size());
            HostAddr leader = addrs.get(position);
            this.leaders.get(space).put(part, leader);
            return leader;
        }
        return null;
    }

    private long hash(String key) {
        return MurmurHash2.hash64((String)key);
    }

    private int keyToPartId(int space, String key) {
        if (!this.partsAlloc.containsKey(space)) {
            LOGGER.error("Invalid part of " + key);
            return -1;
        }
        int partNum = this.partsAlloc.get(space).size();
        return (int)(Math.abs(this.hash(key)) % (long)partNum + 1L);
    }

    @Override
    public void close() {
        this.threadPool.shutdownNow();
        this.transport.close();
    }
}

