/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.performance.engine.connector;

import io.debezium.config.Configuration;
import io.debezium.util.Collect;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class PreComputedRecordsSourceConnector
extends SourceConnector {
    protected static final String VERSION = "1.0";
    public static final String TOPIC_NAME = "simple.topic";
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    public static final String BATCH_COUNT = "batch.count";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 2048;
    public static final int DEFAULT_BATCH_COUNT = 100;
    private Map<String, String> config;

    public String version() {
        return VERSION;
    }

    public void start(Map<String, String> props) {
        this.config = props;
    }

    public Class<? extends Task> taskClass() {
        return PreComputedRecordsTask.class;
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
        configs.add(this.config);
        return configs;
    }

    public void stop() {
    }

    public ConfigDef config() {
        return new ConfigDef();
    }

    private static List<SourceRecord> precomputeRecords(int numberOfRecords) {
        Schema keySchema = SchemaBuilder.struct().name("simple.key").field("id", Schema.INT32_SCHEMA).build();
        Schema valueSchema = SchemaBuilder.struct().name("simple.value").field("name", Schema.STRING_SCHEMA).field("surname", Schema.STRING_SCHEMA).field("address", Schema.STRING_SCHEMA).field("batch", Schema.INT32_SCHEMA).field("record", Schema.INT32_SCHEMA).field("timestamp", Schema.OPTIONAL_INT64_SCHEMA).build();
        LinkedList<SourceRecord> records = new LinkedList<SourceRecord>();
        Random random = new Random();
        long initialTimestamp = System.currentTimeMillis();
        for (int recordNum = 0; recordNum != numberOfRecords; ++recordNum) {
            Struct key = new Struct(keySchema);
            key.put("id", (Object)recordNum);
            Struct value = new Struct(valueSchema);
            value.put("name", (Object)PreComputedRecordsSourceConnector.randomString(random, 10));
            value.put("surname", (Object)PreComputedRecordsSourceConnector.randomString(random, 20));
            value.put("address", (Object)PreComputedRecordsSourceConnector.randomString(random, 30));
            value.put("batch", (Object)1);
            value.put("record", (Object)(recordNum + 1));
            value.put("timestamp", (Object)(initialTimestamp + (long)recordNum));
            SourceRecord record = new SourceRecord(Collect.hashMapOf((Object)"source", (Object)"simple"), Collect.hashMapOf((Object)"id", (Object)recordNum), TOPIC_NAME, Integer.valueOf(1), keySchema, (Object)key, valueSchema, (Object)value);
            records.add(record);
        }
        return records;
    }

    private static String randomString(Random random, int length) {
        int ASCII_CHAR_START = 97;
        int ASCII_CHAR_END = 123;
        return random.ints(97, 123).limit(length).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
    }

    public static class PreComputedRecordsTask
    extends SourceTask {
        private static List<SourceRecord> records = PreComputedRecordsSourceConnector.precomputeRecords(204800);
        private final AtomicBoolean running = new AtomicBoolean();
        private int batchCount;
        private int recordsPerBatch;
        private int recordsFrom;
        private int currentBatch;
        private int recordsSent;

        public String version() {
            return PreComputedRecordsSourceConnector.VERSION;
        }

        public void start(Map<String, String> props) {
            if (this.running.compareAndSet(false, true)) {
                Configuration config = Configuration.from(props);
                this.recordsPerBatch = config.getInteger(PreComputedRecordsSourceConnector.RECORD_COUNT_PER_BATCH, 2048);
                this.batchCount = config.getInteger(PreComputedRecordsSourceConnector.BATCH_COUNT, 100);
                this.recordsFrom = 0;
                this.currentBatch = 0;
                this.recordsSent = 0;
            }
        }

        public List<SourceRecord> poll() throws InterruptedException {
            if (this.running.get()) {
                int recordsTo;
                if (this.recordsFrom + (this.currentBatch + 1) * this.recordsPerBatch >= 204800) {
                    this.recordsFrom = 0;
                    this.currentBatch = 0;
                }
                if (this.recordsFrom > (recordsTo = Math.min((this.currentBatch + 1) * this.recordsPerBatch, records.size() - this.recordsFrom) - 1)) {
                    return null;
                }
                List<SourceRecord> batch = records.subList(this.recordsFrom, recordsTo);
                ++this.currentBatch;
                this.recordsSent += batch.size();
                this.recordsFrom = recordsTo;
                return batch;
            }
            return null;
        }

        public void stop() {
            this.running.set(false);
        }
    }
}

