/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.session.subscription.consumer.base;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException;
import org.apache.iotdb.rpc.subscription.payload.poll.PollFilePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollTabletsPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequest;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequestType;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.AbstractSessionBuilder;
import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
import org.apache.iotdb.session.subscription.SubscriptionSessionWrapper;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSubscriptionProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriptionProvider.class);
    private static final String STATUS_FORMATTER = "Status code is [%s], status message is [%s].";
    private static final String INTERNAL_ERROR_FORMATTER = "Internal error occurred. Status code is [%s], status message is [%s].";
    private static final String SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER = "A timeout has occurred in procedures related to the pipe within the subscription procedure. Please manually check the subscription correctness later. Status code is [%s], status message is [%s].";
    private final SubscriptionSessionWrapper session;
    private String consumerId;
    private String consumerGroupId;
    private final AtomicBoolean isClosed = new AtomicBoolean(true);
    private final AtomicBoolean isAvailable = new AtomicBoolean(false);
    private final TEndPoint endPoint;
    private int dataNodeId;
    private final String username;
    private final String password;

    protected abstract AbstractSessionBuilder constructSubscriptionSessionBuilder(String var1, int var2, String var3, String var4, int var5);

    protected AbstractSubscriptionProvider(TEndPoint endPoint, String username, String password, String consumerId, String consumerGroupId, int thriftMaxFrameSize) {
        this.session = new SubscriptionSessionWrapper(this.constructSubscriptionSessionBuilder(endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize));
        this.endPoint = endPoint;
        this.consumerId = consumerId;
        this.consumerGroupId = consumerGroupId;
        this.username = username;
        this.password = password;
    }

    SubscriptionSessionConnection getSessionConnection() {
        return this.session.getSessionConnection();
    }

    boolean isAvailable() {
        return this.isAvailable.get();
    }

    void setAvailable() {
        this.isAvailable.set(true);
    }

    void setUnavailable() {
        this.isAvailable.set(false);
    }

    int getDataNodeId() {
        return this.dataNodeId;
    }

    String getConsumerId() {
        return this.consumerId;
    }

    String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    TEndPoint getEndPoint() {
        return this.endPoint;
    }

    synchronized void handshake() throws SubscriptionException, IoTDBConnectionException {
        if (!this.isClosed.get()) {
            return;
        }
        this.session.open();
        HashMap<String, String> consumerAttributes = new HashMap<String, String>();
        consumerAttributes.put("group-id", this.consumerGroupId);
        consumerAttributes.put("consumer-id", this.consumerId);
        consumerAttributes.put("username", this.username);
        consumerAttributes.put("password", this.password);
        consumerAttributes.put("sql-dialect", this.session.getSqlDialect());
        PipeSubscribeHandshakeResp resp = this.handshake(new ConsumerConfig(consumerAttributes));
        this.dataNodeId = resp.getDataNodeId();
        this.consumerId = resp.getConsumerId();
        this.consumerGroupId = resp.getConsumerGroupId();
        this.isClosed.set(false);
        this.setAvailable();
    }

    PipeSubscribeHandshakeResp handshake(ConsumerConfig consumerConfig) throws SubscriptionException {
        TPipeSubscribeResp resp;
        PipeSubscribeHandshakeReq req;
        try {
            req = PipeSubscribeHandshakeReq.toTPipeSubscribeReq((ConsumerConfig)consumerConfig);
        }
        catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize handshake request {}", new Object[]{this, consumerConfig, e});
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), (Throwable)e);
        }
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)req);
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} handshake with request {}, set SubscriptionProvider unavailable", new Object[]{this, consumerConfig, e});
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
        return PipeSubscribeHandshakeResp.fromTPipeSubscribeResp((TPipeSubscribeResp)resp);
    }

    synchronized void close() throws SubscriptionException, IoTDBConnectionException {
        if (this.isClosed.get()) {
            return;
        }
        try {
            this.closeInternal();
        }
        finally {
            this.session.close();
            this.setUnavailable();
            this.isClosed.set(true);
        }
    }

    void closeInternal() throws SubscriptionException {
        TPipeSubscribeResp resp;
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)PipeSubscribeCloseReq.toTPipeSubscribeReq());
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} close, set SubscriptionProvider unavailable", (Object)this, (Object)e);
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
    }

    PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
        TPipeSubscribeResp resp;
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} heartbeat, set SubscriptionProvider unavailable", (Object)this, (Object)e);
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
        return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp((TPipeSubscribeResp)resp);
    }

    Map<String, TopicConfig> subscribe(Set<String> topicNames) throws SubscriptionException {
        TPipeSubscribeResp resp;
        PipeSubscribeSubscribeReq req;
        try {
            req = PipeSubscribeSubscribeReq.toTPipeSubscribeReq(topicNames);
        }
        catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize subscribe request {}", new Object[]{this, topicNames, e});
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), (Throwable)e);
        }
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)req);
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} subscribe with request {}, set SubscriptionProvider unavailable", new Object[]{this, topicNames, e});
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
        PipeSubscribeSubscribeResp subscribeResp = PipeSubscribeSubscribeResp.fromTPipeSubscribeResp((TPipeSubscribeResp)resp);
        return subscribeResp.getTopics();
    }

    Map<String, TopicConfig> unsubscribe(Set<String> topicNames) throws SubscriptionException {
        TPipeSubscribeResp resp;
        PipeSubscribeUnsubscribeReq req;
        try {
            req = PipeSubscribeUnsubscribeReq.toTPipeSubscribeReq(topicNames);
        }
        catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize unsubscribe request {}", new Object[]{this, topicNames, e});
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), (Throwable)e);
        }
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)req);
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} unsubscribe with request {}, set SubscriptionProvider unavailable", new Object[]{this, topicNames, e});
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
        PipeSubscribeUnsubscribeResp unsubscribeResp = PipeSubscribeUnsubscribeResp.fromTPipeSubscribeResp((TPipeSubscribeResp)resp);
        return unsubscribeResp.getTopics();
    }

    List<SubscriptionPollResponse> poll(Set<String> topicNames, long timeoutMs) throws SubscriptionException {
        return this.poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL.getType(), (SubscriptionPollPayload)new PollPayload(topicNames), timeoutMs, (long)this.session.getThriftMaxFrameSize()));
    }

    List<SubscriptionPollResponse> pollFile(SubscriptionCommitContext commitContext, long writingOffset, long timeoutMs) throws SubscriptionException {
        return this.poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL_FILE.getType(), (SubscriptionPollPayload)new PollFilePayload(commitContext, writingOffset), timeoutMs, (long)this.session.getThriftMaxFrameSize()));
    }

    List<SubscriptionPollResponse> pollTablets(SubscriptionCommitContext commitContext, int offset, long timeoutMs) throws SubscriptionException {
        return this.poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL_TABLETS.getType(), (SubscriptionPollPayload)new PollTabletsPayload(commitContext, offset), timeoutMs, (long)this.session.getThriftMaxFrameSize()));
    }

    List<SubscriptionPollResponse> poll(SubscriptionPollRequest pollMessage) throws SubscriptionException {
        TPipeSubscribeResp resp;
        PipeSubscribePollReq req;
        try {
            req = PipeSubscribePollReq.toTPipeSubscribeReq((SubscriptionPollRequest)pollMessage);
        }
        catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize poll request {}", new Object[]{this, pollMessage, e});
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), (Throwable)e);
        }
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)req);
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} poll with request {}, set SubscriptionProvider unavailable", new Object[]{this, pollMessage, e});
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
        PipeSubscribePollResp pollResp = PipeSubscribePollResp.fromTPipeSubscribeResp((TPipeSubscribeResp)resp);
        return pollResp.getResponses();
    }

    void commit(List<SubscriptionCommitContext> subscriptionCommitContexts, boolean nack) throws SubscriptionException {
        TPipeSubscribeResp resp;
        PipeSubscribeCommitReq req;
        try {
            req = PipeSubscribeCommitReq.toTPipeSubscribeReq(subscriptionCommitContexts, (boolean)nack);
        }
        catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize commit request {}", new Object[]{this, subscriptionCommitContexts, e});
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), (Throwable)e);
        }
        try {
            resp = this.getSessionConnection().pipeSubscribe((TPipeSubscribeReq)req);
        }
        catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} commit with request {}, set SubscriptionProvider unavailable", new Object[]{this, subscriptionCommitContexts, e});
            this.setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), (Throwable)e);
        }
        AbstractSubscriptionProvider.verifyPipeSubscribeSuccess(resp.status);
    }

    private static void verifyPipeSubscribeSuccess(TSStatus status) throws SubscriptionException {
        switch (status.code) {
            case 200: {
                return;
            }
            case 1902: 
            case 1903: 
            case 1904: 
            case 1905: 
            case 1906: 
            case 1907: 
            case 1908: {
                String errorMessage = String.format(INTERNAL_ERROR_FORMATTER, status.code, status.message);
                LOGGER.warn(errorMessage);
                throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }
            case 1911: {
                throw new SubscriptionPipeTimeoutException(String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, status.code, status.message));
            }
        }
        String errorMessage = String.format(INTERNAL_ERROR_FORMATTER, status.code, status.message);
        LOGGER.warn(errorMessage);
        throw new SubscriptionRuntimeCriticalException(status.message);
    }

    public String toString() {
        return "SubscriptionProvider{endPoint=" + this.endPoint + ", dataNodeId=" + this.dataNodeId + ", consumerId=" + this.consumerId + ", consumerGroupId=" + this.consumerGroupId + ", isAvailable=" + this.isAvailable + ", isClosed=" + this.isClosed + "}";
    }
}

