/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.session;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.message.SourceType;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Language;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectionManager {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaConnectionManager.class);
    private SessionManager sessionManager;

    public KafkaConnectionManager(SessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }

    public void addConnection(Transport transport, String clientId, String version) {
        Connection connection = SessionHelper.getConnection((Transport)transport);
        if (connection != null) {
            return;
        }
        clientId = KafkaClientHelper.parseClient(clientId);
        InetSocketAddress remoteAddress = (InetSocketAddress)transport.remoteAddress();
        String id = this.generateConnectionId(remoteAddress, clientId, version);
        connection = new Connection();
        connection.setId(id);
        connection.setApp(clientId);
        connection.setVersion(version);
        connection.setAddress(IpUtil.toByte((InetSocketAddress)remoteAddress));
        connection.setAddressStr(IpUtil.toAddress((SocketAddress)remoteAddress));
        connection.setHost(((InetSocketAddress)transport.remoteAddress()).getHostString());
        connection.setLanguage(Language.JAVA);
        connection.setSource(SourceType.KAFKA.name());
        connection.setTransport(transport);
        connection.setCreateTime(SystemClock.now());
        if (this.sessionManager.addConnection(connection)) {
            SessionHelper.putIfAbsentConnection((Transport)transport, (Connection)connection);
        }
    }

    public void addProducer(Transport transport, String topic) {
        String app;
        Connection connection = SessionHelper.getConnection((Transport)transport);
        String producerId = connection.getProducer(topic, app = connection.getApp());
        if (StringUtils.isNotBlank((CharSequence)producerId)) {
            return;
        }
        String id = this.generateProducerId(connection, topic);
        Producer producer = new Producer();
        producer.setId(id);
        producer.setConnectionId(connection.getId());
        producer.setApp(app);
        producer.setTopic(topic);
        producer.setType(Producer.ProducerType.KAFKA);
        this.sessionManager.addProducer(producer);
    }

    public void addConsumer(Transport transport, String topic) {
        String app;
        Connection connection = SessionHelper.getConnection((Transport)transport);
        String consumerId = connection.getConsumer(topic, app = connection.getApp());
        if (StringUtils.isNotBlank((CharSequence)consumerId)) {
            return;
        }
        String id = this.generateConsumerId(connection, topic);
        Consumer consumer = new Consumer();
        consumer.setId(id);
        consumer.setConnectionId(connection.getId());
        consumer.setApp(app);
        consumer.setTopic(topic);
        consumer.setType(Consumer.ConsumeType.KAFKA);
        this.sessionManager.addConsumer(consumer);
    }

    public void addGroup(Transport transport, String group) {
        Connection connection = SessionHelper.getConnection((Transport)transport);
        if (connection == null) {
            return;
        }
        String app = connection.getApp();
        connection.setApp(app);
    }

    protected String generateProducerId(Connection connection, String topic) {
        InetSocketAddress inetRemoteAddress = (InetSocketAddress)connection.getTransport().remoteAddress();
        return String.format("%s-%s_%s_%s-%s", connection.getVersion(), inetRemoteAddress.getHostString(), topic, inetRemoteAddress.getPort(), SystemClock.now());
    }

    protected String generateConsumerId(Connection connection, String topic) {
        InetSocketAddress inetRemoteAddress = (InetSocketAddress)connection.getTransport().remoteAddress();
        return String.format("%s-%s_%s_%s-%s", connection.getVersion(), inetRemoteAddress.getHostString(), topic, inetRemoteAddress.getPort(), SystemClock.now());
    }

    protected String generateConnectionId(SocketAddress remoteAddress, String clientId, String version) {
        InetSocketAddress inetRemoteAddress = (InetSocketAddress)remoteAddress;
        return String.format("%s-%s_%s_%s-%s", version, inetRemoteAddress.getHostString(), inetRemoteAddress.getPort(), clientId, SystemClock.now());
    }
}

