/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.tmq;

import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.tmq.CallbackResult;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.MapDeserializer;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.TConsumer;
import com.taosdata.jdbc.tmq.TMQConnector;
import com.taosdata.jdbc.utils.StringUtils;
import com.taosdata.jdbc.utils.Utils;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class TaosConsumer<V>
implements TConsumer<V> {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refcount = new AtomicInteger(0);
    private volatile boolean closed = false;
    private Deserializer<V> deserializer;
    long resultSet;
    private final TMQConnector connector;
    private OffsetCommitCallback callback;
    List<V> list = new ArrayList<V>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), r -> {
        Thread t = new Thread(r);
        t.setName("consumer-callback-" + t.getId());
        return t;
    }, new ThreadPoolExecutor.CallerRunsPolicy());

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TaosConsumer(Properties properties) throws SQLException {
        String s3;
        this.connector = new TMQConnector();
        if (null == properties) {
            throw TSDBError.createSQLException(-100, "consumer properties must not be null!");
        }
        String servers = properties.getProperty("bootstrap.servers");
        if (!StringUtils.isEmpty(servers)) {
            Arrays.stream(servers.split(",")).filter(s2 -> !StringUtils.isEmpty(s2)).findFirst().ifPresent(s2 -> {
                String[] host = s2.split(":");
                properties.setProperty("td.connect.ip", host[0]);
                properties.setProperty("td.connect.port", host[1]);
            });
        }
        this.deserializer = !StringUtils.isEmpty(s3 = properties.getProperty("value.deserializer")) ? (Deserializer)Utils.newInstance(Utils.parseClassType(s3)) : new MapDeserializer();
        this.deserializer.configure(properties);
        long config = this.connector.createConfig(properties);
        try {
            this.connector.createConsumer(config);
        }
        finally {
            this.connector.destroyConf(config);
        }
    }

    public void commitCallbackHandler(int code) {
        CallbackResult r = new CallbackResult(code, this.list);
        if (0 != code) {
            SQLException exception = TSDBError.createSQLException(code, this.connector.getErrMsg(code));
            this.executor.submit(() -> this.callback.onComplete(r, exception));
        } else {
            this.executor.submit(() -> this.callback.onComplete(r, null));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Collection<String> topics) throws SQLException {
        this.acquireAndEnsureOpen();
        long topicPointer = 0L;
        try {
            topicPointer = this.connector.createTopic(topics);
            this.connector.subscribe(topicPointer);
        }
        finally {
            if (topicPointer != 0L) {
                this.connector.destroyTopic(topicPointer);
            }
            this.release();
        }
    }

    @Override
    public void unsubscribe() throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            this.connector.unsubscribe();
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<String> subscription() throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            Set<String> set = this.connector.subscription();
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public ConsumerRecords<V> poll(Duration timeout) throws SQLException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public void commitAsync() {
        this.connector.asyncCommit(0L, this);
    }

    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        this.callback = callback;
        this.connector.asyncCommit(0L, this);
    }

    @Override
    public void commitSync() throws SQLException {
        this.connector.syncCommit(0L);
    }

    private void acquireAndEnsureOpen() {
        this.acquire();
        if (this.closed) {
            this.release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    @Override
    public void close() throws SQLException {
        this.acquire();
        try {
            this.executor.shutdown();
            this.connector.closeConsumer();
        }
        finally {
            this.closed = true;
            this.release();
        }
    }
}

