/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.tmq;

import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.OffsetWaitCallback;
import com.taosdata.jdbc.tmq.TMQConnector;
import com.taosdata.jdbc.tmq.TMQResultSet;
import com.taosdata.jdbc.tmq.TopicPartition;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * {@link Consumer} implementation that delegates every operation to a {@link TMQConnector}.
 *
 * <p>Each non-empty {@link #poll} yields a {@code long} result-set handle. With auto-commit
 * enabled the handle is released right after the rows are read; otherwise the
 * {@link ConsumerRecords} wrapping it is parked in {@code offsetList} until
 * {@link #commitSync()}, {@link #commitAsync(OffsetCommitCallback)}, {@link #unsubscribe()} or
 * {@link #close()} deals with it. Handles with an asynchronous commit in flight are tracked in
 * {@code callbacks} until {@link #closeOffset(long)} removes them.
 *
 * <p>Thread-safety: only access to {@code callbacks} is guarded (by this instance's monitor),
 * because {@link #closeOffset(long)} is {@code synchronized}. All other state is unguarded.
 *
 * @param <V> type produced by the {@link Deserializer} passed to {@link #poll}
 */
public class JNIConsumer<V>
implements Consumer<V> {
    private final TMQConnector connector;
    /** Polled-but-uncommitted batches (only populated when auto-commit is off). */
    private final List<ConsumerRecords<V>> offsetList = new ArrayList<ConsumerRecords<V>>();
    private boolean autoCommit;
    /** Result-set handle -> pending async-commit callback. Guarded by {@code this}. */
    private final Map<Long, OffsetWaitCallback<V>> callbacks = new HashMap<Long, OffsetWaitCallback<V>>();
    /**
     * Single-threaded pool with a bounded queue of 10 tasks; when the queue is full the
     * submitting thread runs the task itself ({@code CallerRunsPolicy}). Not used inside this
     * class other than being shut down in {@link #close()}.
     */
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), r -> {
        Thread t = new Thread(r);
        t.setName("consumer-callback-" + t.getId());
        return t;
    }, new ThreadPoolExecutor.CallerRunsPolicy());

    /** Creates a consumer backed by a fresh {@link TMQConnector}. */
    public JNIConsumer() {
        this.connector = new TMQConnector();
    }

    /**
     * Builds a connector configuration from {@code properties}, creates the consumer from it,
     * and always destroys the configuration afterwards.
     *
     * @param properties consumer settings; {@code enable.auto.commit} (default {@code "false"})
     *                   decides whether {@link #poll} releases result sets immediately
     * @throws SQLException if the connector fails to create the configuration or the consumer
     */
    @Override
    public void create(Properties properties) throws SQLException {
        this.autoCommit = Boolean.parseBoolean(properties.getProperty("enable.auto.commit", "false"));
        long config = this.connector.createConfig(properties);
        try {
            this.connector.createConsumer(config);
        }
        finally {
            this.connector.destroyConf(config);
        }
    }

    /**
     * Subscribes to {@code topics}. The temporary topic handle created for the call is
     * destroyed whether or not the subscription succeeds.
     *
     * @param topics topic names to subscribe to
     * @throws SQLException if the connector rejects the topics or the subscription
     */
    @Override
    public void subscribe(Collection<String> topics) throws SQLException {
        long topicPointer = 0L;
        try {
            topicPointer = this.connector.createTopic(topics);
            this.connector.subscribe(topicPointer);
        }
        finally {
            // 0 means createTopic never produced a handle, so there is nothing to destroy.
            if (topicPointer != 0L) {
                this.connector.destroyTopic(topicPointer);
            }
        }
    }

    /**
     * Releases every tracked result set, forgets them, and then unsubscribes.
     *
     * @throws SQLException if releasing a result set or the unsubscribe call fails
     */
    @Override
    public void unsubscribe() throws SQLException {
        this.releaseTrackedResultSets();
        this.connector.unsubscribe();
    }

    /**
     * @return the current subscription as reported by the connector
     * @throws SQLException if the connector call fails
     */
    @Override
    public Set<String> subscription() throws SQLException {
        return this.connector.subscription();
    }

    /**
     * Polls the connector once and converts every row of the returned result set into a
     * {@link ConsumerRecord}, grouped by {@link TopicPartition}.
     *
     * <p>If reading or deserializing fails, the result set is released before the exception
     * propagates, since nothing else would ever hold its handle.
     *
     * @param timeout      maximum time to wait, forwarded to the connector in milliseconds
     * @param deserializer converts the current row of the result set into a {@code V}
     * @return the polled records, or {@link ConsumerRecords#emptyRecord()} when nothing was returned
     * @throws SQLException if polling, iterating, or releasing the result set fails
     */
    @Override
    public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
        long resultSet = this.connector.poll(timeout.toMillis());
        // NOTE(review): 9079 is presumably the connector's "consumer is null" code returned in
        // place of a handle - confirm against TSDBErrorNumbers.
        if (resultSet == 0L || resultSet == 9079L) {
            return ConsumerRecords.emptyRecord();
        }
        ConsumerRecords<V> records;
        boolean drained = false;
        try {
            int timestampPrecision = this.connector.getResultTimePrecision(resultSet);
            records = new ConsumerRecords<V>(resultSet);
            try (TMQResultSet rs = new TMQResultSet(this.connector, resultSet, timestampPrecision)) {
                while (rs.next()) {
                    String topic = this.connector.getTopicName(resultSet);
                    String dbName = this.connector.getDbName(resultSet);
                    int vGroupId = this.connector.getVgroupId(resultSet);
                    long offset = this.connector.getOffset(resultSet);
                    TopicPartition tp = new TopicPartition(topic, dbName, vGroupId);
                    V v = deserializer.deserialize(rs, topic, dbName);
                    records.put(tp, new ConsumerRecord<V>(topic, dbName, vGroupId, offset, v));
                }
            }
            drained = true;
        }
        finally {
            if (!drained) {
                try {
                    this.releaseResultSet(resultSet);
                }
                catch (SQLException ignored) {
                    // Best-effort cleanup; the failure that brought us here is the one the
                    // caller needs to see, so it must not be replaced by a release error.
                }
            }
        }
        if (this.autoCommit) {
            this.releaseResultSet(resultSet);
        } else {
            this.offsetList.add(records);
        }
        return records;
    }

    /**
     * Starts an asynchronous commit for every pending batch and moves its handle from
     * {@code offsetList} to {@code callbacks}.
     *
     * @param callback user callback handed to each {@link OffsetWaitCallback}
     */
    @Override
    public void commitAsync(OffsetCommitCallback<V> callback) {
        for (ConsumerRecords<V> r : this.offsetList) {
            OffsetWaitCallback<V> waiter = new OffsetWaitCallback<V>(r, this, callback);
            // Register before committing: if the completion path calls closeOffset() before
            // we get to put(), the entry would otherwise be left behind and freed again later.
            synchronized (this) {
                this.callbacks.put(r.getOffset(), waiter);
            }
            this.connector.asyncCommit(r.getOffset(), waiter);
        }
        this.offsetList.clear();
    }

    /** Forwards a seek for {@code partition}'s topic and vgroup to the connector. */
    @Override
    public void seek(TopicPartition partition, long offset) {
        this.connector.seek(partition.getTopic(), partition.getVGroupId(), offset);
    }

    /**
     * @return the current offset of the assignment whose vgroup id matches {@code partition}
     * @throws IllegalStateException (error code 9082) if the topic has no such assignment
     */
    @Override
    public long position(TopicPartition partition) {
        return this.connector.getTopicAssignment(partition.getTopic()).stream().filter(a -> a.getVgId() == partition.getVGroupId()).findFirst().orElseThrow(() -> TSDBError.createIllegalStateException(9082)).getCurrentOffset();
    }

    /** @return current offset of every assignment of {@code topic}, keyed by partition */
    @Override
    public Map<TopicPartition, Long> position(String topic) {
        return this.connector.getTopicAssignment(topic).stream().collect(HashMap::new, (m, a) -> m.put(new TopicPartition(topic, a.getVgId()), a.getCurrentOffset()), HashMap::putAll);
    }

    /** @return begin offset of every assignment of {@code topic}, keyed by partition */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(String topic) {
        return this.connector.getTopicAssignment(topic).stream().collect(HashMap::new, (m, a) -> m.put(new TopicPartition(topic, a.getVgId()), a.getBegin()), HashMap::putAll);
    }

    /** @return end offset of every assignment of {@code topic}, keyed by partition */
    @Override
    public Map<TopicPartition, Long> endOffsets(String topic) {
        return this.connector.getTopicAssignment(topic).stream().collect(HashMap::new, (m, a) -> m.put(new TopicPartition(topic, a.getVgId()), a.getEnd()), HashMap::putAll);
    }

    /**
     * Synchronously commits and then releases every pending batch, in poll order.
     *
     * <p>Batches that were committed are dropped from the pending list even when a later one
     * fails, so an already-released handle is never committed or released a second time.
     * A batch whose commit failed stays pending.
     *
     * @throws SQLException if a commit or release fails
     */
    @Override
    public void commitSync() throws SQLException {
        int committed = 0;
        try {
            for (ConsumerRecords<V> r : this.offsetList) {
                this.connector.syncCommit(r.getOffset());
                // Counted before the release: once committed the batch must not be revisited.
                committed++;
                this.releaseResultSet(r.getOffset());
            }
        }
        finally {
            this.offsetList.subList(0, committed).clear();
        }
    }

    /**
     * Shuts down the callback executor, releases every tracked result set, and closes the
     * consumer. The consumer is closed even if releasing a result set fails.
     *
     * @throws SQLException if releasing a result set or closing the consumer fails
     */
    @Override
    public void close() throws SQLException {
        this.executor.shutdown();
        try {
            this.releaseTrackedResultSets();
        }
        finally {
            this.connector.closeConsumer();
        }
    }

    /**
     * Frees the result set behind {@code ptr}.
     *
     * @param ptr result-set handle obtained from the connector
     * @throws SQLException with error 9044 when the connector returns -2, or 9045 when it returns -3
     */
    public void releaseResultSet(long ptr) throws SQLException {
        int code = this.connector.freeResultSet(ptr);
        if (code == -2) {
            throw TSDBError.createSQLException(9044);
        }
        if (code == -3) {
            throw TSDBError.createSQLException(9045);
        }
    }

    /** @return the connector's message for {@code code} */
    public String getErrMsg(int code) {
        return this.connector.getErrMsg(code);
    }

    /**
     * Forgets the pending async-commit callback registered for the given handle.
     *
     * @param prt result-set handle used as the key in {@code callbacks}
     */
    public synchronized void closeOffset(long prt) {
        this.callbacks.remove(prt);
    }

    /**
     * Releases every handle held in {@code offsetList} and {@code callbacks} and empties both.
     * Tracking is cleared up front so no handle can be released twice by a later call; all
     * handles are attempted, and the first failure is rethrown with the rest suppressed.
     */
    private void releaseTrackedResultSets() throws SQLException {
        List<Long> handles = new ArrayList<Long>();
        for (ConsumerRecords<V> cr : this.offsetList) {
            handles.add(cr.getOffset());
        }
        this.offsetList.clear();
        // Snapshot under the same monitor closeOffset() uses, then release outside the lock.
        synchronized (this) {
            handles.addAll(this.callbacks.keySet());
            this.callbacks.clear();
        }
        SQLException failure = null;
        for (Long handle : handles) {
            try {
                this.releaseResultSet(handle);
            }
            catch (SQLException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}

