/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.sink.mq;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.source.ServerMessageHandler;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimplePackProfile
extends PackProfile {
    private static final Logger logger = LoggerFactory.getLogger(SimplePackProfile.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static final long MINUTE_MS = 60000L;
    private boolean needRspEvent = false;
    private Channel channel;
    private MsgType msgType;
    private Event event;

    public SimplePackProfile(String uid, String inlongGroupId, String inlongStreamId, long dispatchTime) {
        super(uid, inlongGroupId, inlongStreamId, dispatchTime);
    }

    @Override
    public void ack() {
        if (!this.needRspEvent) {
            return;
        }
        this.responseV0Msg(DataProxyErrCode.SUCCESS, "");
    }

    @Override
    public void fail(DataProxyErrCode errCode, String errMsg) {
        if (!this.needRspEvent) {
            return;
        }
        this.responseV0Msg(errCode, errMsg);
    }

    @Override
    public boolean isResend() {
        return !this.needRspEvent && this.enableRetryAfterFailure && (this.maxRetries < 0 || ++this.retries <= this.maxRetries);
    }

    @Override
    public boolean addEvent(Event event, long maxPackCount, long maxPackSize) {
        this.setCount(1L);
        this.setSize(event.getBody().length);
        if (event instanceof SinkRspEvent) {
            SinkRspEvent rspEvent = (SinkRspEvent)event;
            this.needRspEvent = true;
            this.event = rspEvent.getEvent();
            this.channel = rspEvent.getChannel();
            this.msgType = rspEvent.getMsgType();
        } else {
            this.event = event;
            this.needRspEvent = false;
        }
        return true;
    }

    public static SimplePackProfile create(Event event) {
        Map headers = event.getHeaders();
        String inlongGroupId = (String)headers.get("groupId");
        String inlongStreamId = (String)headers.get("streamId");
        String uid = InlongId.generateUid((String)inlongGroupId, (String)inlongStreamId);
        long msgTime = NumberUtils.toLong((String)((String)headers.get("dt")), (long)System.currentTimeMillis());
        long dispatchTime = msgTime - msgTime % 60000L;
        SimplePackProfile profile = new SimplePackProfile(uid, inlongGroupId, inlongStreamId, dispatchTime);
        profile.setCount(1L);
        profile.setSize(event.getBody().length);
        if (event instanceof SinkRspEvent) {
            SinkRspEvent rspEvent = (SinkRspEvent)event;
            profile.needRspEvent = true;
            profile.event = rspEvent.getEvent();
            profile.channel = rspEvent.getChannel();
            profile.msgType = rspEvent.getMsgType();
        } else {
            profile.event = event;
        }
        return profile;
    }

    public Event getEvent() {
        return this.event;
    }

    public Map<String, String> getProperties() {
        return this.event.getHeaders();
    }

    public Map<String, String> getPropsToMQ(long sendTime) {
        HashMap<String, String> result = new HashMap<String, String>();
        result.put("rt", (String)this.event.getHeaders().get("rt"));
        result.put("st", String.valueOf(sendTime));
        result.put("msgEnType", (String)this.event.getHeaders().get("msgEnType"));
        result.put("version", (String)this.event.getHeaders().get("version"));
        result.put("srcIp", (String)this.event.getHeaders().get("srcIp"));
        result.put("dpIp", NetworkUtils.getLocalIp());
        return result;
    }

    private void responseV0Msg(DataProxyErrCode errCode, String errMsg) {
        block13: {
            String uniqId = (String)this.event.getHeaders().get("uniq");
            if ("false".equals(this.event.getHeaders().get("isAck"))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("not need to rsp message: uniqId = {}, inlongGroupId = {}, inlongStreamId = {}", new Object[]{uniqId, this.getInlongGroupId(), this.getInlongStreamId()});
                }
                return;
            }
            if (this.channel == null || !this.channel.isWritable()) {
                if (logCounter.shouldPrint()) {
                    logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}", new Object[]{this.msgType, this.event.getHeaders(), this.channel});
                }
                return;
            }
            try {
                String origAttr;
                StringBuilder strBuff = new StringBuilder(512);
                if (errCode != DataProxyErrCode.SUCCESS) {
                    strBuff.append("errCode").append("=").append(errCode.getErrCodeStr());
                    if (StringUtils.isNotEmpty((CharSequence)errMsg)) {
                        strBuff.append("&").append("errMsg").append("=").append(errMsg);
                    }
                }
                if (StringUtils.isNotEmpty((CharSequence)(origAttr = this.event.getHeaders().getOrDefault("attrs", "")))) {
                    if (strBuff.length() > 0) {
                        strBuff.append("&").append(origAttr);
                    } else {
                        strBuff.append(origAttr);
                    }
                }
                ByteBuf retData = MsgType.MSG_BIN_MULTI_BODY.equals((Object)this.msgType) ? ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(), Long.parseLong(uniqId)) : ServerMessageHandler.buildTxtMsgRspPackage(this.msgType, strBuff.toString());
                strBuff.delete(0, strBuff.length());
                if (this.channel == null || !this.channel.isWritable()) {
                    retData.release();
                    if (logCounter.shouldPrint()) {
                        logger.warn("Send msg but channel full, attr={}, channel={}", (Object)this.event.getHeaders(), (Object)this.channel);
                    }
                    return;
                }
                this.channel.writeAndFlush((Object)retData);
            }
            catch (Throwable e) {
                if (!logCounter.shouldPrint()) break block13;
                logger.warn("Send msg but failure, attr={}", (Object)this.event.getHeaders(), (Object)e);
            }
        }
    }
}

