/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.net.InetAddress;
import kafka.cluster.Broker;
import kafka.log.LogManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaZooKeeper$;
import kafka.server.KafkaZooKeeper$SessionExpireListener$;
import kafka.utils.StringSerializer$;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Watcher;
import scala.Function0;
import scala.Function1;
import scala.ScalaObject;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005\u001dd\u0001C\u0001\u0003\t\u0003\u0005\t\u0011A\u0004\u0003\u001d-\u000bgm[1[_>\\U-\u001a9fe*\u00111\u0001B\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001aE\u0002\u0001\u0011A\u0001\"!\u0003\b\u000e\u0003)Q!a\u0003\u0007\u0002\t1\fgn\u001a\u0006\u0002\u001b\u0005!!.\u0019<b\u0013\ty!B\u0001\u0004PE*,7\r\u001e\t\u0003#Qi\u0011A\u0005\u0006\u0002'\u0005)1oY1mC&\u0011QC\u0005\u0002\f'\u000e\fG.Y(cU\u0016\u001cG\u000f\u0003\u0005\u0018\u0001\t\u0005\t\u0015!\u0003\u0019\u0003\u0019\u0019wN\u001c4jOB\u0011\u0011DG\u0007\u0002\u0005%\u00111D\u0001\u0002\f\u0017\u000647.Y\"p]\u001aLw\r\u0003\u0005\u001e\u0001\t\u0005\t\u0015!\u0003\u001f\u0003)awnZ'b]\u0006<WM\u001d\t\u0003?\tj\u0011\u0001\t\u0006\u0003C\u0011\t1\u0001\\8h\u0013\t\u0019\u0003E\u0001\u0006M_\u001el\u0015M\\1hKJDQ!\n\u0001\u0005\u0002\u0019\na\u0001P5oSRtDcA\u0014)SA\u0011\u0011\u0004\u0001\u0005\u0006/\u0011\u0002\r\u0001\u0007\u0005\u0006;\u0011\u0002\rA\b\u0005\bW\u0001\u0011\r\u0011\"\u0003-\u0003\u0019awnZ4feV\tQ\u0006\u0005\u0002/k5\tqF\u0003\u00021c\u0005)An\\45U*\u0011!gM\u0001\u0007CB\f7\r[3\u000b\u0003Q\n1a\u001c:h\u0013\t1tF\u0001\u0004M_\u001e<WM\u001d\u0005\u0007q\u0001\u0001\u000b\u0011B\u0017\u0002\u000f1|wmZ3sA!9!\b\u0001b\u0001\n\u0003Y\u0014\u0001\u00042s_.,'/\u00133QCRDW#\u0001\u001f\u0011\u0005%i\u0014B\u0001 
\u000b\u0005\u0019\u0019FO]5oO\"1\u0001\t\u0001Q\u0001\nq\nQB\u0019:pW\u0016\u0014\u0018\n\u001a)bi\"\u0004\u0003b\u0002\"\u0001\u0001\u0004%\taQ\u0001\tu.\u001cE.[3oiV\tA\t\u0005\u0002F\u00156\taI\u0003\u0002H\u0011\u0006A!p[2mS\u0016tGO\u0003\u0002Jg\u00051\u0011\nM%uK\u000eL!a\u0013$\u0003\u0011i[7\t\\5f]RDq!\u0014\u0001A\u0002\u0013\u0005a*\u0001\u0007{W\u000ec\u0017.\u001a8u?\u0012*\u0017\u000f\u0006\u0002P%B\u0011\u0011\u0003U\u0005\u0003#J\u0011A!\u00168ji\"91\u000bTA\u0001\u0002\u0004!\u0015a\u0001=%c!1Q\u000b\u0001Q!\n\u0011\u000b\u0011B_6DY&,g\u000e\u001e\u0011\t\u000f]\u0003\u0001\u0019!C\u00011\u00061Ao\u001c9jGN,\u0012!\u0017\t\u00045\n,gBA.a\u001d\tav,D\u0001^\u0015\tqf!\u0001\u0004=e>|GOP\u0005\u0002'%\u0011\u0011ME\u0001\ba\u0006\u001c7.Y4f\u0013\t\u0019GM\u0001\u0003MSN$(BA1\u0013!\t1\u0017N\u0004\u0002\u0012O&\u0011\u0001NE\u0001\u0007!J,G-\u001a4\n\u0005yR'B\u00015\u0013\u0011\u001da\u0007\u00011A\u0005\u00025\f!\u0002^8qS\u000e\u001cx\fJ3r)\tye\u000eC\u0004TW\u0006\u0005\t\u0019A-\t\rA\u0004\u0001\u0015)\u0003Z\u0003\u001d!x\u000e]5dg\u0002BqA\u001d\u0001C\u0002\u0013\u00051/\u0001\u0003m_\u000e\\W#\u0001\u0005\t\rU\u0004\u0001\u0015!\u0003\t\u0003\u0015awnY6!\u0011\u00159\b\u0001\"\u0001y\u0003\u001d\u0019H/\u0019:ukB$\u0012a\u0014\u0005\u0006u\u0002!\t\u0001_\u0001\u0013e\u0016<\u0017n\u001d;fe\n\u0013xn[3s\u0013:T6\u000eC\u0003}\u0001\u0011\u0005Q0A\tsK\u001eL7\u000f^3s)>\u0004\u0018nY%o5.$\"a\u0014@\t\u000b}\\\b\u0019A3\u0002\u000bQ|\u0007/[2\t\u000f\u0005\r\u0001\u0001\"\u0001\u0002\u0006\u0005I\"/Z4jgR,'\u000fV8qS\u000eLeNW6J]R,'O\\1m)\ry\u0015q\u0001\u0005\u0007\u007f\u0006\u0005\u0001\u0019A3\u0007\u0015\u0005-\u0001\u0001\"A\u0001\u0002\u0003\tiAA\u000bTKN\u001c\u0018n\u001c8FqBL'/\u001a'jgR,g.\u001a:\u0014\r\u0005%\u0001\"a\u0004\u0011!\r)\u0015\u0011C\u0005\u0004\u0003'1%\u0001E%[WN#\u0018\r^3MSN$XM\\3s\u0011\u001d)\u0013\u0011\u0002C\u0001\u0003/!\"!!\u0007\u0011\t\u0005m\u0011\u0011B\u0007\u0002\u0001!A\u0011qDA\u0005\t\u0003\t\t#\u0
001\niC:$G.Z*uCR,7\t[1oO\u0016$GcA(\u0002$!A\u0011QEA\u000f\u0001\u0004\t9#A\u0003ti\u0006$X\r\u0005\u0003\u0002*\u0005\u001dc\u0002BA\u0016\u0003\u0003rA!!\f\u0002<9!\u0011qFA\u001c\u001d\u0011\t\t$!\u000e\u000f\u0007q\u000b\u0019$C\u00015\u0013\t\u00114'C\u0002\u0002:E\n\u0011B_8pW\u0016,\u0007/\u001a:\n\t\u0005u\u0012qH\u0001\b/\u0006$8\r[3s\u0015\r\tI$M\u0005\u0005\u0003\u0007\n)%A\u0003Fm\u0016tGO\u0003\u0003\u0002>\u0005}\u0012\u0002BA%\u0003\u0017\u00121bS3fa\u0016\u00148\u000b^1uK*!\u00111IA#Q\u0019\ti\"a\u0014\u0002VA\u0019\u0011#!\u0015\n\u0007\u0005M#C\u0001\u0004uQJ|wo]\u0012\u0003\u0003/\u00022AWA-\u0013\r\tY\u0006\u001a\u0002\n\u000bb\u001cW\r\u001d;j_:Dq!a\u0018\u0002\n\u0011\u0005\u00010\u0001\tiC:$G.\u001a(foN+7o]5p]\"2\u0011QLA(\u0003+Ba!!\u001a\u0001\t\u0003A\u0018!B2m_N,\u0007")
/*
 * Handles a broker's interaction with ZooKeeper: opens the client connection,
 * writes the broker entry and the per-topic entries, and keeps a list of the
 * topics registered through registerTopicInZk so that SessionExpireListener
 * can walk that list again from handleNewSession.
 */
public class KafkaZooKeeper
implements ScalaObject {
    /** Broker configuration; also read by the nested listener through the outer instance. */
    public final KafkaConfig kafka$server$KafkaZooKeeper$$config;
    /** Consulted for the topic-to-partition-count map when a topic is registered. */
    private final LogManager logManager;
    private final Logger kafka$server$KafkaZooKeeper$$logger;
    /** Broker-ids path + "/" + this broker's id; computed once in the constructor. */
    private final String brokerIdPath;
    /** {@code null} until {@link #startup()} (or {@link #zkClient_$eq}) installs a client. */
    private ZkClient zkClient;
    /** Topics added by {@link #registerTopicInZk(String)}, most recently added first. */
    private List<String> topics;
    /** Monitor held while {@link #topics} is updated and while the listener iterates it. */
    private final Object lock;

    /**
     * Builds the helper without touching ZooKeeper; no client exists until
     * {@link #startup()} is called.
     *
     * @param config     broker configuration (broker id, ZK connection settings, host, port)
     * @param logManager source of the topic partition map
     */
    public KafkaZooKeeper(KafkaConfig config, LogManager logManager) {
        this.kafka$server$KafkaZooKeeper$$config = config;
        this.logManager = logManager;
        this.kafka$server$KafkaZooKeeper$$logger = Logger.getLogger(KafkaZooKeeper.class);
        this.brokerIdPath = ZkUtils$.MODULE$.brokerIdsPath() + "/" + config.brokerId();
        this.zkClient = null;
        this.topics = Nil$.MODULE$;
        this.lock = new Object();
    }

    /** @return the log4j logger obtained for this class */
    public final Logger kafka$server$KafkaZooKeeper$$logger() {
        return this.kafka$server$KafkaZooKeeper$$logger;
    }

    /** @return the ZooKeeper path used for this broker's own entry */
    public String brokerIdPath() {
        return this.brokerIdPath;
    }

    /** @return the current client, or {@code null} if none has been set */
    public ZkClient zkClient() {
        return this.zkClient;
    }

    /** Replaces the client reference; the previous client is not closed here. */
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /** @return the current topic list (read without taking {@link #lock()}) */
    public List<String> topics() {
        return this.topics;
    }

    /** Replaces the topic list; this setter itself does not take {@link #lock()}. */
    public void topics_$eq(List<String> list) {
        this.topics = list;
    }

    /** @return the monitor object used to guard the topic list */
    public Object lock() {
        return this.lock;
    }

    /**
     * Creates the ZooKeeper client from the configured connect string and
     * timeouts, then subscribes a {@link SessionExpireListener} to its state
     * changes.
     */
    public void startup() {
        final KafkaConfig cfg = this.kafka$server$KafkaZooKeeper$$config;
        this.kafka$server$KafkaZooKeeper$$logger().info("connecting to ZK: " + cfg.zkConnect());
        ZkClient client = new ZkClient(cfg.zkConnect(), cfg.zkSessionTimeoutMs(), cfg.zkConnectionTimeoutMs(), (ZkSerializer)StringSerializer$.MODULE$);
        this.zkClient_$eq(client);
        this.zkClient().subscribeStateChanges((IZkStateListener)new SessionExpireListener());
    }

    /**
     * Writes this broker's entry at {@link #brokerIdPath()} through
     * {@code ZkUtils.createEphemeralPathExpectConflict}.
     *
     * <p>The host is the configured host name when one is set, otherwise the
     * local host's address. The creator id is {@code host + "-" + currentTimeMillis}.
     */
    public void registerBrokerInZk() {
        final KafkaConfig cfg = this.kafka$server$KafkaZooKeeper$$config;
        this.kafka$server$KafkaZooKeeper$$logger().info("Registering broker " + this.brokerIdPath());

        String hostName;
        if (cfg.hostName() != null) {
            hostName = cfg.hostName();
        } else {
            // No host configured: fall back to the address of the local machine.
            hostName = InetAddress.getLocalHost().getHostAddress();
        }
        String creatorId = hostName + "-" + System.currentTimeMillis();

        Broker broker = new Broker(cfg.brokerId(), creatorId, hostName, cfg.port());
        ZkUtils$.MODULE$.createEphemeralPathExpectConflict(this.zkClient(), this.brokerIdPath(), broker.getZKString());
        this.kafka$server$KafkaZooKeeper$$logger().info("Registering broker " + this.brokerIdPath() + " succeeded with " + broker);
    }

    /**
     * Registers {@code topic} in ZooKeeper and then prepends it to the
     * remembered topic list.
     *
     * <p>Only the list update happens under {@link #lock()}; the ZooKeeper
     * write is performed before the lock is taken.
     *
     * @param topic name of the topic to register
     */
    public void registerTopicInZk(String topic) {
        this.registerTopicInZkInternal(topic);
        synchronized (this.lock()) {
            List<String> withTopic = (List<String>)this.topics().$colon$colon((Object)topic);
            this.topics_$eq(withTopic);
        }
    }

    /**
     * Writes the entry {@code <brokerTopicsPath>/<topic>/<brokerId>} whose
     * value is the topic's partition count rendered as a decimal string.
     * Does not modify the remembered topic list.
     *
     * @param topic name of the topic to register
     */
    public void registerTopicInZkInternal(String topic) {
        String brokerTopicPath = ZkUtils$.MODULE$.brokerTopicsPath() + "/" + topic + "/" + this.kafka$server$KafkaZooKeeper$$config.brokerId();
        // Partition count comes from the log manager's map; the fallback used
        // for an unknown topic is supplied by $anonfun$1, defined outside this file.
        Object partitionCount = this.logManager.getTopicPartitionsMap().getOrElse((Object)topic, (Function0)new $anonfun$1(this));
        int numParts = BoxesRunTime.unboxToInt(partitionCount);

        this.kafka$server$KafkaZooKeeper$$logger().info("Begin registering broker topic " + brokerTopicPath + " with " + numParts + " partitions");
        ZkUtils$.MODULE$.createEphemeralPathExpectConflict(this.zkClient(), brokerTopicPath, Integer.toString(numParts));
        this.kafka$server$KafkaZooKeeper$$logger().info("End registering broker topic " + brokerTopicPath);
    }

    /** Closes the ZooKeeper client if one has been set; otherwise does nothing. */
    public void close() {
        if (this.zkClient() == null) {
            return;
        }
        this.kafka$server$KafkaZooKeeper$$logger().info("Closing zookeeper client...");
        this.zkClient().close();
    }

    /**
     * State listener subscribed on the client in {@link KafkaZooKeeper#startup()}.
     * State changes are ignored; a new session triggers re-registration.
     */
    public class SessionExpireListener
    implements IZkStateListener,
    ScalaObject {
        /** Creates the listener; rejects a {@code null} enclosing instance. */
        public SessionExpireListener() {
            if (KafkaZooKeeper.this == null) {
                throw new NullPointerException();
            }
        }

        /** Intentionally empty: connection state changes require no action here. */
        public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        }

        /**
         * Re-registers the broker entry, then, while holding the outer lock,
         * applies SessionExpireListener$$anonfun$handleNewSession$1 to every
         * remembered topic. That function is defined outside this file;
         * judging by the log message it re-registers each topic (to be confirmed).
         */
        public void handleNewSession() throws Exception {
            final KafkaZooKeeper outer = this.kafka$server$KafkaZooKeeper$SessionExpireListener$$$outer();
            final Logger log = outer.kafka$server$KafkaZooKeeper$$logger();

            log.info("re-registering broker info in ZK for broker " + outer.kafka$server$KafkaZooKeeper$$config.brokerId());
            outer.registerBrokerInZk();

            // Hold the lock so the topic list cannot be replaced mid-iteration
            // by registerTopicInZk.
            synchronized (outer.lock()) {
                log.info("re-registering broker topics in ZK for broker " + outer.kafka$server$KafkaZooKeeper$$config.brokerId());
                outer.topics().foreach((Function1)new SessionExpireListener$$anonfun$handleNewSession$1(this));
            }
            log.info("done re-registering broker");
        }

        /** @return the enclosing {@link KafkaZooKeeper} instance */
        public /* synthetic */ KafkaZooKeeper kafka$server$KafkaZooKeeper$SessionExpireListener$$$outer() {
            return KafkaZooKeeper.this;
        }
    }
}

