/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.performance.engine;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.IoUtil;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.awaitility.Awaitility;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

public class PostgresEndToEndPerf {
    /** Default database host, supplied to the JDBC configuration via {@code withDefault}. */
    private static final String HOST = "localhost";
    /** Default PostgreSQL port. */
    private static final int PORT = 5432;
    /** Default database user. */
    private static final String USER = "postgres";
    /** Default database password. */
    private static final String PASSWORD = "postgres";
    /** Default database name. */
    private static final String DATABASE = "postgres";
    /** Logical server name; used as the connector's topic prefix. */
    private static final String SERVER_NAME = "server1";
    /** Schema-qualified base name for the benchmark tables. */
    private static final String BASE_TABLE_NAME = "inventory.test";
    /**
     * Key/value change-event format handed to the engine builder: both key and value use
     * {@link Json}. Declared with explicit type arguments (instead of a raw type) so the
     * builder chain is type-checked.
     */
    private static final KeyValueChangeEventFormat<Json, Json> KV_EVENT_FORMAT = KeyValueChangeEventFormat.of(Json.class, Json.class);

    /**
     * Builds the JDBC configuration used to reach the benchmark database.
     *
     * <p>The configuration is seeded from the system properties that carry the
     * {@code database.} prefix; host, port, user, password and database name are then
     * supplied through {@code withDefault}.
     *
     * @return the resulting JDBC configuration
     */
    private static JdbcConfiguration defaultJdbcConfig() {
        Configuration fromSystemProperties = Configuration.fromSystemProperties("database.");
        return JdbcConfiguration.copy(fromSystemProperties)
                .withDefault(JdbcConfiguration.HOSTNAME, HOST)
                .withDefault(JdbcConfiguration.PORT, PORT)
                .withDefault(JdbcConfiguration.USER, USER)
                .withDefault(JdbcConfiguration.PASSWORD, PASSWORD)
                .withDefault(JdbcConfiguration.DATABASE, DATABASE)
                .build();
    }

