package org.joyqueue.broker.kafka.session;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.message.SourceType;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Language;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/session/KafkaConnectionManager.class */
public class KafkaConnectionManager {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaConnectionManager.class);
    private SessionManager sessionManager;

    public KafkaConnectionManager(SessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }

    public void addConnection(Transport transport, String str, String str2) {
        if (SessionHelper.getConnection(transport) != null) {
            return;
        }
        String parseClient = KafkaClientHelper.parseClient(str);
        InetSocketAddress inetSocketAddress = (InetSocketAddress) transport.remoteAddress();
        String generateConnectionId = generateConnectionId(inetSocketAddress, parseClient, str2);
        Connection connection = new Connection();
        connection.setId(generateConnectionId);
        connection.setApp(parseClient);
        connection.setVersion(str2);
        connection.setAddress(IpUtil.toByte(inetSocketAddress));
        connection.setAddressStr(IpUtil.toAddress(inetSocketAddress));
        connection.setHost(((InetSocketAddress) transport.remoteAddress()).getHostString());
        connection.setLanguage(Language.JAVA);
        connection.setSource(SourceType.KAFKA.name());
        connection.setTransport(transport);
        connection.setCreateTime(SystemClock.now());
        if (this.sessionManager.addConnection(connection)) {
            SessionHelper.putIfAbsentConnection(transport, connection);
        }
    }

    public void addProducer(Transport transport, String str) {
        Connection connection = SessionHelper.getConnection(transport);
        String app = connection.getApp();
        if (StringUtils.isNotBlank(connection.getProducer(str, app))) {
            return;
        }
        String generateProducerId = generateProducerId(connection, str);
        Producer producer = new Producer();
        producer.setId(generateProducerId);
        producer.setConnectionId(connection.getId());
        producer.setApp(app);
        producer.setTopic(str);
        producer.setType(Producer.ProducerType.KAFKA);
        this.sessionManager.addProducer(producer);
    }

    public void addConsumer(Transport transport, String str) {
        Connection connection = SessionHelper.getConnection(transport);
        String app = connection.getApp();
        if (StringUtils.isNotBlank(connection.getConsumer(str, app))) {
            return;
        }
        String generateConsumerId = generateConsumerId(connection, str);
        Consumer consumer = new Consumer();
        consumer.setId(generateConsumerId);
        consumer.setConnectionId(connection.getId());
        consumer.setApp(app);
        consumer.setTopic(str);
        consumer.setType(Consumer.ConsumeType.KAFKA);
        this.sessionManager.addConsumer(consumer);
    }

    public void addGroup(Transport transport, String str) {
        Connection connection = SessionHelper.getConnection(transport);
        if (connection == null) {
            return;
        }
        connection.setApp(connection.getApp());
    }

    protected String generateProducerId(Connection connection, String str) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) connection.getTransport().remoteAddress();
        return String.format("%s-%s_%s_%s-%s", connection.getVersion(), inetSocketAddress.getHostString(), str, Integer.valueOf(inetSocketAddress.getPort()), Long.valueOf(SystemClock.now()));
    }

    protected String generateConsumerId(Connection connection, String str) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) connection.getTransport().remoteAddress();
        return String.format("%s-%s_%s_%s-%s", connection.getVersion(), inetSocketAddress.getHostString(), str, Integer.valueOf(inetSocketAddress.getPort()), Long.valueOf(SystemClock.now()));
    }

    protected String generateConnectionId(SocketAddress socketAddress, String str, String str2) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        return String.format("%s-%s_%s_%s-%s", str2, inetSocketAddress.getHostString(), Integer.valueOf(inetSocketAddress.getPort()), str, Long.valueOf(SystemClock.now()));
    }
}
