/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.subscription.AbstractOperateSubscriptionAndPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.topic.AlterTopicProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Procedure that creates a subscription described by a {@link TSubscribeReq}.
 *
 * <p>During validation it builds two kinds of nested procedures and then drives them through the
 * remaining steps:
 *
 * <ul>
 *   <li>one {@link AlterConsumerGroupProcedure} carrying a deep copy of the consumer group meta
 *       with the requested subscription added;
 *   <li>one {@link CreatePipeProcedureV2} per requested topic, except for topics that are already
 *       subscribed by the consumer group <em>and</em> whose subscription pipe already exists.
 * </ul>
 *
 * <p>The request and the nested procedures are written by {@link #serialize(DataOutputStream)} and
 * restored by {@link #deserialize(ByteBuffer)}.
 */
public class CreateSubscriptionProcedure
        extends AbstractOperateSubscriptionAndPipeProcedure {
    private static final Logger LOGGER = LoggerFactory.getLogger(CreateSubscriptionProcedure.class);

    /** The subscribe request; set by the constructor or rebuilt by {@link #deserialize(ByteBuffer)}. */
    private TSubscribeReq subscribeReq;

    /** Nested procedure updating the consumer group; created in {@link #executeFromValidate}. */
    private AlterConsumerGroupProcedure alterConsumerGroupProcedure;

    /** Nested pipe-creation procedures; populated in {@link #executeFromValidate}. */
    private List<CreatePipeProcedureV2> createPipeProcedures = new ArrayList<>();

    /** Only populated by {@link #deserialize(ByteBuffer)}; kept so the serialized layout stays stable. */
    private final List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();

    /** Creates an empty procedure, to be filled in by {@link #deserialize(ByteBuffer)}. */
    public CreateSubscriptionProcedure() {
    }

    /**
     * Creates a procedure for the given request.
     *
     * @param subscribeReq the subscribe request to process
     */
    public CreateSubscriptionProcedure(TSubscribeReq subscribeReq) {
        this.subscribeReq = subscribeReq;
    }

    @Override
    protected SubscriptionOperation getOperation() {
        return SubscriptionOperation.CREATE_SUBSCRIPTION;
    }

    /**
     * Validates the request and prepares the nested procedures.
     *
     * @param env the procedure environment, forwarded to the nested procedures
     * @return always {@code true}
     * @throws SubscriptionException declared for validation failures raised by the called helpers
     */
    @Override
    protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
        ((SubscriptionInfo) this.subscriptionInfo.get()).validateBeforeSubscribe(this.subscribeReq);

        // Build the consumer group procedure from a copy of the group meta plus the new subscription.
        String consumerGroupId = this.subscribeReq.getConsumerGroupId();
        ConsumerGroupMeta updatedConsumerGroupMeta =
                ((SubscriptionInfo) this.subscriptionInfo.get()).deepCopyConsumerGroupMeta(consumerGroupId);
        updatedConsumerGroupMeta.addSubscription(
                this.subscribeReq.getConsumerId(), this.subscribeReq.getTopicNames());
        this.alterConsumerGroupProcedure =
                new AlterConsumerGroupProcedure(updatedConsumerGroupMeta, this.subscriptionInfo);

        // Build one pipe-creation procedure per topic that still needs a pipe.
        for (String topicName : this.subscribeReq.getTopicNames()) {
            String pipeName = PipeStaticMeta.generateSubscriptionPipeName(topicName, consumerGroupId);
            if (((SubscriptionInfo) this.subscriptionInfo.get()).isTopicSubscribedByConsumerGroup(topicName, consumerGroupId)
                    && ((PipeTaskInfo) this.pipeTaskInfo.get()).isPipeExisted(pipeName)) {
                // Already subscribed and the pipe exists: nothing to create for this topic.
                continue;
            }
            TopicMeta topicMeta = ((SubscriptionInfo) this.subscriptionInfo.get()).deepCopyTopicMeta(topicName);
            TCreatePipeReq createPipeReq =
                    new TCreatePipeReq()
                            .setPipeName(pipeName)
                            .setExtractorAttributes(topicMeta.generateExtractorAttributes())
                            .setProcessorAttributes(topicMeta.generateProcessorAttributes())
                            .setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId));
            this.createPipeProcedures.add(new CreatePipeProcedureV2(createPipeReq, this.pipeTaskInfo));
        }

        // Run the validation step of every nested procedure.
        this.alterConsumerGroupProcedure.executeFromValidate(env);
        for (CreatePipeProcedureV2 createPipeProcedure : this.createPipeProcedures) {
            createPipeProcedure.executeFromValidateTask(env);
            createPipeProcedure.executeFromCalculateInfoForTask(env);
        }
        return true;
    }

    /**
     * Applies the consumer group change on config nodes, then writes all create-pipe plans in a
     * single {@link OperateMultiplePipesPlanV2}.
     *
     * @param env the procedure environment
     * @throws SubscriptionException if the write returns a non-success status that carries sub-statuses
     */
    @Override
    protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromOperateOnConfigNodes");
        this.alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(env);

        List<ConfigPhysicalPlan> createPipePlans =
                this.createPipeProcedures.stream()
                        .map(CreatePipeProcedureV2::constructPlan)
                        .collect(Collectors.toList());
        TSStatus response = writeMultiplePipesPlan(env, createPipePlans);

        // NOTE(review): the status built for a caught ConsensusException has no sub-status set in
        // this class, so the sub-status condition below appears to let that failure pass without
        // throwing. Behavior kept as-is; confirm whether this is intended.
        if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
                && response.getSubStatusSize() > 0) {
            throw new SubscriptionException(String.format("Failed to create subscription with request %s on config nodes, because %s", this.subscribeReq, response));
        }
    }

    /**
     * Applies the consumer group change on data nodes, then pushes the meta of the pipes being
     * created to data nodes.
     *
     * @param env the procedure environment
     * @throws SubscriptionException if the parsed push result contains an error message
     * @throws IOException declared by the invoked push helpers
     */
    @Override
    protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws SubscriptionException, IOException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromOperateOnDataNodes");
        this.alterConsumerGroupProcedure.executeFromOperateOnDataNodes(env);

        List<String> pipeNames =
                this.createPipeProcedures.stream()
                        .map(CreatePipeProcedureV2::getPipeName)
                        .collect(Collectors.toList());
        String exceptionMessage =
                AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
                        null, this.pushMultiPipeMetaToDataNodes(pipeNames, env));
        if (!exceptionMessage.isEmpty()) {
            throw new SubscriptionException(String.format("Failed to create pipes %s when creating subscription with request %s, details: %s, metadata will be synchronized later.", pipeNames, this.subscribeReq, exceptionMessage));
        }
    }

    /** Only logs; this class undoes nothing for the validate step. */
    @Override
    protected void rollbackFromValidate(ConfigNodeProcedureEnv env) {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromValidate");
    }

    /**
     * Writes a drop-pipe plan for every pipe this procedure intended to create, then rolls back
     * the consumer group change on config nodes.
     *
     * @param env the procedure environment
     * @throws SubscriptionException if the write returns any non-success status
     */
    @Override
    protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnConfigNodes");

        List<ConfigPhysicalPlan> dropPipePlans =
                this.createPipeProcedures.stream()
                        .map(procedure -> new DropPipePlanV2(procedure.getPipeName()))
                        .collect(Collectors.toList());
        TSStatus response = writeMultiplePipesPlan(env, dropPipePlans);
        if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new SubscriptionException(String.format("Failed to rollback creating subscription with request %s on config nodes, because %s", this.subscribeReq, response));
        }

        // The consumer group rollback runs only after the pipes were dropped successfully.
        this.alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(env);
    }

    /**
     * Pushes pipe meta to data nodes, then rolls back the consumer group change on data nodes.
     *
     * @param env the procedure environment
     * @throws SubscriptionException if the parsed push result contains an error message
     * @throws IOException declared by the invoked push helpers
     */
    @Override
    protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws SubscriptionException, IOException {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnDataNodes");
        String exceptionMessage =
                AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
                        null, AbstractOperatePipeProcedureV2.pushPipeMetaToDataNodes(env, this.pipeTaskInfo));
        if (!exceptionMessage.isEmpty()) {
            throw new SubscriptionException(String.format("Failed to rollback create pipes when creating subscription with request %s, because %s", this.subscribeReq, exceptionMessage));
        }
        this.alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(env);
    }

    /**
     * Writes an {@link OperateMultiplePipesPlanV2} wrapping {@code plans} through the consensus
     * manager. A {@link ConsensusException} is logged and converted into an
     * {@code EXECUTE_STATEMENT_ERROR} status carrying the exception message.
     */
    private static TSStatus writeMultiplePipesPlan(ConfigNodeProcedureEnv env, List<ConfigPhysicalPlan> plans) {
        try {
            return env.getConfigManager().getConsensusManager().write(new OperateMultiplePipesPlanV2(plans));
        } catch (ConsensusException e) {
            LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
            TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
            failure.setMessage(e.getMessage());
            return failure;
        }
    }

    /**
     * Writes this procedure to {@code stream} in the following order: procedure type code, parent
     * state, consumer id, consumer group id, topic count and topic names, then three
     * presence-flagged sections (consumer group procedure, alter-topic procedures, create-pipe
     * procedures).
     *
     * @param stream the destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void serialize(DataOutputStream stream) throws IOException {
        stream.writeShort(ProcedureType.CREATE_SUBSCRIPTION_PROCEDURE.getTypeCode());
        super.serialize(stream);

        // ReadWriteIOUtils is called with an OutputStream-typed argument, as in the original code.
        final OutputStream out = stream;

        ReadWriteIOUtils.write(this.subscribeReq.getConsumerId(), out);
        ReadWriteIOUtils.write(this.subscribeReq.getConsumerGroupId(), out);
        int size = this.subscribeReq.getTopicNamesSize();
        ReadWriteIOUtils.write(size, out);
        // Only touch the topic collection when the reported size is non-zero.
        if (size != 0) {
            for (String topicName : this.subscribeReq.getTopicNames()) {
                ReadWriteIOUtils.write(topicName, out);
            }
        }

        if (this.alterConsumerGroupProcedure != null) {
            ReadWriteIOUtils.write(Boolean.TRUE, out);
            this.alterConsumerGroupProcedure.serialize(stream);
        } else {
            ReadWriteIOUtils.write(Boolean.FALSE, out);
        }

        // alterTopicProcedures is final and always initialized, so its presence flag is always true.
        ReadWriteIOUtils.write(Boolean.TRUE, out);
        ReadWriteIOUtils.write(this.alterTopicProcedures.size(), out);
        for (AlterTopicProcedure topicProcedure : this.alterTopicProcedures) {
            topicProcedure.serialize(stream);
        }

        // createPipeProcedures may have been replaced through the test-only setter, so keep the null check.
        if (this.createPipeProcedures != null) {
            ReadWriteIOUtils.write(Boolean.TRUE, out);
            ReadWriteIOUtils.write(this.createPipeProcedures.size(), out);
            for (CreatePipeProcedureV2 pipeProcedure : this.createPipeProcedures) {
                pipeProcedure.serialize(stream);
            }
        } else {
            ReadWriteIOUtils.write(Boolean.FALSE, out);
        }
    }

    /**
     * Restores this procedure from {@code byteBuffer}, reading the sections in the order written
     * by {@link #serialize(DataOutputStream)}. This method does not read the leading type code of
     * this procedure itself; it does read the type code that precedes each nested procedure.
     *
     * @param byteBuffer the source buffer, positioned at the parent state
     */
    @Override
    public void deserialize(ByteBuffer byteBuffer) {
        super.deserialize(byteBuffer);

        this.subscribeReq =
                new TSubscribeReq()
                        .setConsumerId(ReadWriteIOUtils.readString(byteBuffer))
                        .setConsumerGroupId(ReadWriteIOUtils.readString(byteBuffer))
                        .setTopicNames(new HashSet<>());
        int topicCount = ReadWriteIOUtils.readInt(byteBuffer);
        for (int i = 0; i < topicCount; ++i) {
            this.subscribeReq.getTopicNames().add(ReadWriteIOUtils.readString(byteBuffer));
        }

        // Consumer group procedure section.
        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            // Discard the nested procedure's type code.
            ReadWriteIOUtils.readShort(byteBuffer);
            this.alterConsumerGroupProcedure = new AlterConsumerGroupProcedure();
            this.alterConsumerGroupProcedure.deserialize(byteBuffer);
        }

        // Alter-topic procedures section.
        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            int topicProcedureCount = ReadWriteIOUtils.readInt(byteBuffer);
            for (int i = 0; i < topicProcedureCount; ++i) {
                // Discard the nested procedure's type code.
                ReadWriteIOUtils.readShort(byteBuffer);
                AlterTopicProcedure topicProcedure = new AlterTopicProcedure();
                topicProcedure.deserialize(byteBuffer);
                this.alterTopicProcedures.add(topicProcedure);
            }
        }

        // Create-pipe procedures section.
        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            int pipeProcedureCount = ReadWriteIOUtils.readInt(byteBuffer);
            for (int i = 0; i < pipeProcedureCount; ++i) {
                short typeCode = ReadWriteIOUtils.readShort(byteBuffer);
                // NOTE(review): on a type-code mismatch the entry is skipped without consuming its
                // payload from the buffer, so later reads would start inside that payload. Behavior
                // kept as-is; confirm whether a mismatch can occur.
                if (typeCode == ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode()) {
                    CreatePipeProcedureV2 createPipeProcedureV2 = new CreatePipeProcedureV2();
                    createPipeProcedureV2.deserialize(byteBuffer);
                    this.createPipeProcedures.add(createPipeProcedureV2);
                }
            }
        }
    }

    /**
     * Compares procedure id, current state, cycles, the request, the consumer group procedure and
     * the create-pipe procedures. The alter-topic procedures are not part of the comparison.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        CreateSubscriptionProcedure that = (CreateSubscriptionProcedure) o;
        return Objects.equals(this.getProcId(), that.getProcId())
                && Objects.equals(this.getCurrentState(), that.getCurrentState())
                && this.getCycles() == that.getCycles()
                && Objects.equals(this.subscribeReq, that.subscribeReq)
                && Objects.equals(this.alterConsumerGroupProcedure, that.alterConsumerGroupProcedure)
                && Objects.equals(this.createPipeProcedures, that.createPipeProcedures);
    }

    /** Hashes the same fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(
                this.getProcId(),
                this.getCurrentState(),
                this.getCycles(),
                this.subscribeReq,
                this.alterConsumerGroupProcedure,
                this.createPipeProcedures);
    }

    @TestOnly
    public void setAlterConsumerGroupProcedure(AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
        this.alterConsumerGroupProcedure = alterConsumerGroupProcedure;
    }

    @TestOnly
    public AlterConsumerGroupProcedure getAlterConsumerGroupProcedure() {
        return this.alterConsumerGroupProcedure;
    }

    @TestOnly
    public void setCreatePipeProcedures(List<CreatePipeProcedureV2> createPipeProcedures) {
        this.createPipeProcedures = createPipeProcedures;
    }

    @TestOnly
    public List<CreatePipeProcedureV2> getCreatePipeProcedures() {
        return this.createPipeProcedures;
    }
}

