/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.table.ColDataType;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StreamSinkServiceImpl
implements StreamSinkService {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
    /** Matches a single separator character: a tab, any other whitespace character, or a comma. */
    private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]");
    // NOTE(review): presumably the maximum / minimum number of columns accepted per parsed CSV field line;
    // the code that uses these two constants is outside this section - confirm.
    private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
    private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
    /** Looked up per sink in listDetail to attach the stored sort configuration. */
    @Autowired
    private SortConfigEntityMapper sortConfigEntityMapper;
    /** Used in listDetail to find the PULSAR clusters of a cluster tag. */
    @Autowired
    private InlongClusterEntityMapper clusterEntityMapper;
    /** Resolves the StreamSinkOperator that handles a given sink type. */
    @Autowired
    private SinkOperatorFactory operatorFactory;
    /** Checks the group status before a sink is created, updated or deleted. */
    @Autowired
    private GroupCheckService groupCheckService;
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;
    @Autowired
    private ObjectMapper objectMapper;
    /** Verifies that an operator is among a group's in-charges / followers. */
    @Autowired
    private UserService userService;
    // NOTE(review): not annotated with @Autowired; presumably resolved lazily elsewhere in this class
    // (e.g. through autowireCapableBeanFactory) - confirm before relying on it being non-null.
    private InlongStreamProcessService streamProcessOperation;

    /**
     * Saves a new sink for an existing stream and returns the id of the saved sink.
     *
     * <p>The request is validated, the group status is checked, the group must exist and the operator must
     * pass the in-charge/follower check, the stream must exist, and the sink name must not already be used
     * in the stream. Ids on the submitted sink fields are cleared before the sink-type specific operator
     * persists the sink. When the stream status is CONFIG_SUCCESSFUL or CONFIG_FAILED the sink status is
     * set right away; when the stream is CONFIG_SUCCESSFUL and the request asks for it, the sink process
     * is started.
     *
     * @param request sink to save; a {@code null} start-process flag is treated as {@code false}
     * @param operator name of the user performing the operation
     * @return id of the saved sink
     * @throws BusinessException if the group does not exist or the sink name already exists in the stream
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Integer save(SinkRequest request, String operator) {
        LOGGER.info("begin to save sink info: {}", (Object)request);
        this.checkParams(request);
        String groupId = request.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, operator);
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }
        this.userService.checkUser(groupEntity.getInCharges() + "," + groupEntity.getFollowers(), operator, "Current user does not have permission to create sink info");
        String streamId = request.getInlongStreamId();
        String sinkName = request.getSinkName();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        Preconditions.expectNotNull((Object)streamEntity, (String)ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
        StreamSinkEntity exists = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (exists != null && exists.getSinkName().equals(sinkName)) {
            String err = "sink name=%s already exists with the groupId=%s streamId=%s";
            throw new BusinessException(String.format(err, sinkName, groupId, streamId));
        }
        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        // Ids carried by the submitted fields are dropped so that the fields are stored as new rows.
        List<SinkField> fields = request.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(fields)) {
            for (SinkField sinkField : fields) {
                sinkField.setId(null);
            }
        }
        int id = sinkOperator.saveOpt(request, operator);
        // Null-safe read of the boxed flag: a missing value means "do not start the process".
        boolean startProcess = Boolean.TRUE.equals(request.getStartProcess());
        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
            boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            SinkStatus nextStatus = startProcess ? SinkStatus.CONFIG_ING : SinkStatus.NEW;
            if (!enableCreateResource) {
                // No resource has to be created, so the sink is marked as successfully configured.
                nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
            }
            StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(Integer.valueOf(id));
            sinkEntity.setStatus(nextStatus.getCode());
            this.sinkMapper.updateStatus(sinkEntity);
        }
        if (streamSuccess && startProcess) {
            this.startProcessForSink(groupId, streamId, operator);
        }
        LOGGER.info("success to save sink info: {}", (Object)request);
        return id;
    }

    /**
     * Saves every sink request in the list and reports the outcome of each one.
     *
     * <p>A failure of one request is logged and recorded in its result; it does not stop the remaining
     * requests from being processed.
     *
     * <p>NOTE(review): {@code save} is invoked on {@code this}; with proxy-based Spring transactions such a
     * self-invocation presumably bypasses the {@code @Transactional} advice declared on {@code save} -
     * confirm whether a per-item transaction is expected here.
     *
     * @param requestList sink requests to save
     * @param operator name of the user performing the operation
     * @return one result per request, in request order, keyed by "groupId-streamId-sinkName"
     */
    @Override
    public List<BatchResult> batchSave(List<SinkRequest> requestList, String operator) {
        ArrayList<BatchResult> resultList = new ArrayList<BatchResult>();
        for (SinkRequest request : requestList) {
            BatchResult result = BatchResult.builder().uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId() + "-" + request.getSinkName()).operationTarget(OperationTarget.SINK).build();
            try {
                this.save(request, operator);
                result.setSuccess(true);
            }
            catch (Exception e) {
                // The trailing exception argument is logged with its stack trace by SLF4J.
                LOGGER.error("failed to save sink info for sinkName={}, groupId={}, streamId={}", new Object[]{request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId(), e});
                result.setSuccess(false);
                result.setErrMsg(e.getMessage());
            }
            resultList.add(result);
        }
        return resultList;
    }

    /**
     * Loads a sink by its id and converts it with the operator registered for its sink type.
     *
     * @param id primary key of the sink
     * @return the sink built from the stored entity
     * @throws BusinessException if the id is null, the sink is not found, or its group is not found
     */
    @Override
    public StreamSink get(Integer id) {
        if (id == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
        }
        final StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
        if (sinkEntity == null) {
            final String notFound = String.format("sink not found by id=%s", id);
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND, notFound);
        }
        // Only the existence of the owning group matters here; the group itself is not used further.
        final boolean groupMissing = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId()) == null;
        if (groupMissing) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        return this.operatorFactory.getInstance(sinkEntity.getSinkType()).getFromEntity(sinkEntity);
    }

    /**
     * Returns the sink count that the sink mapper reports for the given group id and stream id.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the count returned by the mapper
     */
    @Override
    public Integer getCount(String groupId, String streamId) {
        final Integer sinkCount = this.sinkMapper.selectCount(groupId, streamId);
        LOGGER.debug("sink count={} with groupId={}, streamId={}", sinkCount, groupId, streamId);
        return sinkCount;
    }

    /**
     * Lists the sinks related to a group id and stream id, loading each one through {@link #get(Integer)}.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id passed to the mapper as-is
     * @return the sinks found, or an empty list when the mapper returns nothing
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<StreamSink> listSink(String groupId, String streamId) {
        if (StringUtils.isBlank(groupId)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY, "groupId id is blank");
        }
        final List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isEmpty(sinkEntities)) {
            return Collections.emptyList();
        }
        final List<StreamSink> sinks = new ArrayList<>();
        for (StreamSinkEntity sinkEntity : sinkEntities) {
            sinks.add(this.get(sinkEntity.getId()));
        }
        return sinks;
    }

    /**
     * Returns the sink summaries that the mapper selects for one stream.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @return the summary list returned by the mapper
     */
    @Override
    public List<SinkBriefInfo> listBrief(String groupId, String streamId) {
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        final List<SinkBriefInfo> briefInfos = this.sinkMapper.selectSummary(groupId, streamId);
        LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);
        return briefInfos;
    }

    /**
     * Groups all sinks of a group by their stream id.
     *
     * @param groupInfo group whose sinks are listed
     * @param streamInfos not read by this method
     * @return a map from stream id to the sinks of that stream
     */
    @Override
    public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos) {
        final String inlongGroupId = groupInfo.getInlongGroupId();
        LOGGER.debug("begin to get sink map for groupId={}", inlongGroupId);
        // A null stream id is handed to listSink, so the lookup is made with the group id only.
        final List<StreamSink> groupSinks = this.listSink(inlongGroupId, null);
        final Map<String, List<StreamSink>> sinksByStream = groupSinks.stream()
                .collect(Collectors.groupingBy(
                        StreamSink::getInlongStreamId,
                        HashMap::new,
                        Collectors.toCollection(ArrayList::new)));
        LOGGER.debug("success to get sink map, size={}, groupInfo={}", sinksByStream.size(), groupInfo);
        return sinksByStream;
    }

    /**
     * Pages through the sinks matching the request and converts them with their type-specific operators.
     *
     * <p>Sinks whose group cannot be found are dropped from the result list; the total, page number and
     * page size are taken from the queried page.
     *
     * @param request paging and filter conditions; the group id must not be blank
     * @param operator not read by this method
     * @return one page of sinks
     */
    @Override
    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, String operator) {
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        PageHelper.startPage((int)request.getPageNum(), (int)request.getPageSize());
        OrderFieldEnum.checkOrderField((PageRequest)request);
        OrderTypeEnum.checkOrderType((PageRequest)request);
        final Page<StreamSinkEntity> queriedPage = (Page<StreamSinkEntity>)this.sinkMapper.selectByCondition(request);

        // Bucket the entities by sink type so that each bucket can be converted by its own operator.
        final Map<String, Page<StreamSinkEntity>> entitiesByType = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : queriedPage) {
            final InlongGroupEntity owningGroup = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
            if (owningGroup != null) {
                entitiesByType.computeIfAbsent(sinkEntity.getSinkType(), type -> new Page<>()).add(sinkEntity);
            }
        }

        final List<StreamSink> sinks = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> bucket : entitiesByType.entrySet()) {
            final StreamSinkOperator typeOperator = this.operatorFactory.getInstance(bucket.getKey());
            final PageResult<? extends StreamSink> converted = typeOperator.getPageInfo(bucket.getValue());
            sinks.addAll(converted.getList());
        }

        final PageResult<StreamSink> sinkPage = new PageResult<>(sinks, Long.valueOf(queriedPage.getTotal()), Integer.valueOf(queriedPage.getPageNum()), Integer.valueOf(queriedPage.getPageSize()));
        LOGGER.debug("success to list sink page, result size {}", sinkPage.getList().size());
        return sinkPage;
    }

    /**
     * Pages through the sinks matching the request and returns each one as a map that additionally holds
     * the MQ topic, the consumer group and, when present, the stored sort configuration.
     *
     * <p>Each entry is the sink converted to a map plus the keys {@code topic}, {@code consumerGroup} and
     * (only when a sort config exists for the sink) {@code dataFlowInfo}. For PULSAR groups without any
     * PULSAR cluster under the group's cluster tag, topic and consumer group stay empty strings; for KAFKA
     * groups the consumer group is always an empty string.
     *
     * @param request paging and filter conditions; its group id selects the group whose MQ type is used
     * @param operator not read by this method
     * @return one page of sink detail maps
     * @throws BusinessException if the group is not found or its MQ type is not PULSAR, TUBEMQ or KAFKA
     */
    @Override
    public PageResult<Map<String, Object>> listDetail(SinkPageRequest request, String operator) {
        PageHelper.startPage((int)request.getPageNum(), (int)request.getPageSize());
        OrderFieldEnum.checkOrderField((PageRequest)request);
        OrderTypeEnum.checkOrderType((PageRequest)request);
        Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>)this.sinkMapper.selectByCondition(request);
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(request.getInlongGroupId());
        // Checked only after the paged query has run, so the paging set up by startPage is consumed first.
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        String mqType = groupEntity.getMqType();
        InlongGroupInfo groupInfo = null;
        switch (mqType) {
            case "PULSAR": {
                groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongPulsarInfo::new, true);
                break;
            }
            case "TUBEMQ": {
                groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongTubeMQInfo::new, true);
                break;
            }
            case "KAFKA": {
                groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongKafkaInfo::new, true);
                // Without this break the KAFKA case fell through to default and was always rejected.
                break;
            }
            default: {
                throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
            }
        }
        InlongGroupInfo finalGroupInfo = groupInfo;
        List<Map<String, Object>> responseList = entityPage.stream().map(sink -> {
            StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(sink.getSinkType());
            StreamSink streamSink = sinkOperator.getFromEntity(sink);
            Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(streamSink, new TypeReference<Map<String, Object>>(){});
            InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(request.getInlongGroupId(), sink.getInlongStreamId());
            String topic = "";
            String consumeGroup = "";
            switch (mqType) {
                case "PULSAR": {
                    List<InlongClusterEntity> pulsarClusters = this.clusterEntityMapper.selectByKey(finalGroupInfo.getInlongClusterTag(), null, "PULSAR");
                    InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
                    if (CollectionUtils.isEmpty(pulsarClusters)) {
                        // No cluster to resolve against: topic and consumer group are left empty.
                        break;
                    }
                    String tenant = pulsarDTO.getPulsarTenant();
                    if (StringUtils.isBlank(tenant)) {
                        // Fall back to the tenant configured on the first PULSAR cluster of the tag.
                        InlongClusterEntity pulsarCluster = pulsarClusters.get(0);
                        PulsarClusterDTO pulsarClusterDTO = PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams());
                        tenant = pulsarClusterDTO.getPulsarTenant();
                    }
                    String fullTopicName = tenant + "/" + finalGroupInfo.getMqResource() + "/" + streamEntity.getMqResource();
                    topic = "persistent://" + fullTopicName;
                    consumeGroup = String.format("%s_%s_%s_consumer_group", finalGroupInfo.getInlongClusterTag(), fullTopicName, sink.getId());
                    break;
                }
                case "TUBEMQ": {
                    topic = streamEntity.getMqResource();
                    consumeGroup = String.format("%s_%s_%s_consumer_group", groupEntity.getInlongClusterTag(), topic, sink.getId());
                    break;
                }
                case "KAFKA": {
                    topic = streamEntity.getMqResource();
                    // Only when the stream's MQ resource equals its stream id is the topic prefixed
                    // with the group's MQ resource.
                    if (!topic.equals(streamEntity.getInlongStreamId())) {
                        break;
                    }
                    topic = String.format("%s.%s", finalGroupInfo.getMqResource(), streamEntity.getMqResource());
                    break;
                }
                default: {
                    throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
                }
            }
            requestMap.put("topic", topic);
            requestMap.put("consumerGroup", consumeGroup);
            SortConfigEntity sortConfigEntity = this.sortConfigEntityMapper.selectBySinkId(sink.getId());
            if (sortConfigEntity != null) {
                requestMap.put("dataFlowInfo", sortConfigEntity.getConfigParams());
            }
            return requestMap;
        }).collect(Collectors.toList());
        PageResult<Map<String, Object>> pageResult = new PageResult<>(responseList, Long.valueOf(entityPage.getTotal()), Integer.valueOf(entityPage.getPageNum()), Integer.valueOf(entityPage.getPageSize()));
        LOGGER.debug("success to list sink detail page, result size {}", (Object)pageResult.getList().size());
        return pageResult;
    }

    /**
     * Lists the sinks matching the request that the given user is allowed to see.
     *
     * <p>A sink is kept when its group exists and the user is either a tenant admin or listed among the
     * comma-separated in-charges of that group.
     *
     * @param request filter conditions; the group id must not be blank
     * @param opInfo the querying user
     * @return the visible sinks
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<? extends StreamSink> listByCondition(SinkPageRequest request, UserInfo opInfo) {
        if (StringUtils.isBlank(request.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        OrderFieldEnum.checkOrderField((PageRequest)request);
        OrderTypeEnum.checkOrderType((PageRequest)request);
        final List<StreamSinkEntity> matchedEntities = this.sinkMapper.selectByCondition(request);

        // Bucket the entities by sink type so that each bucket can be converted by its own operator.
        final Map<String, Page<StreamSinkEntity>> entitiesByType = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : matchedEntities) {
            entitiesByType.computeIfAbsent(sinkEntity.getSinkType(), type -> new Page<>()).add(sinkEntity);
        }

        final List<StreamSink> visibleSinks = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> bucket : entitiesByType.entrySet()) {
            final StreamSinkOperator typeOperator = this.operatorFactory.getInstance(bucket.getKey());
            final PageResult<? extends StreamSink> converted = typeOperator.getPageInfo(bucket.getValue());
            for (StreamSink candidate : converted.getList()) {
                final InlongGroupEntity owningGroup = this.groupMapper.selectByGroupId(candidate.getInlongGroupId());
                if (owningGroup == null) {
                    continue;
                }
                final boolean tenantAdmin = opInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode());
                if (!tenantAdmin) {
                    // Non-admin users only see sinks of groups they are in charge of.
                    final List<String> inCharges = Arrays.asList(owningGroup.getInCharges().split(","));
                    if (!inCharges.contains(opInfo.getName())) {
                        continue;
                    }
                }
                visibleSinks.add(candidate);
            }
        }
        return visibleSinks;
    }

    /**
     * Updates an existing sink identified by the id in the request.
     *
     * <p>The sink must exist, its unmodifiable parameters are checked against the request, the group status
     * and the operator's permission are checked, the stream must exist, and the sink name must not belong
     * to another sink of the same stream. When the stream status is CONFIG_SUCCESSFUL or CONFIG_FAILED the
     * next sink status is CONFIG_ING (resource creation enabled) or CONFIG_SUCCESSFUL (otherwise);
     * for any other stream status a {@code null} next status is passed to the operator. The sink process
     * is started only in the former case and only when the request asks for it.
     *
     * @param request sink to update; a {@code null} start-process flag is treated as {@code false}
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the request or its id is null, the sink is not found, or the sink name
     *         is already used by another sink of the stream
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean update(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink by id: {}", (Object)request);
        if (request == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "inlong sink request is empty");
        }
        if (request.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        StreamSinkEntity curEntity = this.sinkMapper.selectByPrimaryKey(request.getId());
        if (curEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        this.chkUnmodifiableParams(curEntity, request);
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
        this.userService.checkUser(groupEntity.getInCharges() + "," + groupEntity.getFollowers(), operator, "Current user does not have permission to update sink info");
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId());
        Preconditions.expectNotNull((Object)streamEntity, (String)ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
        StreamSinkEntity existEntity = this.sinkMapper.selectByUniqueKey(request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
        if (existEntity != null && !existEntity.getId().equals(request.getId())) {
            String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
            throw new BusinessException(String.format(errMsg, request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId()));
        }
        boolean enableConfig = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus()) || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
        SinkStatus nextStatus = null;
        if (enableConfig) {
            boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
        }
        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        sinkOperator.updateOpt(request, nextStatus, operator);
        // Null-safe read of the boxed flag: a missing value means "do not start the process".
        if (enableConfig && Boolean.TRUE.equals(request.getStartProcess())) {
            this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
        }
        LOGGER.info("success to update sink by id: {}", (Object)request.getId());
        return true;
    }

    /**
     * Updates the sink identified by group id, stream id and sink name.
     *
     * <p>The matching sink's id is written into the request, which is then passed to
     * {@link #update(SinkRequest, String)}.
     *
     * @param request sink to update, carrying the unique key and the version
     * @param operator name of the user performing the operation
     * @return the sink id, the update outcome and the request version plus one
     * @throws BusinessException if no sink matches the unique key
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public UpdateResult updateByKey(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink by key: {}", request);
        final String inlongGroupId = request.getInlongGroupId();
        final String inlongStreamId = request.getInlongStreamId();
        final String name = request.getSinkName();
        final StreamSinkEntity stored = this.sinkMapper.selectByUniqueKey(inlongGroupId, inlongStreamId, name);
        if (stored == null) {
            final String notFound = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s", inlongGroupId, inlongStreamId, name);
            LOGGER.error(notFound);
            throw new BusinessException(notFound);
        }
        request.setId(stored.getId());
        final Boolean updated = this.update(request, operator);
        LOGGER.info("success to update sink by key: {}", request);
        final Integer nextVersion = Integer.valueOf(request.getVersion() + 1);
        return new UpdateResult(stored.getId(), updated, nextVersion);
    }

    /**
     * Writes a new status and operate log for the sink with the given id through the mapper's status update.
     *
     * @param id primary key of the sink
     * @param status status code to store
     * @param log operate log to store
     */
    @Override
    public void updateStatus(Integer id, int status, String log) {
        // Only id, status and operate log are set on the entity handed to the mapper.
        final StreamSinkEntity statusHolder = new StreamSinkEntity();
        statusHolder.setId(id);
        statusHolder.setStatus(Integer.valueOf(status));
        statusHolder.setOperateLog(log);
        this.sinkMapper.updateStatus(statusHolder);
        LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
    }

    /**
     * Deletes the sink with the given id through the operator of its sink type.
     *
     * <p>The group status and the operator's permission are checked first. The delete process for the
     * sink's stream is triggered only when {@code startProcess} is {@code true}.
     *
     * @param id primary key of the sink, must not be null
     * @param startProcess whether to trigger the delete process; {@code null} is treated as {@code false}
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean delete(Integer id, Boolean startProcess, String operator) {
        LOGGER.info("begin to delete sink by id={}", (Object)id);
        Preconditions.expectNotNull((Object)id, (String)ErrorCodeEnum.ID_IS_EMPTY.getMessage());
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(id);
        Preconditions.expectNotNull((Object)entity, (String)ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
        this.userService.checkUser(groupEntity.getInCharges() + "," + groupEntity.getFollowers(), operator, "Current user does not have permission to delete sink info");
        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entity.getSinkType());
        sinkOperator.deleteOpt(entity, operator);
        // Null-safe read of the boxed flag, so a missing value no longer fails after the delete has run.
        if (Boolean.TRUE.equals(startProcess)) {
            this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator);
        }
        LOGGER.info("success to delete sink by id: {}", (Object)entity);
        return true;
    }

    /**
     * Deletes the sink identified by group id, stream id and sink name through the operator of its type.
     *
     * <p>The group status and the operator's permission are checked first. The delete process for the
     * sink's stream is triggered only when {@code startProcess} is {@code true}.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param sinkName sink name, must not be blank
     * @param startProcess whether to trigger the delete process; {@code null} is treated as {@code false}
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean deleteByKey(String groupId, String streamId, String sinkName, Boolean startProcess, String operator) {
        LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", new Object[]{groupId, streamId, sinkName});
        Preconditions.expectNotBlank((String)groupId, (ErrorCodeEnum)ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank((String)streamId, (ErrorCodeEnum)ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank((String)sinkName, (ErrorCodeEnum)ErrorCodeEnum.INVALID_PARAMETER, (String)"stream sink name is empty or null");
        StreamSinkEntity entity = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        Preconditions.expectNotNull((Object)entity, (String)String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s", groupId, streamId, sinkName));
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
        this.userService.checkUser(groupEntity.getInCharges() + "," + groupEntity.getFollowers(), operator, "Current user does not have permission to delete sink info");
        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entity.getSinkType());
        sinkOperator.deleteOpt(entity, operator);
        // Null-safe read of the boxed flag, so a missing value no longer fails after the delete has run.
        if (Boolean.TRUE.equals(startProcess)) {
            this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator);
        }
        LOGGER.info("success to delete sink by key: {}", (Object)entity);
        return true;
    }

    /**
     * Logically deletes every sink of a stream together with its sink fields.
     *
     * <p>For each sink the current status is kept as previous status, the status becomes the deleted
     * status, and the is-deleted column is set to the sink's own id.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the user performing the operation, stored as modifier
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(groupId, operator);
        final List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                final Integer sinkId = sinkEntity.getId();
                sinkEntity.setPreviousStatus(sinkEntity.getStatus());
                sinkEntity.setStatus(InlongConstants.DELETED_STATUS);
                sinkEntity.setIsDeleted(sinkId);
                sinkEntity.setModifier(operator);
                final int affectedRows = this.sinkMapper.updateByIdSelective(sinkEntity);
                // Fails the whole operation when the row was not updated exactly once.
                this.checkAffectRowCount(affectedRows, sinkEntity);
                this.sinkFieldMapper.logicDeleteAll(sinkId);
            }
        }
        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Verifies that an update touched exactly one row.
     *
     * @param affectRowCount number of rows reported by the update
     * @param entity sink that was updated, used for the error log only
     * @throws BusinessException with CONFIG_EXPIRED when the row count differs from one affected row
     */
    private void checkAffectRowCount(int affectRowCount, StreamSinkEntity entity) {
        if (affectRowCount == InlongConstants.AFFECTED_ONE_ROW) {
            return;
        }
        LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}", entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
    }

    /**
     * Deletes every sink of a stream and the fields of each sink through the mappers' delete calls.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the user performing the operation, used for the group status check
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean deleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(groupId, operator);
        final List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                // The sink row is removed first, then the fields that belong to it.
                this.sinkMapper.deleteById(sinkEntity.getId());
                this.sinkFieldMapper.deleteAll(sinkEntity.getId());
            }
        }
        LOGGER.info("success to delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Returns the stream ids that the mapper's exists-query selects for the given group, sink type and
     * candidate stream ids.
     *
     * @param groupId inlong group id
     * @param sinkType sink type to look for
     * @param streamIdList candidate stream ids
     * @return the stream ids returned by the mapper, or an empty list when the sink type or the candidate
     *         list is empty
     */
    @Override
    public List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", groupId, sinkType, streamIdList);
        final boolean nothingToFilter = StringUtils.isEmpty(sinkType) || CollectionUtils.isEmpty(streamIdList);
        if (nothingToFilter) {
            return Collections.emptyList();
        }
        final List<String> existingStreamIds = this.sinkMapper.selectExistsStreamId(groupId, sinkType, streamIdList);
        LOGGER.debug("success to filter stream id list, result streamId={}", existingStreamIds);
        return existingStreamIds;
    }

    /**
     * Returns the sink types that the mapper selects for one stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the sink types returned by the mapper, or an empty list when the stream id is empty
     */
    @Override
    public List<String> getSinkTypeList(String groupId, String streamId) {
        if (StringUtils.isEmpty(streamId)) {
            return Collections.emptyList();
        }
        final List<String> sinkTypes = this.sinkMapper.selectSinkType(groupId, streamId);
        LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", groupId, streamId, sinkTypes);
        return sinkTypes;
    }

    /**
     * Updates the status of each approved sink.
     *
     * <p>For every entry the current status is kept as previous status and the new status is taken from the
     * entry, defaulting to CONFIG_ING when the entry carries none.
     *
     * @param approveList approved sinks; an empty or null list is a no-op
     * @param operator name of the user performing the operation, stored as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if a sink referenced by the list is not found, or if an update does not
     *         affect exactly one row
     */
    @Override
    public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
        LOGGER.info("begin to update sink after approve: {}", approveList);
        if (CollectionUtils.isEmpty(approveList)) {
            return true;
        }
        for (SinkApproveDTO dto : approveList) {
            String sinkType = dto.getSinkType();
            Preconditions.expectNotBlank((String)sinkType, (ErrorCodeEnum)ErrorCodeEnum.SINK_TYPE_IS_NULL);
            StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(dto.getId());
            // The lookup can come back empty; report that explicitly instead of failing on a null entity.
            if (entity == null) {
                throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND, String.format("sink not found by id=%s", dto.getId()));
            }
            int status = dto.getStatus() == null ? SinkStatus.CONFIG_ING.getCode() : dto.getStatus();
            entity.setPreviousStatus(entity.getStatus());
            entity.setStatus(Integer.valueOf(status));
            entity.setModifier(operator);
            int rowCount = this.sinkMapper.updateByIdSelective(entity);
            this.checkAffectRowCount(rowCount, entity);
        }
        LOGGER.info("success to update sink after approve: {}", approveList);
        return true;
    }

    /**
     * Saves the given fields for a sink, skipping any field whose name is already stored for it.
     *
     * <p>New field entities are copied from the request fields and completed with the group id,
     * stream id, sink type and sink id of {@code sinkEntity}. A field without a comment gets its
     * own name as comment.
     *
     * @param sinkEntity the sink the fields belong to
     * @param sinkFieldList fields to add; an empty or null list is a no-op
     * @return always {@code true} when the method completes without an exception
     */
    @Override
    public boolean addFields(StreamSinkEntity sinkEntity, List<SinkField> sinkFieldList) {
        LOGGER.debug("begin to save sink fields={}", sinkFieldList);
        // Check for "nothing to do" first so no query is issued for an empty request.
        if (CollectionUtils.isEmpty(sinkFieldList)) {
            return true;
        }

        // Names of the fields already stored for this sink.
        Set<String> existFields = sinkFieldMapper.selectBySinkId(sinkEntity.getId()).stream()
                .map(StreamSinkFieldEntity::getFieldName)
                .collect(Collectors.toSet());

        List<StreamSinkFieldEntity> needAddFieldList = new ArrayList<>();
        for (SinkField fieldInfo : sinkFieldList) {
            if (existFields.contains(fieldInfo.getFieldName())) {
                LOGGER.debug("current sink field={} is exist for groupId={}, streamId={}",
                        fieldInfo.getFieldName(), sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId());
                continue;
            }
            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
            // Fall back to the field name when no comment was supplied.
            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            fieldEntity.setInlongGroupId(sinkEntity.getInlongGroupId());
            fieldEntity.setInlongStreamId(sinkEntity.getInlongStreamId());
            fieldEntity.setSinkType(sinkEntity.getSinkType());
            fieldEntity.setSinkId(sinkEntity.getId());
            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
            needAddFieldList.add(fieldEntity);
        }

        if (CollectionUtils.isNotEmpty(needAddFieldList)) {
            sinkFieldMapper.insertAll(needAddFieldList);
        }
        LOGGER.debug("success to save sink fields={}", needAddFieldList);
        return true;
    }

    /**
     * Parses sink field definitions from a statement, using the parse mode named in the request.
     *
     * <p>Supported modes are {@code "json"}, {@code "sql"} and {@code "csv"}. Any failure, including
     * an unsupported or missing mode, is reported as a {@link BusinessException} with
     * {@code ErrorCodeEnum.INVALID_PARAMETER}.
     *
     * @param parseFieldRequest request carrying the parse mode and the statement text
     * @return the parsed fields
     * @throws BusinessException when the mode is unsupported, the statement is null, or parsing fails
     */
    @Override
    public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
        try {
            String method = parseFieldRequest.getMethod();
            String statement = parseFieldRequest.getStatement();
            if (statement == null) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Statement to parse must not be null");
            }
            // A switch on a null String throws NullPointerException; map null to a value that
            // matches no case so it is reported as an unsupported mode instead.
            switch (method == null ? "" : method) {
                case "json":
                    return parseFieldsByJson(statement);
                case "sql":
                    return parseFieldsBySql(statement);
                case "csv":
                    return parseFieldsByCsv(statement);
                default:
                    throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                            String.format("Unsupported parse mode: %s", method));
            }
        } catch (Exception e) {
            // Every failure (including the BusinessExceptions raised above) is re-wrapped so the
            // caller sees one uniform message prefix. Only the message of the cause is carried over.
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("parse sink fields error: %s", e.getMessage()));
        }
    }

    /**
     * Parses field definitions from line-oriented text, one field per non-blank line.
     *
     * <p>Each line is split by {@code PARSE_FIELD_CSV_SPLITTER} into at most three columns:
     * field name, field type and an optional comment. The field name must match
     * {@code InlongConstants.PATTERN_NORMAL_CHARACTERS}.
     *
     * @param statement the text to parse; lines may end with {@code \n} or {@code \r\n}
     * @return the parsed fields, in line order
     * @throws BusinessException when a line has fewer than two columns or an invalid field name;
     *         the message contains the 1-based line number
     */
    private List<SinkField> parseFieldsByCsv(String statement) {
        // Accept both LF and CRLF line endings; splitting on "\n" alone would leave a trailing
        // '\r' that ends up in the last column of every line.
        String[] lines = statement.split("\\r?\\n");
        List<SinkField> fields = new ArrayList<>();
        for (int i = 0; i < lines.length; ++i) {
            String line = lines[i];
            if (StringUtils.isBlank(line)) {
                continue;
            }
            // Limit 3 keeps any further separators inside the comment column.
            String[] cols = PARSE_FIELD_CSV_SPLITTER.split(line, 3);
            if (cols.length < 2) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "At least two fields are required, line number is " + (i + 1));
            }
            String fieldName = cols[0];
            if (!InlongConstants.PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "Field names in line " + (i + 1) + " can only contain letters, underscores or numbers");
            }
            String fieldType = cols[1];
            String comment = cols.length == 3 ? cols[2] : null;

            SinkField field = new SinkField();
            field.setFieldName(fieldName);
            field.setFieldType(fieldType);
            field.setFieldComment(comment);
            fields.add(field);
        }
        return fields;
    }

    /**
     * Parses field definitions from a SQL {@code CREATE TABLE} statement.
     *
     * <p>For every column the field name is the column name and the field type is the lower-cased
     * data type with anything from the first {@code "("} onwards removed. When the column has
     * specs, the token following the first spec that starts with {@code COMMENT} (case-insensitive)
     * is used as the comment, with single and double quotes stripped.
     *
     * @param sql the SQL text to parse
     * @return one field per column definition, in declaration order
     * @throws JSQLParserException when the SQL cannot be parsed
     * @throws BusinessException when the statement is not a table creation statement or declares
     *         no column definitions
     */
    private List<SinkField> parseFieldsBySql(String sql) throws JSQLParserException {
        CCJSqlParserManager pm = new CCJSqlParserManager();
        Statement statement = pm.parse(new StringReader(sql));
        if (!(statement instanceof CreateTable)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The SQL statement must be a table creation statement");
        }

        List<ColumnDefinition> columnDefinitions = ((CreateTable) statement).getColumnDefinitions();
        // A CREATE TABLE without an explicit column list has no definitions to iterate;
        // report that explicitly instead of failing with a NullPointerException in the loop.
        if (columnDefinitions == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The table creation statement must declare its columns");
        }

        List<SinkField> fields = new ArrayList<>();
        for (ColumnDefinition definition : columnDefinitions) {
            SinkField sinkField = new SinkField();
            sinkField.setFieldName(definition.getColumnName());

            // Drop any length/precision suffix such as "(10)" and normalize the case.
            // Locale.ROOT keeps the result independent of the JVM default locale.
            String sqlDataType = definition.getColDataType().getDataType();
            String realDataType = StringUtils.substringBefore(sqlDataType, "(").toLowerCase(java.util.Locale.ROOT);
            sinkField.setFieldType(realDataType);

            List<String> columnSpecs = definition.getColumnSpecs();
            if (CollectionUtils.isNotEmpty(columnSpecs)) {
                // Locate the first COMMENT spec; its value is the next token.
                int commentIndex = -1;
                for (int csIndex = 0; csIndex < columnSpecs.size(); ++csIndex) {
                    if (StringUtils.startsWithIgnoreCase(columnSpecs.get(csIndex), "COMMENT")) {
                        commentIndex = csIndex;
                        break;
                    }
                }
                String comment = null;
                if (commentIndex != -1 && columnSpecs.size() > commentIndex + 1) {
                    comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", "");
                }
                sinkField.setFieldComment(comment);
            }
            fields.add(sinkField);
        }
        return fields;
    }

    /**
     * Parses field definitions from a JSON array of objects.
     *
     * <p>Each array element is read as a string-to-string map; its {@code "name"}, {@code "type"}
     * and {@code "desc"} entries become the field name, field type and field comment.
     *
     * @param statement JSON text to parse
     * @return one field per array element, in array order
     * @throws JsonProcessingException when the text cannot be read as a list of string maps
     */
    private List<SinkField> parseFieldsByJson(String statement) throws JsonProcessingException {
        // Keep the element type: a raw List here would turn each element into Object and
        // the map lookups below would no longer type-check.
        List<Map<String, String>> rows =
                objectMapper.readValue(statement, new TypeReference<List<Map<String, String>>>() {
                });
        return rows.stream().map(row -> {
            SinkField sinkField = new SinkField();
            sinkField.setFieldName(row.get("name"));
            sinkField.setFieldType(row.get("type"));
            sinkField.setFieldComment(row.get("desc"));
            return sinkField;
        }).collect(Collectors.toList());
    }

    /**
     * Verifies that the mandatory identifiers of a sink request are present.
     *
     * <p>The checks run in a fixed order (group id, stream id, sink type, sink name) and the first
     * blank value determines which error code is raised.
     *
     * @param request the sink request to validate
     * @throws BusinessException with the matching error code for the first blank value
     */
    private void checkSinkRequestParams(SinkRequest request) {
        if (StringUtils.isBlank(request.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(request.getInlongStreamId())) {
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(request.getSinkType())) {
            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_IS_NULL);
        }
        if (StringUtils.isBlank(request.getSinkName())) {
            throw new BusinessException(ErrorCodeEnum.SINK_NAME_IS_NULL);
        }
    }

    /**
     * Validates a sink request through {@code Preconditions}: the request itself must not be null,
     * and its group id, stream id, sink type and sink name must not be blank.
     *
     * <p>The checks run in that order, so the first failing one decides the reported error.
     *
     * @param request the sink request to validate
     */
    private void checkParams(SinkRequest request) {
        Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL);
        Preconditions.expectNotBlank(request.getSinkName(), ErrorCodeEnum.SINK_NAME_IS_NULL);
    }

    /**
     * Starts the stream process for the given group and stream on behalf of the operator.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the user triggering the process
     */
    private void startProcessForSink(String groupId, String streamId, String operator) {
        // The process service is created on first use and wired by the bean factory.
        // NOTE(review): presumably done lazily to avoid injecting it at construction time - confirm.
        if (null == streamProcessOperation) {
            streamProcessOperation = new InlongStreamProcessService();
            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
        }

        streamProcessOperation.startProcess(groupId, streamId, operator, false);
        LOGGER.info("success to start the start-stream-process for groupId={} streamId={}", groupId, streamId);
    }

    /**
     * Starts the delete process of the stream for the given group and stream on behalf of the operator.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the user triggering the process
     */
    private void deleteProcessForSink(String groupId, String streamId, String operator) {
        // The process service is created on first use and wired by the bean factory.
        // NOTE(review): presumably done lazily to avoid injecting it at construction time - confirm.
        if (null == streamProcessOperation) {
            streamProcessOperation = new InlongStreamProcessService();
            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
        }

        streamProcessOperation.deleteProcess(groupId, streamId, operator, false);
        LOGGER.debug("success to start the delete-stream-process for groupId={} streamId={}", groupId, streamId);
    }

    /**
     * Rejects an update request that tries to change values which must stay fixed, then aligns the
     * request with the stored entity.
     *
     * <p>The sink type and the version must equal the stored ones. A group id or stream id given in
     * the request must equal the stored one; blank values are accepted. Afterwards the request's
     * group id and stream id are overwritten with the stored values.
     *
     * @param curEntity the sink entity currently stored
     * @param request the incoming update request; its group id and stream id are overwritten
     * @throws BusinessException when the request carries a different group id or stream id
     */
    private void chkUnmodifiableParams(StreamSinkEntity curEntity, SinkRequest request) {
        Preconditions.expectEquals(curEntity.getSinkType(), request.getSinkType(),
                ErrorCodeEnum.INVALID_PARAMETER, "sinkType not allowed modify");

        // A version mismatch means the request was built from an outdated copy of the record.
        String expiredMessage = String.format("record has expired with record version=%d, request version=%d",
                curEntity.getVersion(), request.getVersion());
        Preconditions.expectEquals(curEntity.getVersion(), request.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMessage);

        String requestedGroupId = request.getInlongGroupId();
        if (StringUtils.isNotBlank(requestedGroupId) && !curEntity.getInlongGroupId().equals(requestedGroupId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongGroupId not allowed modify");
        }

        String requestedStreamId = request.getInlongStreamId();
        if (StringUtils.isNotBlank(requestedStreamId) && !curEntity.getInlongStreamId().equals(requestedStreamId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongStreamId not allowed modify");
        }

        // Fill in (or confirm) the ids from the stored entity so later code can rely on them.
        request.setInlongGroupId(curEntity.getInlongGroupId());
        request.setInlongStreamId(curEntity.getInlongStreamId());
    }
}

