/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tephra.zookeeper;

import co.cask.tephra.zookeeper.BasicACLData;
import co.cask.tephra.zookeeper.BasicNodeChildren;
import co.cask.tephra.zookeeper.BasicNodeData;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.SettableOperationFuture;
import org.apache.twill.zookeeper.ACLData;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TephraZKClientService
extends AbstractService
implements ZKClientService,
Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
    private final String zkStr;
    private final int sessionTimeout;
    private final List<Watcher> connectionWatchers;
    private final Multimap<String, byte[]> authInfos;
    private final AtomicReference<ZooKeeper> zooKeeper;
    private final Runnable stopTask;
    private ExecutorService eventExecutor;

    public TephraZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
        this.zkStr = zkStr;
        this.sessionTimeout = sessionTimeout;
        this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
        this.authInfos = this.copyAuthInfo(authInfos);
        this.addConnectionWatcher(connectionWatcher);
        this.zooKeeper = new AtomicReference();
        this.stopTask = this.createStopTask();
    }

    public Long getSessionId() {
        ZooKeeper zk = this.zooKeeper.get();
        return zk == null ? null : Long.valueOf(zk.getSessionId());
    }

    public String getConnectString() {
        return this.zkStr;
    }

    public Cancellable addConnectionWatcher(final Watcher watcher) {
        if (watcher == null) {
            return new Cancellable(){

                public void cancel() {
                }
            };
        }
        this.connectionWatchers.add(watcher);
        return new Cancellable(){

            public void cancel() {
                TephraZKClientService.this.connectionWatchers.remove(watcher);
            }
        };
    }

    public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
        return this.create(path, data, createMode, true);
    }

    public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent) {
        return this.create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }

    public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, Iterable<ACL> acl) {
        return this.create(path, data, createMode, true, acl);
    }

    public OperationFuture<Stat> exists(String path) {
        return this.exists(path, null);
    }

    public OperationFuture<NodeChildren> getChildren(String path) {
        return this.getChildren(path, null);
    }

    public OperationFuture<NodeData> getData(String path) {
        return this.getData(path, null);
    }

    public OperationFuture<Stat> setData(String path, byte[] data) {
        return this.setData(path, data, -1);
    }

    public OperationFuture<String> delete(String path) {
        return this.delete(path, -1);
    }

    public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
        return this.setACL(path, acl, -1);
    }

    public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
        return this.doCreate(path, data, createMode, createParent, (List<ACL>)ImmutableList.copyOf(acl), false);
    }

    private OperationFuture<String> doCreate(final String path, final @Nullable byte[] data, final CreateMode createMode, boolean createParent, final List<ACL> acl, final boolean ignoreNodeExists) {
        SettableOperationFuture createFuture = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, (Object)createFuture);
        if (!createParent) {
            return createFuture;
        }
        final SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        Futures.addCallback((ListenableFuture)createFuture, (FutureCallback)new FutureCallback<String>(){

            public void onSuccess(String path2) {
                result.set((Object)path2);
            }

            public void onFailure(Throwable t) {
                if (this.updateFailureResult(t, (SettableOperationFuture<String>)result, path, ignoreNodeExists)) {
                    return;
                }
                String parentPath = this.getParent(path);
                if (parentPath.isEmpty()) {
                    result.setException(t);
                    return;
                }
                Futures.addCallback((ListenableFuture)TephraZKClientService.this.doCreate(parentPath, null, CreateMode.PERSISTENT, true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), (FutureCallback)new FutureCallback<String>(){

                    public void onSuccess(String parentPath) {
                        Futures.addCallback((ListenableFuture)TephraZKClientService.this.doCreate(path, data, createMode, false, acl, ignoreNodeExists), (FutureCallback)new FutureCallback<String>(){

                            public void onSuccess(String pathResult) {
                                result.set((Object)pathResult);
                            }

                            public void onFailure(Throwable t) {
                                this.updateFailureResult(t, (SettableOperationFuture<String>)result, path, ignoreNodeExists);
                            }
                        });
                    }

                    public void onFailure(Throwable t) {
                        result.setException(t);
                    }
                });
            }

            private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result2, String path2, boolean ignoreNodeExists2) {
                if (!(t instanceof KeeperException)) {
                    result2.setException(t);
                    return true;
                }
                KeeperException.Code code = ((KeeperException)t).code();
                if (ignoreNodeExists2 && code == KeeperException.Code.NODEEXISTS) {
                    result2.set((Object)path2);
                    return false;
                }
                if (code != KeeperException.Code.NONODE) {
                    result2.setException(t);
                    return true;
                }
                return false;
            }

            private String getParent(String path2) {
                String parentPath = path2.substring(0, path2.lastIndexOf(47));
                return parentPath.isEmpty() && !"/".equals(path2) ? "/" : parentPath;
            }
        });
        return result;
    }

    public OperationFuture<Stat> exists(String path, Watcher watcher) {
        SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().exists(path, this.wrapWatcher(watcher), Callbacks.STAT_NONODE, (Object)result);
        return result;
    }

    public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
        SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().getChildren(path, this.wrapWatcher(watcher), Callbacks.CHILDREN, (Object)result);
        return result;
    }

    public OperationFuture<NodeData> getData(String path, Watcher watcher) {
        SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().getData(path, this.wrapWatcher(watcher), Callbacks.DATA, (Object)result);
        return result;
    }

    public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
        SettableOperationFuture result = SettableOperationFuture.create((String)dataPath, (Executor)this.eventExecutor);
        this.getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, (Object)result);
        return result;
    }

    public OperationFuture<String> delete(String deletePath, int version) {
        SettableOperationFuture result = SettableOperationFuture.create((String)deletePath, (Executor)this.eventExecutor);
        this.getZooKeeper().delete(deletePath, version, Callbacks.VOID, (Object)result);
        return result;
    }

    public OperationFuture<ACLData> getACL(String path) {
        SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, (Object)result);
        return result;
    }

    public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
        SettableOperationFuture result = SettableOperationFuture.create((String)path, (Executor)this.eventExecutor);
        this.getZooKeeper().setACL(path, (List)ImmutableList.copyOf(acl), version, Callbacks.STAT, (Object)result);
        return result;
    }

    public Supplier<ZooKeeper> getZooKeeperSupplier() {
        return new Supplier<ZooKeeper>(){

            public ZooKeeper get() {
                return TephraZKClientService.this.getZooKeeper();
            }
        };
    }

    protected void doStart() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Threads.createDaemonThreadFactory((String)"zk-client-EventThread"));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        this.eventExecutor = executor;
        try {
            this.zooKeeper.set(this.createZooKeeper());
        }
        catch (IOException e) {
            this.notifyFailed(e);
        }
    }

    protected void doStop() {
        this.eventExecutor.submit(this.stopTask);
        this.eventExecutor.shutdown();
    }

    private ZooKeeper getZooKeeper() {
        ZooKeeper zk = this.zooKeeper.get();
        Preconditions.checkArgument((zk != null ? 1 : 0) != 0, (Object)"Not connected to zooKeeper.");
        return zk;
    }

    private Watcher wrapWatcher(final Watcher watcher) {
        if (watcher == null) {
            return null;
        }
        return new Watcher(){

            public void process(final WatchedEvent event) {
                if (TephraZKClientService.this.eventExecutor.isShutdown()) {
                    LOG.debug("Already shutdown. Discarding event: {}", (Object)event);
                    return;
                }
                TephraZKClientService.this.eventExecutor.execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            watcher.process(event);
                        }
                        catch (Throwable t) {
                            LOG.error("Watcher throws exception.", t);
                        }
                    }
                });
            }
        };
    }

    private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
        ArrayListMultimap result = ArrayListMultimap.create();
        for (Map.Entry entry : authInfos.entries()) {
            byte[] info = (byte[])entry.getValue();
            result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(WatchedEvent event) {
        Service.State state = this.state();
        if (state == Service.State.TERMINATED || state == Service.State.FAILED) {
            return;
        }
        try {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected && state == Service.State.STARTING) {
                LOG.debug("Connected to ZooKeeper: {}", (Object)this.zkStr);
                this.notifyStarted();
                return;
            }
            if (event.getState() == Watcher.Event.KeeperState.Expired) {
                LOG.info("ZooKeeper session expired: {}", (Object)this.zkStr);
                if (state != Service.State.RUNNING) {
                    return;
                }
                this.eventExecutor.submit(new Runnable(){

                    @Override
                    public void run() {
                        if (TephraZKClientService.this.state() != Service.State.RUNNING) {
                            return;
                        }
                        try {
                            LOG.info("Reconnect to ZooKeeper due to expiration: {}", (Object)TephraZKClientService.this.zkStr);
                            TephraZKClientService.this.closeZooKeeper(TephraZKClientService.this.zooKeeper.getAndSet(TephraZKClientService.this.createZooKeeper()));
                        }
                        catch (IOException e) {
                            TephraZKClientService.this.notifyFailed(e);
                        }
                    }
                });
            }
        }
        finally {
            if (event.getType() == Watcher.Event.EventType.None) {
                for (Watcher connectionWatcher : this.connectionWatchers) {
                    connectionWatcher.process(event);
                }
            }
        }
    }

    private ZooKeeper createZooKeeper() throws IOException {
        ZooKeeper zk = new ZooKeeper(this.zkStr, this.sessionTimeout, this.wrapWatcher(this));
        for (Map.Entry authInfo : this.authInfos.entries()) {
            zk.addAuthInfo((String)authInfo.getKey(), (byte[])authInfo.getValue());
        }
        return zk;
    }

    private void closeZooKeeper(@Nullable ZooKeeper zk) {
        try {
            if (zk != null) {
                zk.close();
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted when closing ZooKeeper", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    private Runnable createStopTask() {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    TephraZKClientService.this.closeZooKeeper(TephraZKClientService.this.zooKeeper.getAndSet(null));
                    TephraZKClientService.this.notifyStopped();
                }
                catch (Exception e) {
                    TephraZKClientService.this.notifyFailed(e);
                }
            }
        };
    }

    private static final class Callbacks {
        static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback(){

            public void processResult(int rc, String path, Object ctx, String name) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)(name == null || name.isEmpty() ? path : name));
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback(){

            public void processResult(int rc, String path, Object ctx, Stat stat) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)stat);
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback(){

            public void processResult(int rc, String path, Object ctx, Stat stat) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
                    result.set((Object)stat);
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback(){

            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)new BasicNodeChildren(children, stat));
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback(){

            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)new BasicNodeData(data, stat));
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback(){

            public void processResult(int rc, String path, Object ctx) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)result.getRequestPath());
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };
        static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback(){

            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
                SettableOperationFuture result = (SettableOperationFuture)ctx;
                KeeperException.Code code = KeeperException.Code.get((int)rc);
                if (code == KeeperException.Code.OK) {
                    result.set((Object)new BasicACLData(acl, stat));
                    return;
                }
                result.setException((Throwable)KeeperException.create((KeeperException.Code)code, (String)result.getRequestPath()));
            }
        };

        private Callbacks() {
        }
    }
}

