/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.centraldogma.server.internal.replication;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.server.internal.command.AbstractCommandExecutor;
import com.linecorp.centraldogma.server.internal.command.Command;
import com.linecorp.centraldogma.server.internal.command.CommandExecutor;
import com.linecorp.centraldogma.server.internal.replication.ReplicationException;
import com.linecorp.centraldogma.server.internal.replication.ReplicationLog;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ZooKeeperCommandExecutor
extends AbstractCommandExecutor
implements PathChildrenCacheListener {
    /** Same value as the {@link Builder}'s initial {@code timeoutMillis}, which is handed to the retry policy. */
    public static final int DEFAULT_TIMEOUT_MILLIS = 1000;
    /** Same value as the {@link Builder}'s initial {@code numWorkers} (size of the worker thread pool). */
    public static final int DEFAULT_NUM_WORKERS = 16;
    /** Same value as the {@link Builder}'s initial {@code maxLogCount}. */
    public static final int DEFAULT_MAX_LOG_COUNT = 100;
    /** Initial {@code minLogAgeMillis} of the {@link Builder}: one hour, in milliseconds. */
    public static final long DEFAULT_MIN_LOG_AGE_MILLIS = TimeUnit.HOURS.toMillis(1L);
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperCommandExecutor.class);
    // 1 MiB minus 1 KiB; equals the chunk size storeLog() uses when splitting a serialized log into blocks.
    private static final int MAX_BYTES = 1047552;
    // Name of the child node (under zkPath) that holds one sequential node per replication log.
    @VisibleForTesting
    static final String LOG_PATH = "logs";
    // Name of the child node (under zkPath) that holds the payload blocks referenced by the log nodes.
    @VisibleForTesting
    static final String LOG_BLOCK_PATH = "log_blocks";
    // Name of the child node (under zkPath) below which the per-execution-path mutexes are created.
    @VisibleForTesting
    static final String LOCK_PATH = "lock";
    // Name of the child node (under zkPath) used by the leader selector.
    @VisibleForTesting
    static final String LEADER_PATH = "leader";
    // Executor that actually applies commands; replayed and local commands are both forwarded to it.
    private final CommandExecutor delegate;
    private final CuratorFramework curator;
    // Root ZooKeeper path under which all nodes of this executor live.
    private final String zkPath;
    // When true, doStart() creates zkPath and its "logs", "log_blocks" and "lock" children if absent.
    private final boolean createPathIfNotExist;
    // Worker pool on which doExecute() runs the blocking command execution.
    private final ExecutorService executor;
    // Watches the children of the "logs" node; this instance is registered as its listener.
    private final PathChildrenCache logWatcher;
    private final OldLogRemover oldLogRemover;
    private final LeaderSelector leaderSelector;
    // Local file that stores the last replayed revision as decimal text.
    private final File revisionFile;
    // Old logs are only considered for deletion once more than this many log nodes exist.
    private final int maxLogCount;
    // Logs younger than this many milliseconds are never deleted.
    private final long minLogAgeMillis;
    // Set by doStart() and reset to null by doStop(); readers treat null as "nothing to do".
    private volatile ListenerInfo listenerInfo;
    // Cache of one InterProcessMutex per command execution path, populated lazily by safeLock().
    private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<String, InterProcessMutex>();

    /**
     * Returns a new {@link Builder} for configuring and creating a {@link ZooKeeperCommandExecutor}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates the executor and wires up its helpers without starting any of them.
     *
     * <p>The worker pool, the log watcher and the leader selector are only constructed here;
     * {@link #doStart(Runnable, Runnable)} is what starts the Curator client, the watcher and the selector.
     *
     * @param replicaId the ID of this replica, forwarded to the super class
     * @param delegate the executor that actually applies commands
     * @param curator the (not yet started) Curator client
     * @param zkPath the root ZooKeeper path of this executor
     * @param createPathIfNotExist whether {@code doStart} should create missing ZooKeeper nodes
     * @param revisionFile the file that persists the last replayed revision
     * @param numWorkers the number of worker threads used for command execution
     * @param maxLogCount the number of logs above which old logs become eligible for deletion
     * @param minLogAgeMillis the minimum age of a log before it may be deleted
     */
    private ZooKeeperCommandExecutor(String replicaId, CommandExecutor delegate, CuratorFramework curator,
                                     String zkPath, boolean createPathIfNotExist, File revisionFile,
                                     int numWorkers, int maxLogCount, long minLogAgeMillis) {
        super(replicaId);
        this.delegate = delegate;
        this.revisionFile = revisionFile;
        this.curator = curator;
        // zkPath must be assigned before absolutePath() is used below.
        this.zkPath = zkPath;
        this.createPathIfNotExist = createPathIfNotExist;
        this.maxLogCount = maxLogCount;
        this.minLogAgeMillis = minLogAgeMillis;

        final ThreadPoolExecutor workers = new ThreadPoolExecutor(
                numWorkers, numWorkers, 60L, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
                new DefaultThreadFactory("zookeeper-command-executor", true));
        // Let idle core threads die after the keep-alive time so an idle executor holds no threads.
        workers.allowCoreThreadTimeOut(true);
        this.executor = workers;

        this.logWatcher = new PathChildrenCache(curator, absolutePath(LOG_PATH), true);
        // Pass the listener with its real type; a cast to Object does not satisfy the typed listener API.
        this.logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor());

        this.oldLogRemover = new OldLogRemover();
        this.leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH), this.oldLogRemover);
        this.leaderSelector.autoRequeue();
    }

    /**
     * Starts the delegate, restores the last replayed revision from the revision file, then starts the
     * Curator client, the log watcher and the leader selector. When path creation is enabled, the root
     * node and its log, log-block and lock children are created first if they are missing.
     *
     * @param onTakeLeadership run when this replica takes leadership, may be {@code null}
     * @param onReleaseLeadership run when this replica releases leadership, may be {@code null}
     * @throws ReplicationException if any step fails
     */
    @Override
    protected void doStart(@Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) {
        try {
            delegate.start(null, null);

            final long replayedSoFar;
            try {
                replayedSoFar = getLastReplayedRevision();
            } catch (Exception cause) {
                throw new ReplicationException(cause);
            }
            listenerInfo = new ListenerInfo(replayedSoFar, onTakeLeadership, onReleaseLeadership);

            curator.start();
            if (createPathIfNotExist) {
                // The root has to exist before its children can be created.
                createZkPathIfMissing(zkPath);
                for (String child : new String[] { LOG_PATH, LOG_BLOCK_PATH, LOCK_PATH }) {
                    createZkPathIfMissing(zkPath + '/' + child);
                }
            }

            logWatcher.start();
            leaderSelector.start();
        } catch (Exception cause) {
            throw new ReplicationException(cause);
        }
    }

    /**
     * Creates the node at {@code zkPath} unless it already exists.
     *
     * <p>The existence check and the creation are two separate ZooKeeper calls, so another client
     * may create the node in between. That case is treated as success instead of failing.
     *
     * @param zkPath the absolute ZooKeeper path to create
     * @throws Exception if the existence check or the creation fails for any other reason
     */
    private void createZkPathIfMissing(String zkPath) throws Exception {
        if (curator.checkExists().forPath(zkPath) != null) {
            return;
        }
        try {
            curator.create().forPath(zkPath);
        } catch (KeeperException.NodeExistsException ignored) {
            // Lost the race against another creator; the node exists now, which is all we need.
        }
    }

    /**
     * Schedules {@code stop()} on the common fork-join pool instead of calling it on the current thread.
     */
    private void stopLater() {
        final Runnable stopTask = this::stop;
        ForkJoinPool.commonPool().execute(stopTask);
    }

    /**
     * Shuts down the worker pool and waits until it has terminated, then closes the leader selector,
     * the log watcher and the Curator client, in that order. A failure to close the selector or the
     * watcher is logged and does not prevent the remaining resources from being closed. If the wait
     * was interrupted, the interrupt status is restored at the very end.
     */
    @Override
    protected void doStop() {
        boolean wasInterrupted = false;
        while (!executor.isTerminated()) {
            executor.shutdown();
            try {
                executor.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                // Keep waiting for the workers; remember to restore the interrupt status afterwards.
                wasInterrupted = true;
            }
        }

        try {
            leaderSelector.close();
        } catch (Exception e) {
            logger.warn("Failed to close the leader selector: {}", e.getMessage(), e);
        } finally {
            try {
                logWatcher.close();
            } catch (IOException e) {
                logger.warn("Failed to close the log watcher: {}", e.getMessage(), e);
            } finally {
                listenerInfo = null;
                curator.close();
                if (wasInterrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Reads the last replayed revision from the revision file.
     *
     * <p>The file is decoded as UTF-8, the same charset {@code updateLastReplayedRevision} writes it
     * with, rather than the platform default charset.
     *
     * @return the revision stored on the first line of the file, or {@code -1} if the file does not
     *         exist or is empty
     * @throws Exception if the file cannot be read or its first line is not a decimal number
     */
    private long getLastReplayedRevision() throws Exception {
        final FileInputStream in;
        try {
            in = new FileInputStream(revisionFile);
        } catch (FileNotFoundException ignored) {
            // No revision has been persisted yet.
            return -1L;
        }

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            final String firstLine = reader.readLine();
            return firstLine == null ? -1L : Long.parseLong(firstLine.trim());
        }
    }

    /**
     * Overwrites the revision file with the decimal text of {@code lastReplayedRevision} (UTF-8)
     * and logs the new value.
     *
     * @param lastReplayedRevision the revision to persist
     * @throws Exception if the file cannot be opened or written
     */
    private void updateLastReplayedRevision(long lastReplayedRevision) throws Exception {
        final byte[] encoded = Long.toString(lastReplayedRevision).getBytes(StandardCharsets.UTF_8);
        try (FileOutputStream out = new FileOutputStream(revisionFile)) {
            out.write(encoded);
        }
        logger.info("Update lastReplayedRevision to: {}", lastReplayedRevision);
    }

    /**
     * Replays every log after the last replayed revision up to and including {@code targetRevision}.
     *
     * <p>Logs that {@code loadLog} skips (those written by this replica) only advance the revision
     * counter. For every other log the command is executed on the delegate and its result must equal
     * the recorded one. Does nothing when the executor is not started or the target revision has
     * already been replayed. After a successful replay the target revision is written to the
     * revision file.
     *
     * @param targetRevision the revision to catch up to
     * @throws ReplicationException if loading, executing or verifying a log fails (a stop is scheduled
     *                              in that case), or if the revision file cannot be updated
     */
    private synchronized void replayLogs(long targetRevision) {
        final ListenerInfo info = listenerInfo;
        if (info == null || targetRevision <= info.lastReplayedRevision) {
            return;
        }

        try {
            // The guard above guarantees at least one iteration; the revision advances by one each time.
            while (info.lastReplayedRevision < targetRevision) {
                final long nextRevision = info.lastReplayedRevision + 1L;
                final Optional<ReplicationLog<?>> loaded = loadLog(nextRevision, true);
                if (loaded.isPresent()) {
                    final ReplicationLog<?> entry = loaded.get();
                    final String originatingReplicaId = entry.replicaId();
                    final Command<?> command = entry.command();
                    final Object expectedResult = entry.result();
                    final Object actualResult = delegate.execute(originatingReplicaId, command).join();
                    if (!Objects.equals(expectedResult, actualResult)) {
                        throw new ReplicationException("mismatching replay result: " + actualResult + " (expected: " + expectedResult + ", command: " + command + ')');
                    }
                }
                info.lastReplayedRevision = nextRevision;
            }
        } catch (Exception e) {
            logger.warn("log replay fails. stop.", (Throwable) e);
            stopLater();
            throw e instanceof ReplicationException ? (ReplicationException) e : new ReplicationException(e);
        }

        try {
            updateLastReplayedRevision(targetRevision);
        } catch (Exception e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * Reacts to a newly added log node: replays logs up to the revision encoded in the node's path and
     * then wakes up the old-log remover. All other event types are ignored.
     *
     * @param unused the Curator client; not used
     * @param event the cache event to handle
     */
    public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) throws Exception {
        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
            final String addedPath = event.getData().getPath();
            replayLogs(ZooKeeperCommandExecutor.revisionFromPath(addedPath));
            oldLogRemover.touch();
        }
    }

    /**
     * Acquires the inter-process mutex associated with {@code executionPath}, creating and caching the
     * mutex on first use, and returns a handle that releases it when closed.
     *
     * @param executionPath the execution path that identifies the mutex
     * @return a handle whose {@code close()} releases the mutex; a failure to release is ignored
     * @throws ReplicationException if the mutex cannot be acquired
     */
    private SafeLock safeLock(String executionPath) {
        final InterProcessMutex mutex = mutexMap.computeIfAbsent(
                executionPath,
                key -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, executionPath)));

        try {
            mutex.acquire();
        } catch (Exception cause) {
            throw new ReplicationException(cause);
        }

        return () -> {
            try {
                mutex.release();
            } catch (Exception ignored) {
                // Releasing is best effort; nothing is reported to the caller.
            }
        };
    }

    @VisibleForTesting
    static String path(String... pathElements) {
        // Joins the elements into one path: each element loses at most one leading and one trailing
        // '/', elements that are empty after that are skipped, and every kept element is prefixed
        // with '/'. No elements (or only empty ones) yield the empty string.
        final StringBuilder joined = new StringBuilder();
        for (final String element : pathElements) {
            final int begin = element.startsWith("/") ? 1 : 0;
            int end = element.length();
            // Only look for a trailing slash in what is left after dropping the leading one.
            if (end > begin && element.charAt(end - 1) == '/') {
                end--;
            }
            if (end > begin) {
                joined.append('/').append(element, begin, end);
            }
        }
        return joined.toString();
    }

    /**
     * Resolves the given elements against the root ZooKeeper path.
     *
     * @param pathElements the path elements below the root; may be empty
     * @return the root path as-is when no elements are given, otherwise the root path and the
     *         elements joined by {@code path(String...)}
     */
    private String absolutePath(String... pathElements) {
        return pathElements.length == 0
               ? zkPath
               : ZooKeeperCommandExecutor.path(zkPath, ZooKeeperCommandExecutor.path(pathElements));
    }

    /**
     * Serializes {@code log} and stores it in ZooKeeper.
     *
     * <p>The serialized bytes are split into chunks of at most {@code MAX_BYTES}; each chunk becomes a
     * persistent sequential node under the log-block path. Afterwards a {@link LogMeta} listing the
     * block IDs is stored as a persistent sequential node under the log path.
     *
     * @param log the log to store
     * @return the revision of the stored log, i.e. the sequence number of the created log node
     * @throws ReplicationException if serialization or any ZooKeeper operation fails
     */
    private long storeLog(ReplicationLog<?> log) {
        try {
            final byte[] bytes = Jackson.writeValueAsBytes(log);
            assert bytes.length > 0;

            final LogMeta logMeta = new LogMeta(log.replicaId(), System.currentTimeMillis(), bytes.length);
            final String blockPathPrefix = absolutePath(LOG_BLOCK_PATH) + '/';

            // Walk the payload by offset; 'end' never exceeds bytes.length, so the arithmetic cannot overflow.
            int start = 0;
            while (start < bytes.length) {
                final int end = start + Math.min(MAX_BYTES, bytes.length - start);
                final String blockPath = curator.create()
                                                .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                                                .forPath(blockPathPrefix, Arrays.copyOfRange(bytes, start, end));
                logMeta.appendBlock(ZooKeeperCommandExecutor.revisionFromPath(blockPath));
                start = end;
            }

            final String logPath = curator.create()
                                          .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                                          .forPath(absolutePath(LOG_PATH) + '/', Jackson.writeValueAsBytes(logMeta));
            return ZooKeeperCommandExecutor.revisionFromPath(logPath);
        } catch (Exception e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * Loads the log stored at {@code revision} by reading its {@link LogMeta} and concatenating the
     * referenced blocks in order.
     *
     * @param revision the revision of the log to load
     * @param skipIfSameReplica when {@code true}, a log whose replica ID equals this replica's ID is
     *                          not loaded
     * @return the deserialized log, or an empty {@link Optional} if it was skipped
     * @throws ReplicationException if reading from ZooKeeper or deserialization fails
     */
    @VisibleForTesting
    Optional<ReplicationLog<?>> loadLog(long revision, boolean skipIfSameReplica) {
        try {
            final String logPath = absolutePath(LOG_PATH) + '/' + ZooKeeperCommandExecutor.pathFromRevision(revision);
            final LogMeta logMeta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
            if (skipIfSameReplica && Objects.equals(replicaId(), logMeta.replicaId())) {
                return Optional.empty();
            }

            final byte[] payload = new byte[logMeta.size()];
            final String blockPathPrefix = absolutePath(LOG_BLOCK_PATH) + '/';
            int filled = 0;
            for (long blockId : logMeta.blocks()) {
                final byte[] block =
                        curator.getData().forPath(blockPathPrefix + ZooKeeperCommandExecutor.pathFromRevision(blockId));
                System.arraycopy(block, 0, payload, filled, block.length);
                filled += block.length;
            }
            assert logMeta.size() == filled;

            final ReplicationLog<?> log = Jackson.readValue(payload, ReplicationLog.class);
            return Optional.of(log);
        } catch (Exception e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * Parses the last {@code '/'}-separated segment of {@code path} as a decimal {@code long}.
     *
     * @param path a path whose final segment is a number, e.g. a sequential node path
     * @return the numeric value of the final segment
     * @throws NumberFormatException if the final segment is not a decimal number
     */
    private static long revisionFromPath(String path) {
        final String[] segments = path.split("/");
        final String lastSegment = segments[segments.length - 1];
        return Long.parseLong(lastSegment);
    }

    /**
     * Formats {@code revision} as a node name: decimal, zero-padded to at least 10 characters.
     *
     * <p>{@link Locale#ROOT} is used so that the result always consists of ASCII digits; formatting
     * with the default locale may substitute locale-specific digits.
     *
     * @param revision the revision or block ID to format
     * @return the zero-padded decimal representation of {@code revision}
     */
    private static String pathFromRevision(long revision) {
        return String.format(Locale.ROOT, "%010d", revision);
    }

    /**
     * Submits the command to the worker pool and returns a future for its outcome.
     *
     * @param replicaId the ID of the replica the command originates from
     * @param command the command to execute
     * @return a future completed with the result of {@code blockingExecute}, or completed
     *         exceptionally with whatever it throws
     */
    @Override
    protected <T> CompletableFuture<T> doExecute(String replicaId, Command<T> command) throws Exception {
        final CompletableFuture<T> promise = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                promise.complete(blockingExecute(replicaId, command));
            } catch (Throwable cause) {
                promise.completeExceptionally(cause);
            }
        });
        return promise;
    }

    /**
     * Executes {@code command} while holding the mutex of its execution path.
     *
     * <p>Before executing, logs are replayed up to the highest revision currently listed under the
     * log path. The command is then executed on the delegate and the command together with its result
     * is stored as a new replication log.
     *
     * @param replicaId the ID of the replica the command originates from
     * @param command the command to execute
     * @return the result produced by the delegate
     * @throws ReplicationException if locking, replaying or storing the log fails; a stop of this
     *                              executor is scheduled before the exception is rethrown
     * @throws Exception if the delegate or ZooKeeper fails in any other way
     */
    private <T> T blockingExecute(String replicaId, Command<T> command) throws Exception {
        try (SafeLock ignored = safeLock(command.executionPath())) {
            // Node names under the log path are decimal revisions; catch up to the newest one, if any.
            final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH));
            recentRevisions.stream()
                           .mapToLong(Long::parseLong)
                           .max()
                           .ifPresent(this::replayLogs);

            final T result = delegate.execute(replicaId, command).join();
            final ReplicationLog<T> log = new ReplicationLog<>(replicaId(), command, result);
            final long revision = storeLog(log);
            logger.debug("logging OK. revision = {}, log = {}", revision, log);
            return result;
        } catch (ReplicationException e) {
            stopLater();
            throw e;
        }
    }

    /**
     * JSON-serializable metadata of one replication log: who wrote it, when, how many bytes the
     * serialized log has, and which block nodes hold those bytes (in order).
     */
    private static class LogMeta {
        private final String replicaId;
        private final long timestamp;
        private final int size;
        // Field name is part of the JSON contract; keep it in sync with the "blocks" property.
        private final List<Long> blocks = new ArrayList<>();

        /**
         * @param replicaId the ID of the replica that wrote the log
         * @param timestamp the creation time in epoch milliseconds
         * @param size the length of the serialized log in bytes
         */
        @JsonCreator
        LogMeta(@JsonProperty("replicaId") String replicaId,
                @JsonProperty(value = "timestamp", defaultValue = "0") long timestamp,
                @JsonProperty("size") int size) {
            this.replicaId = replicaId;
            this.timestamp = timestamp;
            this.size = size;
        }

        /** Returns the ID of the replica that wrote the log. */
        @JsonProperty
        String replicaId() {
            return replicaId;
        }

        /** Returns the creation time in epoch milliseconds. */
        @JsonProperty
        long timestamp() {
            return timestamp;
        }

        /** Returns the length of the serialized log in bytes. */
        @JsonProperty
        int size() {
            return size;
        }

        /** Returns a read-only view of the block IDs, in the order they were appended. */
        @JsonProperty
        List<Long> blocks() {
            return Collections.unmodifiableList(blocks);
        }

        /** Appends the ID of the next block of the log. */
        public void appendBlock(long blockId) {
            blocks.add(blockId);
        }
    }

    /**
     * An {@link AutoCloseable} whose {@link #close()} declares no checked exception, so it can be used
     * in a try-with-resources statement without forcing the caller to handle {@code Exception}.
     */
    @FunctionalInterface
    private static interface SafeLock
    extends AutoCloseable {
        /** Releases whatever this handle holds; declared without a {@code throws} clause. */
        @Override
        public void close();
    }

    /**
     * Builds {@link ZooKeeperCommandExecutor} instances. {@code replicaId}, {@code delegate},
     * {@code connectionString}, {@code path} and {@code revisionFile} are mandatory; everything else
     * has a default.
     */
    public static class Builder {
        private String replicaId;
        private CommandExecutor delegate;
        private int numWorkers = 16;
        private String connectionString;
        private int timeoutMillis = 1000;
        private boolean createPathIfNotExist;
        private String path;
        private File revisionFile;
        private int maxLogCount = 100;
        private long minLogAgeMillis = DEFAULT_MIN_LOG_AGE_MILLIS;

        /** Sets the ID of this replica. */
        public Builder replicaId(String replicaId) {
            this.replicaId = replicaId;
            return this;
        }

        /** Sets the executor that actually applies the commands. */
        public Builder delegate(CommandExecutor delegate) {
            this.delegate = delegate;
            return this;
        }

        /**
         * Sets the number of worker threads.
         *
         * @throws IllegalArgumentException if {@code numWorkers} is not positive
         */
        public Builder numWorkers(int numWorkers) {
            if (numWorkers <= 0) {
                throw new IllegalArgumentException("numWorkers: " + numWorkers + " (expected: > 0)");
            }
            this.numWorkers = numWorkers;
            return this;
        }

        /** Sets the ZooKeeper connection string. */
        public Builder connectionString(String connectionString) {
            this.connectionString = connectionString;
            return this;
        }

        /** Sets the value passed to the retry policy as its first argument when the client is built. */
        public Builder timeoutMillis(int timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
            return this;
        }

        /** Sets whether missing ZooKeeper nodes are created on start. */
        public Builder createPathIfNotExist(boolean b) {
            this.createPathIfNotExist = b;
            return this;
        }

        /**
         * Sets the root ZooKeeper path. A single trailing {@code '/'} is removed.
         *
         * @throws NullPointerException if {@code path} is {@code null}
         * @throws IllegalArgumentException if the path is empty after removing the trailing slash,
         *                                  i.e. it refers to the root node
         */
        public Builder path(String path) {
            Objects.requireNonNull(path, "path");
            final String normalized = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
            Preconditions.checkArgument(!normalized.isEmpty(), "ZooKeeper path must not refer to the root node.");
            this.path = normalized;
            return this;
        }

        /** Sets the file that persists the last replayed revision. */
        public Builder revisionFile(File f) {
            this.revisionFile = f;
            return this;
        }

        /**
         * Sets the number of logs above which old logs become eligible for deletion.
         *
         * @throws IllegalArgumentException if {@code c} is not positive
         */
        public Builder maxLogCount(int c) {
            if (c <= 0) {
                // Report the rejected argument, not the value currently stored in the builder.
                throw new IllegalArgumentException("maxLogCount: " + c + " (expected: > 0)");
            }
            this.maxLogCount = c;
            return this;
        }

        /**
         * Sets the minimum age a log must have before it may be deleted.
         *
         * @throws IllegalArgumentException if {@code minLogAge} is not positive
         * @throws NullPointerException if {@code unit} is {@code null}
         */
        public Builder minLogAge(long minLogAge, TimeUnit unit) {
            if (minLogAge <= 0L) {
                throw new IllegalArgumentException("minLogAge: " + minLogAge + " (expected: > 0)");
            }
            this.minLogAgeMillis = Objects.requireNonNull(unit, "unit").toMillis(minLogAge);
            return this;
        }

        /**
         * Creates the Curator client and a new, not yet started {@link ZooKeeperCommandExecutor}.
         *
         * @throws NullPointerException if a mandatory property has not been set
         */
        public ZooKeeperCommandExecutor build() {
            Objects.requireNonNull(this.replicaId, "replicaId");
            Objects.requireNonNull(this.delegate, "delegate");
            Objects.requireNonNull(this.connectionString, "connectionString");
            Objects.requireNonNull(this.path, "path");
            Objects.requireNonNull(this.revisionFile, "revisionFile");
            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    this.connectionString, new ExponentialBackoffRetry(this.timeoutMillis, 3));
            return new ZooKeeperCommandExecutor(this.replicaId, this.delegate, curator, this.path,
                                                this.createPathIfNotExist, this.revisionFile, this.numWorkers,
                                                this.maxLogCount, this.minLogAgeMillis);
        }
    }

    /**
     * Per-start state: the leadership callbacks handed to {@code doStart} and the revision up to
     * which logs have been replayed so far.
     */
    private static final class ListenerInfo {
        // Advanced by replayLogs() as logs are replayed.
        long lastReplayedRevision;
        // Either callback may be null, in which case it is simply not invoked.
        final Runnable onTakeLeadership;
        final Runnable onReleaseLeadership;

        ListenerInfo(long lastReplayedRevision, @Nullable Runnable onTakeLeadership,
                     @Nullable Runnable onReleaseLeadership) {
            this.onTakeLeadership = onTakeLeadership;
            this.onReleaseLeadership = onReleaseLeadership;
            this.lastReplayedRevision = lastReplayedRevision;
        }
    }

    private class OldLogRemover
    implements LeaderSelectorListener {
        private OldLogRemover() {
        }

        public void stateChanged(CuratorFramework client, ConnectionState newState) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void takeLeadership(CuratorFramework client) throws Exception {
            ListenerInfo listenerInfo = ZooKeeperCommandExecutor.this.listenerInfo;
            if (listenerInfo == null) {
                return;
            }
            logger.info("Taking leadership: {}", (Object)ZooKeeperCommandExecutor.this.replicaId());
            try {
                if (listenerInfo.onTakeLeadership != null) {
                    listenerInfo.onTakeLeadership.run();
                }
                while (ZooKeeperCommandExecutor.this.curator.getState() == CuratorFrameworkState.STARTED) {
                    this.deleteLogs();
                    OldLogRemover oldLogRemover = this;
                    synchronized (oldLogRemover) {
                        this.wait();
                    }
                }
                return;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                logger.error("Leader stopped due to an unexpected exception:", (Throwable)e);
                return;
            }
            finally {
                logger.info("Releasing leadership: {}", (Object)ZooKeeperCommandExecutor.this.replicaId());
                if (listenerInfo.onReleaseLeadership != null) {
                    listenerInfo.onReleaseLeadership.run();
                }
            }
        }

        public synchronized void touch() {
            this.notify();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void deleteLogs() throws Exception {
            List children = (List)ZooKeeperCommandExecutor.this.curator.getChildren().forPath(ZooKeeperCommandExecutor.this.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_PATH}));
            if (children.size() <= ZooKeeperCommandExecutor.this.maxLogCount) {
                return;
            }
            long minAllowedTimestamp = System.currentTimeMillis() - ZooKeeperCommandExecutor.this.minLogAgeMillis;
            int targetCount = children.size() - ZooKeeperCommandExecutor.this.maxLogCount;
            ArrayList deleted = new ArrayList(targetCount);
            children.sort(Comparator.comparingLong(Long::parseLong));
            try {
                for (int i = 0; i < targetCount; ++i) {
                    String logPath = ZooKeeperCommandExecutor.this.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_PATH, (String)children.get(i)});
                    LogMeta meta = (LogMeta)Jackson.readValue((byte[])((byte[])ZooKeeperCommandExecutor.this.curator.getData().forPath(logPath)), LogMeta.class);
                    if (meta.timestamp() >= minAllowedTimestamp) {
                        break;
                    }
                    CuratorTransactionFinal tr = ((CuratorTransactionBridge)ZooKeeperCommandExecutor.this.curator.inTransaction().delete().forPath(logPath)).and();
                    for (long blockId : meta.blocks()) {
                        String blockPath = ZooKeeperCommandExecutor.this.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_BLOCK_PATH}) + '/' + ZooKeeperCommandExecutor.pathFromRevision(blockId);
                        ((CuratorTransactionBridge)tr.delete().forPath(blockPath)).and();
                    }
                    tr.commit();
                    deleted.add(children.get(i));
                }
            }
            finally {
                logger.info("delete logs: {}", deleted);
            }
        }
    }
}

