/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.Resource;
import io.seata.core.model.ResourceManager;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.rpc.netty.AbstractNettyRemotingClient;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.netty.NettyPoolKey;
import io.seata.core.rpc.processor.client.ClientHeartbeatProcessor;
import io.seata.core.rpc.processor.client.ClientOnResponseProcessor;
import io.seata.core.rpc.processor.client.RmBranchCommitProcessor;
import io.seata.core.rpc.processor.client.RmBranchRollbackProcessor;
import io.seata.core.rpc.processor.client.RmUndoLogProcessor;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty remoting client used for the resource-manager (RM) role.
 *
 * <p>One shared instance is created lazily by {@link #getInstance()} and dropped again by
 * {@link #destroy()}. The application id, transaction service group and
 * {@link ResourceManager} are injected through setters before {@link #init()} is called.
 */
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyRemotingClient.class);

    /** Keep-alive of the message executor; passed together with {@link TimeUnit#SECONDS}. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;

    /** Capacity of the bounded work queue backing the message executor. */
    private static final int MAX_QUEUE_SIZE = 20000;

    /** Configuration key of the RM-specific batch-send switch. */
    private static final String RM_BATCH_SEND_KEY = "transport.enableRmClientBatchSendRequest";

    /** Configuration key whose value is the fallback when the RM-specific switch is not set. */
    private static final String CLIENT_BATCH_SEND_KEY = "transport.enableClientBatchSendRequest";

    private static volatile RmNettyRemotingClient instance;

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private ResourceManager resourceManager;
    private String applicationId;
    private String transactionServiceGroup;

    /**
     * Registers the message processors and, on the first call since construction or the last
     * {@link #destroy()}, initializes the parent client.
     *
     * <p>After the parent is initialized, a reconnect for the transaction service group is
     * triggered if a resource manager with at least one managed resource is set and the group
     * name is not blank.
     */
    @Override
    public void init() {
        // Processors are (re-)registered on every call, including calls made while the client
        // is already initialized; only the remainder is guarded by the flag.
        this.registerProcessor();
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        super.init();
        if (this.resourceManager != null
                && !this.resourceManager.getManagedResources().isEmpty()
                && StringUtils.isNotBlank(this.transactionServiceGroup)) {
            this.getClientChannelManager().reconnect(this.transactionServiceGroup);
        }
    }

    /**
     * Creates the client for the RM role, reads the initial batch-send switch from configuration
     * and registers a listener that updates the switch when the RM-specific key changes.
     */
    private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.RMROLE);
        // The RM-specific key wins; the generic client key (default true) is only the fallback.
        this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(
                RM_BATCH_SEND_KEY,
                ConfigurationFactory.getInstance().getBoolean(CLIENT_BATCH_SEND_KEY, true));
        ConfigurationChangeListener batchSendListener = new ConfigurationChangeListener() {

            public void onChangeEvent(ConfigurationChangeEvent event) {
                String dataId = event.getDataId();
                String newValue = event.getNewValue();
                // Blank values are ignored so the previously effective setting stays in place.
                if (RM_BATCH_SEND_KEY.equals(dataId) && StringUtils.isNotBlank(newValue)) {
                    RmNettyRemotingClient.this.enableClientBatchSendRequest = Boolean.parseBoolean(newValue);
                }
            }
        };
        ConfigurationCache.addConfigListener(RM_BATCH_SEND_KEY, new ConfigurationChangeListener[]{batchSendListener});
    }

    /**
     * Returns the shared instance after storing the given identifiers on it.
     *
     * @param applicationId           application id sent in RM register requests
     * @param transactionServiceGroup transaction service group sent in RM register requests
     * @return the shared client instance, never {@code null}
     */
    public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance();
        rmNettyRemotingClient.setApplicationId(applicationId);
        rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return rmNettyRemotingClient;
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>The volatile field is read into a local exactly once per check and the local is what
     * gets returned. {@link #destroy()} may reset the field to {@code null} at any time without
     * holding the class lock, so re-reading the field after the null check could hand
     * {@code null} to the caller.
     *
     * @return the shared client instance, never {@code null}
     */
    public static RmNettyRemotingClient getInstance() {
        RmNettyRemotingClient current = instance;
        if (current != null) {
            return current;
        }
        synchronized (RmNettyRemotingClient.class) {
            current = instance;
            if (current == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                int workerThreads = nettyClientConfig.getClientWorkerThreads();
                ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        workerThreads,
                        workerThreads,
                        KEEP_ALIVE_TIME,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(), workerThreads),
                        new ThreadPoolExecutor.CallerRunsPolicy());
                current = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                instance = current;
            }
            return current;
        }
    }

    /**
     * Sets the application id used when building RM register requests.
     *
     * @param applicationId the application id
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * Sets the transaction service group used for register requests and reconnects.
     *
     * @param transactionServiceGroup the transaction service group
     */
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    /**
     * Sets the resource manager whose managed resources are announced to the server.
     *
     * @param resourceManager the resource manager
     */
    public void setResourceManager(ResourceManager resourceManager) {
        this.resourceManager = resourceManager;
    }

    /**
     * Records the channel for the server address and, if the resource ids that were sent differ
     * from the currently merged resource keys, sends another register message with the current
     * keys.
     *
     * @param serverAddress  address of the server that accepted the registration
     * @param channel        channel the registration was sent on
     * @param response       the response, cast to {@link RegisterRMResponse}
     * @param requestMessage the request, cast to {@link RegisterRMRequest}
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) {
        RegisterRMRequest registerRMRequest = (RegisterRMRequest) requestMessage;
        RegisterRMResponse registerRMResponse = (RegisterRMResponse) response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register RM success. client version:{}, server version:{},channel:{}",
                    registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel);
        }
        this.getClientChannelManager().registerChannel(serverAddress, channel);
        String dbKey = this.getMergedResourceKeys();
        String sentResourceIds = registerRMRequest.getResourceIds();
        // Resources may have been added after the request was built; re-send the current set.
        if (sentResourceIds != null && !sentResourceIds.equals(dbKey)) {
            this.sendRegisterMessage(serverAddress, channel, dbKey);
        }
    }

    /**
     * Always fails with a {@link FrameworkException} describing the rejected registration.
     *
     * @param serverAddress  address of the server that rejected the registration
     * @param channel        channel the registration was sent on
     * @param response       the response, cast to {@link RegisterRMResponse}
     * @param requestMessage the request, cast to {@link RegisterRMRequest}
     * @throws FrameworkException always
     */
    @Override
    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) {
        RegisterRMRequest registerRMRequest = (RegisterRMRequest) requestMessage;
        RegisterRMResponse registerRMResponse = (RegisterRMResponse) response;
        String errMsg = String.format(
                "register RM failed. client version: %s,server version: %s, errorMsg: %s, channel: %s",
                registerRMRequest.getVersion(), registerRMResponse.getVersion(), registerRMResponse.getMsg(), channel);
        throw new FrameworkException(errMsg);
    }

    /**
     * Announces a single resource on every channel currently held by the channel manager.
     *
     * <p>Does nothing when the transaction service group is blank; logs a warning and returns
     * when {@code resourceId} is blank. If there are no channels, a reconnect for the service
     * group is triggered instead of sending a register message.
     *
     * @param resourceGroupId not used by this method
     * @param resourceId      id of the resource to register
     */
    public void registerResource(String resourceGroupId, String resourceId) {
        if (StringUtils.isBlank(this.transactionServiceGroup)) {
            return;
        }
        if (StringUtils.isBlank(resourceId)) {
            LOGGER.warn("The resourceId must not be null or empty when registering the RM client.");
            return;
        }
        // Use one reference for the emptiness check, the lock and the iteration so that the
        // map being locked is the map being traversed.
        ConcurrentMap<String, Channel> channels = this.getClientChannelManager().getChannels();
        if (channels.isEmpty()) {
            this.getClientChannelManager().reconnect(this.transactionServiceGroup);
            return;
        }
        synchronized (channels) {
            for (Map.Entry<String, Channel> entry : channels.entrySet()) {
                String serverAddress = entry.getKey();
                Channel rmChannel = entry.getValue();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("will register resourceId:{}", resourceId);
                }
                this.sendRegisterMessage(serverAddress, rmChannel, resourceId);
            }
        }
    }

    /**
     * Sends an asynchronous RM register request for {@code resourceId} on the given channel.
     *
     * <p>A {@link FrameworkException} from the send is logged, not propagated. When its error
     * code is {@link FrameworkErrorCode#ChannelIsNotWritable} and a server address is known, the
     * channel is released first.
     *
     * @param serverAddress address the channel belongs to; may be {@code null}
     * @param channel       channel to send on
     * @param resourceId    resource id(s) placed into the request
     */
    public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
        RegisterRMRequest message = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
        message.setResourceIds(resourceId);
        try {
            super.sendAsyncRequest(channel, message);
        } catch (FrameworkException e) {
            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                this.getClientChannelManager().releaseChannel(channel, serverAddress);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove not writable channel:{}", channel);
                }
            }
            // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
            LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
        }
    }

    /**
     * Joins the ids of all managed resources with commas.
     *
     * <p>Blank ids are skipped with a warning.
     *
     * @return the comma-separated ids (possibly empty if every id was blank), or {@code null}
     *         when no resource manager is set or it manages no resources
     */
    public String getMergedResourceKeys() {
        ResourceManager manager = this.resourceManager;
        if (manager == null) {
            // init() accepts a missing resource manager, so report "nothing to register"
            // instead of failing with a NullPointerException.
            return null;
        }
        Map<String, Resource> managedResources = manager.getManagedResources();
        if (managedResources == null || managedResources.isEmpty()) {
            return null;
        }
        StringBuilder merged = new StringBuilder();
        for (String resourceId : managedResources.keySet()) {
            if (StringUtils.isBlank(resourceId)) {
                LOGGER.warn("The resourceId must not be null or empty when registering the RM client.");
                continue;
            }
            // Appended ids are never empty, so a non-empty buffer means "not the first id".
            if (merged.length() > 0) {
                merged.append(",");
            }
            merged.append(resourceId);
        }
        return merged.toString();
    }

    /**
     * Destroys the parent client, clears the initialized flag and drops the shared instance so
     * that the next {@link #getInstance()} call creates a new client.
     */
    @Override
    public void destroy() {
        super.destroy();
        this.initialized.set(false);
        instance = null;
    }

    /**
     * Returns a function that builds, for a server address, an RM-role pool key carrying a
     * register request with the currently merged resource keys.
     *
     * @return the pool-key factory
     */
    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return serverAddress -> {
            String resourceIds = this.getMergedResourceKeys();
            if (resourceIds != null && LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :{}", resourceIds);
            }
            RegisterRMRequest message = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
            message.setResourceIds(resourceIds);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
        };
    }

    /**
     * @return the transaction service group set via
     *         {@link #setTransactionServiceGroup(String)}
     */
    @Override
    protected String getTransactionServiceGroup() {
        return this.transactionServiceGroup;
    }

    /**
     * @return the current value of the batch-send switch
     */
    @Override
    public boolean isEnableClientBatchSendRequest() {
        return this.enableClientBatchSendRequest;
    }

    /**
     * @return the RM request timeout as reported by {@link NettyClientConfig}
     */
    @Override
    public long getRpcRequestTimeout() {
        return NettyClientConfig.getRpcRmRequestTimeout();
    }

    /**
     * Registers the processors this client handles.
     *
     * <p>NOTE(review): the numeric codes look like message-type constants that were inlined
     * during compilation; confirm against the protocol's message-type definitions before
     * changing them.
     */
    private void registerProcessor() {
        // Server-initiated requests run on the message executor.
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(this.getTransactionMessageHandler(), this);
        super.registerProcessor(3, rmBranchCommitProcessor, this.messageExecutor);
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(this.getTransactionMessageHandler(), this);
        super.registerProcessor(5, rmBranchRollbackProcessor, this.messageExecutor);
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(this.getTransactionMessageHandler());
        super.registerProcessor(111, rmUndoLogProcessor, this.messageExecutor);

        // One response processor is shared by all of these codes and registered without an executor.
        ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(this.mergeMsgMap, super.getFutures(), this.getTransactionMessageHandler());
        int[] responseCodes = {60, 12, 14, 22, 104, 121};
        for (int responseCode : responseCodes) {
            super.registerProcessor(responseCode, onResponseProcessor, null);
        }

        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(120, clientHeartbeatProcessor, null);
    }
}

