/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.centraldogma.server.internal.replication;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.shaded.guava.annotations.VisibleForTesting;
import com.linecorp.centraldogma.internal.shaded.guava.collect.ImmutableList;
import com.linecorp.centraldogma.internal.shaded.guava.escape.Escaper;
import com.linecorp.centraldogma.internal.shaded.guava.escape.Escapers;
import com.linecorp.centraldogma.internal.shaded.guava.util.concurrent.MoreExecutors;
import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig;
import com.linecorp.centraldogma.server.command.AbstractCommandExecutor;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.internal.replication.EmbeddedZooKeeper;
import com.linecorp.centraldogma.server.internal.replication.ReplicationException;
import com.linecorp.centraldogma.server.internal.replication.ReplicationLog;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.auth.DigestLoginModule;
import org.apache.zookeeper.server.auth.SASLAuthenticationProvider;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ZooKeeperCommandExecutor
extends AbstractCommandExecutor
implements PathChildrenCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperCommandExecutor.class);
    private static final Escaper jaasValueEscaper = Escapers.builder().addEscape('\"', "\\\"").addEscape('\\', "\\\\").build();
    private static final String PATH_PREFIX = "/dogma";
    private static final int MAX_BYTES = 1047552;
    @VisibleForTesting
    static final String LOG_PATH = "logs";
    @VisibleForTesting
    static final String LOG_BLOCK_PATH = "log_blocks";
    @VisibleForTesting
    static final String LOCK_PATH = "lock";
    @VisibleForTesting
    static final String LEADER_PATH = "leader";
    private static final RetryPolicy RETRY_POLICY_ALWAYS = new RetryForever(500);
    private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false;
    private final ZooKeeperReplicationConfig cfg;
    private final File revisionFile;
    private final File zkConfFile;
    private final File zkDataDir;
    private final File zkLogDir;
    private final CommandExecutor delegate;
    private final MeterRegistry meterRegistry;
    private volatile EmbeddedZooKeeper quorumPeer;
    private volatile CuratorFramework curator;
    private volatile RetryPolicy retryPolicy = RETRY_POLICY_NEVER;
    private volatile ExecutorService executor;
    private volatile ExecutorService logWatcherExecutor;
    private volatile PathChildrenCache logWatcher;
    private volatile OldLogRemover oldLogRemover;
    private volatile ExecutorService leaderSelectorExecutor;
    private volatile LeaderSelector leaderSelector;
    private volatile boolean createdParentNodes;
    private volatile ListenerInfo listenerInfo;
    private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<String, InterProcessMutex>();

    public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg, File dataDir, CommandExecutor delegate, MeterRegistry meterRegistry, @Nullable Consumer<CommandExecutor> onTakeLeadership, @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
        super(onTakeLeadership, onReleaseLeadership);
        this.cfg = Objects.requireNonNull(cfg, "cfg");
        Objects.requireNonNull(dataDir, "dataDir");
        this.revisionFile = new File(dataDir.getAbsolutePath() + File.separatorChar + "last_revision");
        this.zkConfFile = new File(dataDir.getAbsolutePath() + File.separatorChar + "_zookeeper" + File.separatorChar + "config.properties");
        this.zkDataDir = new File(dataDir.getAbsolutePath() + File.separatorChar + "_zookeeper" + File.separatorChar + "data");
        this.zkLogDir = new File(dataDir.getAbsolutePath() + File.separatorChar + "_zookeeper" + File.separatorChar + "log");
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.meterRegistry = Objects.requireNonNull(meterRegistry, "meterRegistry");
        Gauge.builder((String)"replica.id", (Object)this, self -> this.replicaId()).register(meterRegistry);
        Gauge.builder((String)"replica.readOnly", (Object)this, self -> self.isWritable() ? 0.0 : 1.0).register(meterRegistry);
        Gauge.builder((String)"replica.replicating", (Object)this, self -> self.isStarted() ? 1.0 : 0.0).register(meterRegistry);
        Gauge.builder((String)"replica.hasLeadership", (Object)this, self -> {
            OldLogRemover remover = self.oldLogRemover;
            return remover != null && remover.hasLeadership ? 1.0 : 0.0;
        }).register(meterRegistry);
        Gauge.builder((String)"replica.lastReplayedRevision", (Object)this, self -> {
            ListenerInfo info = self.listenerInfo;
            if (info == null) {
                return 0.0;
            }
            return info.lastReplayedRevision;
        }).register(meterRegistry);
    }

    @Override
    public int replicaId() {
        return this.cfg.serverId();
    }

    @Override
    protected void doStart(@Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) throws Exception {
        try {
            try {
                long lastReplayedRevision = this.getLastReplayedRevision();
                this.listenerInfo = new ListenerInfo(lastReplayedRevision, onTakeLeadership, onReleaseLeadership);
            }
            catch (Exception e) {
                throw new ReplicationException("failed to read " + this.revisionFile, e);
            }
            this.quorumPeer = this.startZooKeeper();
            this.retryPolicy = RETRY_POLICY_ALWAYS;
            this.curator = CuratorFrameworkFactory.newClient((String)("127.0.0.1:" + this.quorumPeer.getClientPort()), (int)this.cfg.timeoutMillis(), (int)this.cfg.timeoutMillis(), (retryCount, elapsedTimeMs, sleeper) -> this.retryPolicy.allowRetry(retryCount, elapsedTimeMs, sleeper));
            this.curator.start();
            this.logWatcherExecutor = ExecutorServiceMetrics.monitor((MeterRegistry)this.meterRegistry, (ExecutorService)Executors.newSingleThreadExecutor((ThreadFactory)new DefaultThreadFactory("zookeeper-log-watcher", true)), (String)"zkLogWatcher", (Tag[])new Tag[0]);
            this.logWatcher = new PathChildrenCache(this.curator, ZooKeeperCommandExecutor.absolutePath(LOG_PATH), true, false, this.logWatcherExecutor);
            this.logWatcher.getListenable().addListener((Object)this, MoreExecutors.directExecutor());
            this.logWatcher.start();
            this.oldLogRemover = new OldLogRemover();
            this.leaderSelectorExecutor = ExecutorServiceMetrics.monitor((MeterRegistry)this.meterRegistry, (ExecutorService)Executors.newSingleThreadExecutor((ThreadFactory)new DefaultThreadFactory("zookeeper-leader-selector", true)), (String)"zkLeaderSelector", (Tag[])new Tag[0]);
            this.leaderSelector = new LeaderSelector(this.curator, ZooKeeperCommandExecutor.absolutePath(LEADER_PATH), this.leaderSelectorExecutor, (LeaderSelectorListener)this.oldLogRemover);
            this.leaderSelector.start();
            this.delegate.start();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(this.cfg.numWorkers(), this.cfg.numWorkers(), 60L, TimeUnit.SECONDS, new LinkedTransferQueue<Runnable>(), (ThreadFactory)new DefaultThreadFactory("zookeeper-command-executor", true));
            executor.allowCoreThreadTimeOut(true);
            this.executor = ExecutorServiceMetrics.monitor((MeterRegistry)this.meterRegistry, (ExecutorService)executor, (String)"zkCommandExecutor", (Tag[])new Tag[0]);
        }
        catch (ReplicationException | InterruptedException e) {
            throw e;
        }
        catch (Exception e) {
            throw new ReplicationException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private EmbeddedZooKeeper startZooKeeper() throws Exception {
        logger.info("Starting the ZooKeeper peer ({}) ..", (Object)this.cfg.serverId());
        EmbeddedZooKeeper peer = null;
        boolean success = false;
        try {
            QuorumPeer.ServerState state;
            Properties zkProps = new Properties();
            this.copyZkProperty(zkProps, "initLimit", "5");
            this.copyZkProperty(zkProps, "syncLimit", "10");
            this.copyZkProperty(zkProps, "tickTime", "3000");
            this.copyZkProperty(zkProps, "syncEnabled", "true");
            this.copyZkProperty(zkProps, "autopurge.snapRetainCount", "3");
            this.copyZkProperty(zkProps, "autopurge.purgeInterval", "1");
            System.setProperty("zookeeper.fsync.warningthresholdms", this.cfg.additionalProperties().getOrDefault("fsync.warningthresholdms", "1000"));
            zkProps.setProperty("dataDir", this.zkDataDir.getPath());
            zkProps.setProperty("dataLogDir", this.zkLogDir.getPath());
            this.zkDataDir.mkdirs();
            this.zkLogDir.mkdirs();
            try (FileOutputStream out = new FileOutputStream(new File(this.zkDataDir, "myid"));){
                out.write((this.cfg.serverId() + "\n").getBytes(StandardCharsets.US_ASCII));
            }
            File jaasConfFile = new File(this.zkDataDir, "jaas.conf");
            try (FileOutputStream out = new FileOutputStream(jaasConfFile);){
                StringBuilder buf = new StringBuilder();
                String newline = System.lineSeparator();
                String escapedSecret = jaasValueEscaper.escape(this.cfg.secret());
                ImmutableList.of((Object)"Server", (Object)"QuorumServer").forEach(name -> {
                    buf.append((String)name).append(" {").append(newline);
                    buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
                    buf.append("user_super=\"").append(escapedSecret).append("\";").append(newline);
                    buf.append("};").append(newline);
                });
                ImmutableList.of((Object)"Client", (Object)"QuorumLearner").forEach(name -> {
                    buf.append((String)name).append(" {").append(newline);
                    buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
                    buf.append("username=\"super\"").append(newline);
                    buf.append("password=\"").append(escapedSecret).append("\";").append(newline);
                    buf.append("};").append(newline);
                });
                out.write(buf.toString().getBytes());
            }
            System.setProperty("java.security.auth.login.config", jaasConfFile.getAbsolutePath());
            System.setProperty("zookeeper.authProvider.1", SASLAuthenticationProvider.class.getName());
            zkProps.setProperty("clientPort", String.valueOf(this.cfg.serverAddress().clientPort()));
            this.cfg.servers().forEach((id, addr) -> zkProps.setProperty("server." + id, addr.host() + ':' + addr.quorumPort() + ':' + addr.electionPort() + ":participant"));
            zkProps.setProperty("admin.enableServer", "false");
            this.zkConfFile.getParentFile().mkdirs();
            out = new FileOutputStream(this.zkConfFile);
            try {
                zkProps.store(out, null);
            }
            finally {
                out.close();
            }
            QuorumPeerConfig zkCfg = new QuorumPeerConfig();
            zkCfg.parse(this.zkConfFile.getPath());
            peer = new EmbeddedZooKeeper(zkCfg, this.meterRegistry);
            peer.start();
            while ((state = peer.getPeerState()) != QuorumPeer.ServerState.FOLLOWING && state != QuorumPeer.ServerState.LEADING) {
                if (this.isStopping()) {
                    throw new InterruptedException("Stop requested before joining the cluster");
                }
                logger.info("Waiting for the ZooKeeper peer ({}) to join the cluster ..", (Object)peer.getId());
                Thread.sleep(1000L);
            }
            if (peer.getId() == peer.getCurrentVote().getId()) {
                logger.info("The ZooKeeper peer ({}) has joined the cluster as a leader.", (Object)peer.getId());
            } else {
                logger.info("The ZooKeeper peer ({}) has joined the cluster, following {}.", (Object)peer.getId(), (Object)peer.getCurrentVote().getId());
            }
            success = true;
            EmbeddedZooKeeper embeddedZooKeeper = peer;
            return embeddedZooKeeper;
        }
        finally {
            if (!success && peer != null) {
                try {
                    peer.shutdown();
                }
                catch (Exception e) {
                    logger.warn("Failed to shutdown the failed ZooKeeper peer: {}", (Object)e.getMessage(), (Object)e);
                }
            }
        }
    }

    private void copyZkProperty(Properties zkProps, String key, String defaultValue) {
        zkProps.setProperty(key, this.cfg.additionalProperties().getOrDefault(key, defaultValue));
    }

    private void stopLater() {
        ForkJoinPool.commonPool().execute(this::stop);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    protected void doStop(@Nullable Runnable onReleaseLeadership) throws Exception {
        this.listenerInfo = null;
        logger.info("Stopping the worker threads");
        boolean interrupted = ZooKeeperCommandExecutor.shutdown(this.executor);
        logger.info("Stopped the worker threads");
        try {
            logger.info("Stopping the delegate command executor");
            this.delegate.stop();
            logger.info("Stopped the delegate command executor");
            return;
        }
        catch (Exception e) {
            logger.warn("Failed to stop the delegate command executor {}: {}", new Object[]{this.delegate, e.getMessage(), e});
            return;
        }
        finally {
            this.retryPolicy = RETRY_POLICY_NEVER;
            try {
                if (this.leaderSelector != null) {
                    logger.info("Closing the leader selector");
                    this.leaderSelector.close();
                    interrupted |= ZooKeeperCommandExecutor.shutdown(this.leaderSelectorExecutor);
                    logger.info("Closed the leader selector");
                }
            }
            catch (Exception e) {
                logger.warn("Failed to close the leader selector: {}", (Object)e.getMessage(), (Object)e);
            }
            finally {
                try {
                    if (this.logWatcher != null) {
                        logger.info("Closing the log watcher");
                        this.logWatcher.close();
                        interrupted |= ZooKeeperCommandExecutor.shutdown(this.logWatcherExecutor);
                        logger.info("Closed the log watcher");
                    }
                }
                catch (Exception e) {
                    logger.warn("Failed to close the log watcher: {}", (Object)e.getMessage(), (Object)e);
                }
                finally {
                    try {
                        if (this.curator != null) {
                            logger.info("Closing the Curator framework");
                            this.curator.close();
                            logger.info("Closed the Curator framework");
                        }
                    }
                    catch (Exception e) {
                        logger.warn("Failed to close the Curator framework: {}", (Object)e.getMessage(), (Object)e);
                    }
                    finally {
                        try {
                            if (this.quorumPeer != null) {
                                long peerId = this.quorumPeer.getId();
                                logger.info("Shutting down the ZooKeeper peer ({})", (Object)peerId);
                                this.quorumPeer.shutdown();
                                logger.info("Shut down the ZooKeeper peer ({})", (Object)peerId);
                            }
                        }
                        catch (Exception e) {
                            logger.warn("Failed to shut down the ZooKeeper peer: {}", (Object)e.getMessage(), (Object)e);
                        }
                        finally {
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            }
        }
    }

    private static boolean shutdown(@Nullable ExecutorService executor) {
        if (executor == null) {
            return false;
        }
        boolean interrupted = false;
        while (!executor.isTerminated()) {
            executor.shutdown();
            try {
                executor.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
        }
        return interrupted;
    }

    private long getLastReplayedRevision() throws Exception {
        FileInputStream fis;
        try {
            fis = new FileInputStream(this.revisionFile);
        }
        catch (FileNotFoundException ignored) {
            return -1L;
        }
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fis));){
            String l = br.readLine();
            if (l == null) {
                long l2 = -1L;
                return l2;
            }
            long l3 = Long.parseLong(l.trim());
            return l3;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateLastReplayedRevision(long lastReplayedRevision) throws Exception {
        boolean success = false;
        try (FileOutputStream fos = new FileOutputStream(this.revisionFile);){
            fos.write(String.valueOf(lastReplayedRevision).getBytes(StandardCharsets.UTF_8));
            success = true;
        }
        finally {
            if (success) {
                logger.info("Updated lastReplayedRevision to: {}", (Object)lastReplayedRevision);
            } else {
                logger.error("Failed to update lastReplayedRevision to: {}", (Object)lastReplayedRevision);
            }
        }
    }

    private synchronized void replayLogs(long targetRevision) {
        ListenerInfo info = this.listenerInfo;
        if (info == null) {
            return;
        }
        if (targetRevision <= info.lastReplayedRevision) {
            return;
        }
        try {
            long nextRevision;
            do {
                Optional<ReplicationLog<?>> log;
                if ((log = this.loadLog(nextRevision = info.lastReplayedRevision + 1L, true)).isPresent()) {
                    Object actualResult;
                    ReplicationLog<?> l = log.get();
                    Command<?> command = l.command();
                    Object expectedResult = l.result();
                    if (!Objects.equals(expectedResult, actualResult = this.delegate.execute(command).get())) {
                        throw new ReplicationException("mismatching replay result at revision " + info.lastReplayedRevision + ": " + actualResult + " (expected: " + expectedResult + ", command: " + command + ')');
                    }
                }
                info.lastReplayedRevision = nextRevision;
                this.updateLastReplayedRevision(nextRevision);
            } while (nextRevision != targetRevision);
        }
        catch (Throwable t) {
            logger.error("Failed to replay a log at revision {}; entering read-only mode", (Object)info.lastReplayedRevision, (Object)t);
            this.stopLater();
            if (t instanceof ReplicationException) {
                throw (ReplicationException)t;
            }
            throw new ReplicationException("failed to replay a log at revision " + info.lastReplayedRevision, t);
        }
    }

    public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) throws Exception {
        if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
            return;
        }
        long lastKnownRevision = ZooKeeperCommandExecutor.revisionFromPath(event.getData().getPath());
        try {
            this.replayLogs(lastKnownRevision);
        }
        catch (ReplicationException ignored) {
            return;
        }
        this.oldLogRemover.touch();
    }

    private SafeLock safeLock(String executionPath) {
        InterProcessMutex mtx = this.mutexMap.computeIfAbsent(executionPath, k -> new InterProcessMutex(this.curator, ZooKeeperCommandExecutor.absolutePath(LOCK_PATH, executionPath)));
        try {
            mtx.acquire();
        }
        catch (Exception e) {
            logger.error("Failed to acquire a lock for {}; entering read-only mode", (Object)executionPath, (Object)e);
            this.stopLater();
            throw new ReplicationException("failed to acquire a lock for " + executionPath, e);
        }
        return () -> {
            try {
                mtx.release();
            }
            catch (Exception exception) {
                // empty catch block
            }
        };
    }

    @VisibleForTesting
    static String path(String ... pathElements) {
        StringBuilder sb = new StringBuilder();
        for (String path : pathElements) {
            if (path.startsWith("/")) {
                path = path.substring(1);
            }
            if (path.endsWith("/")) {
                path = path.substring(0, path.length() - 1);
            }
            if (path.isEmpty()) continue;
            sb.append('/');
            sb.append(path);
        }
        return sb.toString();
    }

    private static String absolutePath(String ... pathElements) {
        if (pathElements.length == 0) {
            return PATH_PREFIX;
        }
        return ZooKeeperCommandExecutor.path(PATH_PREFIX, ZooKeeperCommandExecutor.path(pathElements));
    }

    private long storeLog(ReplicationLog<?> log) {
        try {
            byte[] bytes = Jackson.writeValueAsBytes(log);
            assert (bytes.length > 0);
            LogMeta logMeta = new LogMeta(log.replicaId(), System.currentTimeMillis(), bytes.length);
            int count = (bytes.length + 1047552 - 1) / 1047552;
            for (int i = 0; i < count; ++i) {
                int start = i * 1047552;
                int end = Math.min((i + 1) * 1047552, bytes.length);
                byte[] b = Arrays.copyOfRange(bytes, start, end);
                String blockPath = (String)((ACLBackgroundPathAndBytesable)this.curator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath(ZooKeeperCommandExecutor.absolutePath(LOG_BLOCK_PATH) + '/', b);
                long blockId = ZooKeeperCommandExecutor.revisionFromPath(blockPath);
                logMeta.appendBlock(blockId);
            }
            String logPath = (String)((ACLBackgroundPathAndBytesable)this.curator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath(ZooKeeperCommandExecutor.absolutePath(LOG_PATH) + '/', Jackson.writeValueAsBytes((Object)logMeta));
            return ZooKeeperCommandExecutor.revisionFromPath(logPath);
        }
        catch (Exception e) {
            logger.error("Failed to store a log; entering read-only mode: {}", log, (Object)e);
            this.stopLater();
            throw new ReplicationException("failed to store a log: " + log, e);
        }
    }

    @VisibleForTesting
    Optional<ReplicationLog<?>> loadLog(long revision, boolean skipIfSameReplica) {
        try {
            this.createParentNodes();
            String logPath = ZooKeeperCommandExecutor.absolutePath(LOG_PATH) + '/' + ZooKeeperCommandExecutor.pathFromRevision(revision);
            LogMeta logMeta = (LogMeta)Jackson.readValue((byte[])((byte[])this.curator.getData().forPath(logPath)), LogMeta.class);
            if (skipIfSameReplica && this.replicaId() == logMeta.replicaId()) {
                return Optional.empty();
            }
            byte[] bytes = new byte[logMeta.size()];
            int offset = 0;
            for (long blockId : logMeta.blocks()) {
                String blockPath = ZooKeeperCommandExecutor.absolutePath(LOG_BLOCK_PATH) + '/' + ZooKeeperCommandExecutor.pathFromRevision(blockId);
                byte[] b = (byte[])this.curator.getData().forPath(blockPath);
                System.arraycopy(b, 0, bytes, offset, b.length);
                offset += b.length;
            }
            assert (logMeta.size() == offset);
            ReplicationLog log = (ReplicationLog)Jackson.readValue((byte[])bytes, ReplicationLog.class);
            return Optional.of(log);
        }
        catch (Exception e) {
            logger.error("Failed to load a log at revision {}; entering read-only mode", (Object)revision, (Object)e);
            this.stopLater();
            throw new ReplicationException("failed to load a log at revision " + revision, e);
        }
    }

    private static long revisionFromPath(String path) {
        String[] s = path.split("/");
        return Long.parseLong(s[s.length - 1]);
    }

    private static String pathFromRevision(long revision) {
        return String.format("%010d", revision);
    }

    @Override
    protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.executor.execute(() -> {
            try {
                future.complete(this.blockingExecute(command));
            }
            catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    private <T> T blockingExecute(Command<T> command) throws Exception {
        this.createParentNodes();
        try (SafeLock ignored = this.safeLock(command.executionPath());){
            List recentRevisions = (List)this.curator.getChildren().forPath(ZooKeeperCommandExecutor.absolutePath(LOG_PATH));
            if (!recentRevisions.isEmpty()) {
                long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong();
                this.replayLogs(lastRevision);
            }
            T result = this.delegate.execute(command).get();
            ReplicationLog<T> log = new ReplicationLog<T>(this.replicaId(), command, result);
            long revision = this.storeLog(log);
            logger.debug("logging OK. revision = {}, log = {}", (Object)revision, log);
            T t = result;
            return t;
        }
    }

    private void createParentNodes() throws Exception {
        if (this.createdParentNodes) {
            return;
        }
        this.createZkPathIfMissing(ZooKeeperCommandExecutor.absolutePath(new String[0]));
        this.createZkPathIfMissing(ZooKeeperCommandExecutor.absolutePath(LOG_PATH));
        this.createZkPathIfMissing(ZooKeeperCommandExecutor.absolutePath(LOG_BLOCK_PATH));
        this.createZkPathIfMissing(ZooKeeperCommandExecutor.absolutePath(LOCK_PATH));
        this.createdParentNodes = true;
    }

    private void createZkPathIfMissing(String zkPath) throws Exception {
        try {
            this.curator.create().forPath(zkPath);
        }
        catch (KeeperException.NodeExistsException nodeExistsException) {
            // empty catch block
        }
    }

    private static class LogMeta {
        private final int replicaId;
        private final long timestamp;
        private final int size;
        private final List<Long> blocks = new ArrayList<Long>();

        @JsonCreator
        LogMeta(@JsonProperty(value="replicaId", required=true) int replicaId, @JsonProperty(value="timestamp", defaultValue="0") long timestamp, @JsonProperty(value="size") int size) {
            this.replicaId = replicaId;
            this.timestamp = timestamp;
            this.size = size;
        }

        @JsonProperty
        int replicaId() {
            return this.replicaId;
        }

        @JsonProperty
        long timestamp() {
            return this.timestamp;
        }

        @JsonProperty
        int size() {
            return this.size;
        }

        @JsonProperty
        List<Long> blocks() {
            return Collections.unmodifiableList(this.blocks);
        }

        public void appendBlock(long blockId) {
            this.blocks.add(blockId);
        }
    }

    @FunctionalInterface
    private static interface SafeLock
    extends AutoCloseable {
        @Override
        public void close();
    }

    private static final class ListenerInfo {
        long lastReplayedRevision;
        final Runnable onTakeLeadership;
        final Runnable onReleaseLeadership;

        ListenerInfo(long lastReplayedRevision, @Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) {
            this.lastReplayedRevision = lastReplayedRevision;
            this.onReleaseLeadership = onReleaseLeadership;
            this.onTakeLeadership = onTakeLeadership;
        }
    }

    private class OldLogRemover
    implements LeaderSelectorListener {
        volatile boolean hasLeadership;

        private OldLogRemover() {
        }

        public void stateChanged(CuratorFramework client, ConnectionState newState) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void takeLeadership(CuratorFramework client) throws Exception {
            ListenerInfo listenerInfo = ZooKeeperCommandExecutor.this.listenerInfo;
            if (listenerInfo == null) {
                return;
            }
            logger.info("Taking leadership: {}", (Object)ZooKeeperCommandExecutor.this.replicaId());
            try {
                this.hasLeadership = true;
                if (listenerInfo.onTakeLeadership != null) {
                    listenerInfo.onTakeLeadership.run();
                }
                while (ZooKeeperCommandExecutor.this.curator.getState() == CuratorFrameworkState.STARTED) {
                    this.deleteLogs();
                    OldLogRemover oldLogRemover = this;
                    synchronized (oldLogRemover) {
                        this.wait();
                    }
                }
                return;
            }
            catch (InterruptedException interruptedException) {
                return;
            }
            catch (Exception e) {
                logger.error("Leader stopped due to an unexpected exception:", (Throwable)e);
                return;
            }
            finally {
                this.hasLeadership = false;
                logger.info("Releasing leadership: {}", (Object)ZooKeeperCommandExecutor.this.replicaId());
                if (listenerInfo.onReleaseLeadership != null) {
                    listenerInfo.onReleaseLeadership.run();
                }
                if (ZooKeeperCommandExecutor.this.listenerInfo != null) {
                    ZooKeeperCommandExecutor.this.leaderSelector.requeue();
                }
            }
        }

        public synchronized void touch() {
            this.notify();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void deleteLogs() throws Exception {
            List children = (List)ZooKeeperCommandExecutor.this.curator.getChildren().forPath(ZooKeeperCommandExecutor.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_PATH}));
            if (children.size() <= ZooKeeperCommandExecutor.this.cfg.maxLogCount()) {
                return;
            }
            long minAllowedTimestamp = System.currentTimeMillis() - ZooKeeperCommandExecutor.this.cfg.minLogAgeMillis();
            int targetCount = children.size() - ZooKeeperCommandExecutor.this.cfg.maxLogCount();
            ArrayList<String> deleted = new ArrayList<String>(targetCount);
            children.sort(Comparator.comparingLong(Long::parseLong));
            try {
                for (int i = 0; i < targetCount; ++i) {
                    String logPath = ZooKeeperCommandExecutor.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_PATH, (String)children.get(i)});
                    LogMeta meta = (LogMeta)Jackson.readValue((byte[])((byte[])ZooKeeperCommandExecutor.this.curator.getData().forPath(logPath)), LogMeta.class);
                    if (meta.timestamp() >= minAllowedTimestamp) {
                        break;
                    }
                    ArrayList<CuratorOp> ops = new ArrayList<CuratorOp>();
                    ops.add((CuratorOp)ZooKeeperCommandExecutor.this.curator.transactionOp().delete().forPath(logPath));
                    for (long blockId : meta.blocks()) {
                        String blockPath = ZooKeeperCommandExecutor.absolutePath(new String[]{ZooKeeperCommandExecutor.LOG_BLOCK_PATH}) + '/' + ZooKeeperCommandExecutor.pathFromRevision(blockId);
                        ops.add((CuratorOp)ZooKeeperCommandExecutor.this.curator.transactionOp().delete().forPath(blockPath));
                    }
                    ZooKeeperCommandExecutor.this.curator.transaction().forOperations(ops);
                    deleted.add((String)children.get(i));
                }
            }
            finally {
                logger.info("delete logs: {}", deleted);
            }
        }
    }
}