    /**
     * Creates the connector/engine configuration builder shared by the benchmarks.
     *
     * <p>Every entry of {@link #defaultJdbcConfig()} is copied under the {@code database.}
     * prefix, then the connector and embedded-engine options are added. The offsets file is
     * {@code offsets.txt} inside the test data directory.
     *
     * @return a builder that callers can extend with benchmark-specific options
     */
    private static Configuration.Builder defaultConnectorConfig() {
        Configuration.Builder connectorConfig = Configuration.create();
        PostgresEndToEndPerf.defaultJdbcConfig()
                .forEach((key, value) -> connectorConfig.with("database." + key, value));

        Path offsetsFile = PostgresEndToEndPerf.getPath("offsets.txt").toAbsolutePath();

        return connectorConfig
                .with(CommonConnectorConfig.TOPIC_PREFIX, SERVER_NAME)
                .with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA)
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
                .with(EmbeddedEngineConfig.ENGINE_NAME, "benchmark")
                .with(EmbeddedEngineConfig.CONNECTOR_CLASS, PostgresConnector.class)
                .with("offset.storage.file.filename", offsetsFile)
                .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0);
    }

    /**
     * Converts the configuration to {@link Properties} and registers a single transform
     * named {@code replace} ({@code ReplaceField$Value}) that renames {@code name} to
     * {@code transformed_name} and excludes {@code id}.
     *
     * @param config the engine configuration to extend
     * @return the properties obtained from {@code config} with the transform settings added
     */
    private static Properties addSmtConfig(Configuration config) {
        String[][] smtSettings = {
                { "transforms", "replace" },
                { "transforms.replace.type", "org.apache.kafka.connect.transforms.ReplaceField$Value" },
                { "transforms.replace.renames", "name:transformed_name" },
                { "transforms.replace.exclude", "id" },
        };
        Properties engineProps = config.asProperties();
        for (String[] setting : smtSettings) {
            engineProps.setProperty(setting[0], setting[1]);
        }
        return engineProps;
    }

    /**
     * Creates the consumer that the engine notifies for every change event; each event is
     * appended to {@code consumedLines}.
     *
     * <p>When the queue is full the consumer blocks in {@link BlockingQueue#put(Object)}
     * instead of spinning on {@code offer}. If the calling thread is (or becomes)
     * interrupted, the event is dropped and the interrupt flag is left set.
     *
     * @param consumedLines queue receiving the consumed events
     * @return the consumer to register with the engine builder
     */
    private static Consumer<ChangeEvent<String, String>> getRecordConsumer(BlockingQueue<EmbeddedEngineChangeEvent> consumedLines) {
        return record -> {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                consumedLines.put((EmbeddedEngineChangeEvent) record);
            }
            catch (InterruptedException e) {
                // put() clears the flag when it throws; restore it for the caller.
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * Drops the given table if it exists and creates it again with the columns
     * {@code id numeric(9,0) primary key} and {@code name varchar(50)}.
     *
     * <p>A failure of the DROP statement is only printed; a failure of the CREATE statement
     * is rethrown. The connection is closed before returning in either case.
     *
     * @param tableName name of the table to recreate; concatenated into the SQL as-is
     * @throws RuntimeException if the table cannot be created
     */
    private static void recreateTable(String tableName) {
        try (PostgresConnection connection = PostgresEndToEndPerf.getTestConnection()) {
            try {
                connection.execute("DROP TABLE IF EXISTS " + tableName);
            }
            catch (SQLException e) {
                // Best effort: report and continue with the CREATE.
                // NOTE(review): with auto-commit disabled a failed DROP may leave the
                // transaction aborted, in which case the CREATE below fails too - confirm.
                e.printStackTrace();
            }
            connection.execute("CREATE TABLE " + tableName + " (id numeric(9,0) primary key, name varchar(50))");
        }
        catch (SQLException e) {
            throw new RuntimeException("Failed to create table", e);
        }
    }

    /**
     * Inserts {@code eventCount} rows ({@code id = i}, {@code name = 'Test' + i}) into the
     * given table and commits them once at the end.
     *
     * <p>The connection is closed before returning, whether or not the inserts succeed.
     *
     * @param tableName  target table; concatenated into the SQL as-is
     * @param eventCount number of rows to insert
     * @throws RuntimeException if an insert or the commit fails
     */
    private static void createDmlEvents(String tableName, int eventCount) {
        try (PostgresConnection connection = PostgresEndToEndPerf.getTestConnection()) {
            for (int i = 0; i < eventCount; ++i) {
                String dml = "INSERT INTO " + tableName + " (id, name) values (" + i + ",'Test" + i + "')";
                connection.executeWithoutCommitting(dml);
            }
            connection.commit();
        }
        catch (SQLException e) {
            throw new RuntimeException("Failed to insert data set", e);
        }
    }

    /**
     * Creates a connection named {@code test_connection} from {@link #defaultJdbcConfig()}
     * with auto-commit disabled.
     *
     * <p>The caller owns the returned connection and is responsible for closing it. If
     * auto-commit cannot be disabled, the connection is closed here before the failure is
     * rethrown.
     *
     * @return a connection with auto-commit turned off
     * @throws RuntimeException if disabling auto-commit fails
     */
    private static PostgresConnection getTestConnection() {
        PostgresConnection connection = new PostgresConnection(PostgresEndToEndPerf.defaultJdbcConfig(), "test_connection");
        try {
            connection.setAutoCommit(false);
            return connection;
        }
        catch (SQLException e) {
            try {
                connection.close();
            }
            catch (Exception closeFailure) {
                // Keep the original failure as the primary cause.
                e.addSuppressed(closeFailure);
            }
            throw new RuntimeException("Failed to disable auto-commit on test connection", e);
        }
    }

    /**
     * Polls every 100 ms, for at most 30 seconds, until the streaming metrics MBean reports
     * its {@code Connected} attribute as {@code true}.
     */
    private static void waitForStreamingToStart() {
        Awaitility.await()
                .alias("Streaming was not started on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(30L, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(PostgresEndToEndPerf::isStreamingConnected);
    }

    /**
     * Reads the {@code Connected} attribute of the streaming metrics MBean.
     *
     * @return the attribute value, or {@code false} when a {@link JMException} is raised
     *         while building the name or reading the attribute
     */
    private static boolean isStreamingConnected() {
        try {
            Object connected = ManagementFactory.getPlatformMBeanServer()
                    .getAttribute(PostgresEndToEndPerf.getMbeanName(), "Connected");
            return (Boolean) connected;
        }
        catch (JMException notAvailableYet) {
            return false;
        }
    }

    /**
     * Returns the JMX name of the connector's streaming metrics MBean for
     * {@link #SERVER_NAME}.
     *
     * @return the object name {@code debezium.postgres:type=connector-metrics,context=streaming,server=server1}
     * @throws MalformedObjectNameException if the name string is rejected by {@link ObjectName}
     */
    private static ObjectName getMbeanName() throws MalformedObjectNameException {
        String streamingMetrics = "debezium.postgres:type=connector-metrics,context=streaming,server=" + SERVER_NAME;
        return new ObjectName(streamingMetrics);
    }

    /**
     * Resolves a path inside the test data directory.
     *
     * @param relativePath path relative to the directory returned by {@link #resolveDataDir()}
     * @return the absolute form of the joined path
     */
    private static Path getPath(String relativePath) {
        String dataDir = PostgresEndToEndPerf.resolveDataDir();
        Path joined = Paths.get(dataDir, relativePath);
        return joined.toAbsolutePath();
    }

    /**
     * Deletes the given file or directory, but only when it lies inside the test data
     * directory.
     *
     * <p>Deletion is best effort: an {@link IOException} is reported on standard error and
     * not propagated, so callers can tell that stale data may have been left behind.
     *
     * @param relativePath path relative to the test data directory
     */
    private static void delete(String relativePath) {
        Path target = PostgresEndToEndPerf.getPath(relativePath).toAbsolutePath();
        if (!PostgresEndToEndPerf.inTestDataDir(target)) {
            return;
        }
        try {
            IoUtil.delete(target);
        }
        catch (IOException e) {
            System.err.println("Unable to remove '" + target + "': " + e);
        }
    }

    /**
     * Tells whether {@code path} lies inside the test data directory.
     *
     * <p>Both paths are made absolute and normalized before the prefix comparison, so
     * {@code .} and {@code ..} segments cannot make a location outside the data directory
     * look as if it were inside it.
     *
     * @param path the path to check; must not be {@code null}
     * @return {@code true} if the normalized path starts with the normalized data directory
     */
    private static boolean inTestDataDir(Path path) {
        Path dataDir = Paths.get(PostgresEndToEndPerf.resolveDataDir()).toAbsolutePath().normalize();
        return path.toAbsolutePath().normalize().startsWith(dataDir);
    }

    /**
     * Determines the test data directory.
     *
     * <p>Lookup order: the {@code dbz.test.data.dir} system property, then the
     * {@code DBZ_TEST_DATA_DIR} environment variable, then {@code /tmp}. A candidate is
     * used only if it is non-empty after trimming, and the trimmed value is returned.
     *
     * @return the directory to use for test data
     */
    private static String resolveDataDir() {
        String[] candidates = {
                System.getProperty("dbz.test.data.dir"),
                System.getenv("DBZ_TEST_DATA_DIR"),
        };
        for (String candidate : candidates) {
            if (candidate == null) {
                continue;
            }
            String trimmed = candidate.trim();
            if (!trimmed.isEmpty()) {
                return trimmed;
            }
        }
        return "/tmp";
    }

    /**
     * Benchmark body: collects events from the state's queue until {@code eventCount}
     * events have been received.
     *
     * <p>The queue is drained directly into a typed, presized result list, so the polling
     * loop allocates no temporary list per pass. The loop deliberately polls without
     * blocking.
     *
     * @param state per-thread benchmark state providing the queue and the expected count
     */
    @Benchmark
    @BenchmarkMode(value={Mode.SingleShotTime})
    @OutputTimeUnit(value=TimeUnit.SECONDS)
    @Fork(value=1)
    @Warmup(iterations=1)
    @Measurement(iterations=1, time=1)
    public void processRecordsAsyncEngine(AsyncEngineEndToEndPerfTest state) {
        ArrayList<EmbeddedEngineChangeEvent> records = new ArrayList<>(state.eventCount);
        while (records.size() < state.eventCount) {
            // drainTo appends whatever is currently queued and returns immediately.
            state.consumedLines.drainTo(records);
        }
    }

    /**
     * Benchmark state for the asynchronous engine, parameterized by the number of record
     * processing threads and the record processing order.
     */
    @State(Scope.Thread)
    public static class AsyncEngineEndToEndPerfTest extends DebeziumEndToEndPerfTest {
        /** Value passed as {@code AsyncEngineConfig.RECORD_PROCESSING_THREADS}. */
        @Param({ "1", "2", "4", "8", "16" })
        public int threadCount;
        /** Value passed as {@code AsyncEngineConfig.RECORD_PROCESSING_ORDER}. */
        @Param({ "ORDERED", "UNORDERED" })
        public String processingOrder;

        /**
         * @return the table name prefix, which encodes thread count and processing order
         */
        @Override
        public String getBaseTableName() {
            return BASE_TABLE_NAME + "_async_" + threadCount + "_" + processingOrder;
        }

        /**
         * Builds an async engine from the shared connector configuration plus the
         * async-specific options and the SMT settings; consumed events go to
         * {@code consumedLines}.
         *
         * @return the configured engine
         */
        @Override
        public DebeziumEngine createEngine() {
            Configuration.Builder asyncConfig = PostgresEndToEndPerf.defaultConnectorConfig()
                    .with(PostgresConnectorConfig.SLOT_NAME, "async_" + eventCount)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, 100)
                    .with(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, 5000)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_THREADS, threadCount)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_ORDER, processingOrder);
            Properties engineProps = PostgresEndToEndPerf.addSmtConfig(asyncConfig.build());

            return new ConvertingAsyncEngineBuilderFactory()
                    .builder(KV_EVENT_FORMAT)
                    .using(engineProps)
                    .notifying(PostgresEndToEndPerf.getRecordConsumer(consumedLines))
                    .using(getClass().getClassLoader())
                    .build();
        }
    }

    /**
     * Base benchmark state: per iteration it recreates the table, starts an engine on a
     * single-thread executor, waits for streaming to connect and inserts the test rows;
     * afterwards it stops the engine and the executor.
     */
    @State(value=Scope.Thread)
    public static abstract class DebeziumEndToEndPerfTest {
        private DebeziumEngine<?> engine;
        private ExecutorService executors;
        /** Queue filled by the engine's record consumer; sized to {@link #eventCount}. */
        protected BlockingQueue<EmbeddedEngineChangeEvent> consumedLines;
        protected AtomicInteger count = new AtomicInteger(0);
        /** Number of rows inserted, and therefore events expected, per iteration. */
        @Param(value={"100000", "1000000"})
        public int eventCount;

        /**
         * @return the prefix of the table name; {@code "_" + eventCount} is appended to it
         */
        public abstract String getBaseTableName();

        /**
         * @return a new, not yet running engine whose consumer feeds {@link #consumedLines}
         */
        public abstract DebeziumEngine createEngine();

        /**
         * Prepares one iteration: removes the offsets file, recreates the table, starts the
         * engine, waits until streaming is connected and only then inserts the rows.
         */
        @Setup(value=Level.Iteration)
        public void doSetup() {
            String tableName = this.getBaseTableName() + "_" + this.eventCount;
            PostgresEndToEndPerf.delete("offsets.txt");
            PostgresEndToEndPerf.recreateTable(tableName);
            this.consumedLines = new ArrayBlockingQueue<EmbeddedEngineChangeEvent>(this.eventCount);
            this.engine = this.createEngine();
            this.executors = Executors.newFixedThreadPool(1);
            this.executors.execute(this.engine);
            PostgresEndToEndPerf.waitForStreamingToStart();
            PostgresEndToEndPerf.createDmlEvents(tableName, this.eventCount);
        }

        /**
         * Closes the engine and shuts the executor down, waiting up to
         * {@code CommonConnectorConfig.DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT} for termination.
         * Safe to call when setup never created the engine or the executor.
         *
         * @throws Exception if closing the engine fails
         */
        @TearDown(value=Level.Iteration)
        public void doCleanup() throws Exception {
            try {
                if (this.engine != null) {
                    this.engine.close();
                }
                if (this.executors != null) {
                    this.executors.shutdown();
                    try {
                        this.executors.awaitTermination(CommonConnectorConfig.DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            finally {
                // Guarded: an unconditional call here would throw NPE when the executor was
                // never created and would mask any exception from the try block.
                if (this.executors != null) {
                    this.executors.shutdownNow();
                }
                this.engine = null;
                this.executors = null;
            }
        }
    }
}

