/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.jdbc.JdbcClientPool;
import org.apache.iceberg.jdbc.UncheckedInterruptedException;
import org.apache.iceberg.jdbc.UncheckedSQLException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TriggerLockFactory} which keeps its locks as rows of a JDBC table.
 *
 * <p>Every lock is one row of the {@code flink_maintenance_lock} table, keyed by
 * {@code (LOCK_TYPE, LOCK_ID)}. The {@code INSTANCE_ID} column stores a random UUID generated by
 * the successful {@code tryLock()} call, and is used when the row is deleted again.
 *
 * <p>The connection pool is created by {@link #open()}; the locks returned by
 * {@link #createLock()} and {@link #createRecoveryLock()} use the pool which is set at the time
 * they are created.
 */
public class JdbcLockFactory implements TriggerLockFactory {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcLockFactory.class);

    /**
     * Boolean property (default {@code false}). When set to {@code true}, {@link #open()} creates
     * the lock table if it is not found in the database.
     */
    @Internal
    public static final String INIT_LOCK_TABLES_PROPERTY = "flink-maintenance.lock.jdbc.init-lock-tables";

    private static final String LOCK_TABLE_NAME = "flink_maintenance_lock";
    private static final int LOCK_ID_MAX_LENGTH = 100;
    private static final String CREATE_LOCK_TABLE_SQL = String.format("CREATE TABLE %s (LOCK_TYPE CHAR(1) NOT NULL, LOCK_ID VARCHAR(%s) NOT NULL, INSTANCE_ID CHAR(36) NOT NULL, PRIMARY KEY (LOCK_TYPE, LOCK_ID))", LOCK_TABLE_NAME, LOCK_ID_MAX_LENGTH);
    private static final String CREATE_LOCK_SQL = String.format("INSERT INTO %s (LOCK_TYPE, LOCK_ID, INSTANCE_ID) VALUES (?, ?, ?)", LOCK_TABLE_NAME);
    private static final String GET_LOCK_SQL = String.format("SELECT INSTANCE_ID FROM %s WHERE LOCK_TYPE=? AND LOCK_ID=?", LOCK_TABLE_NAME);
    private static final String DELETE_LOCK_SQL = String.format("DELETE FROM %s WHERE LOCK_TYPE=? AND LOCK_ID=? AND INSTANCE_ID=?", LOCK_TABLE_NAME);

    private final String uri;
    private final String lockId;
    private final Map<String, String> properties;
    // Not part of the serialized state; (re)created by open().
    private transient JdbcClientPool pool;

    /**
     * Creates a new factory. No database connection is made until {@link #open()} is called.
     *
     * @param uri JDBC connection URI, must not be null
     * @param lockId value stored in the {@code LOCK_ID} column, must not be null and must be
     *     shorter than 100 characters
     * @param properties properties handed to the {@link JdbcClientPool}; also read for
     *     {@link #INIT_LOCK_TABLES_PROPERTY}. Must not be null
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if {@code lockId} is too long
     */
    public JdbcLockFactory(String uri, String lockId, Map<String, String> properties) {
        Preconditions.checkNotNull(uri, "JDBC connection URI is required");
        Preconditions.checkNotNull(properties, "Properties map is required");
        // Fail with a descriptive message instead of a bare NPE from lockId.length().
        Preconditions.checkNotNull(lockId, "Lock id is required");
        Preconditions.checkArgument(lockId.length() < LOCK_ID_MAX_LENGTH, "Invalid prefix length: lockId should be shorter than %s", LOCK_ID_MAX_LENGTH);
        this.uri = uri;
        this.lockId = lockId;
        this.properties = properties;
    }

    /**
     * Creates a single-connection {@link JdbcClientPool} and, when
     * {@link #INIT_LOCK_TABLES_PROPERTY} is {@code true}, creates the lock table if it is missing.
     */
    @Override
    public void open() {
        this.pool = new JdbcClientPool(1, uri, properties);
        if (PropertyUtil.propertyAsBoolean(properties, INIT_LOCK_TABLES_PROPERTY, false)) {
            initializeLockTables();
        }
    }

    /**
     * Reuses the connection pool of another factory instead of creating a new one.
     *
     * @param other factory whose pool is shared with this instance
     */
    @VisibleForTesting
    void open(JdbcLockFactory other) {
        this.pool = other.pool;
    }

    /** Returns a lock stored with the maintenance lock type ({@code "m"}). */
    @Override
    public TriggerLockFactory.Lock createLock() {
        return new JdbcLock(pool, lockId, Type.MAINTENANCE);
    }

    /** Returns a lock stored with the recovery lock type ({@code "r"}). */
    @Override
    public TriggerLockFactory.Lock createRecoveryLock() {
        return new JdbcLock(pool, lockId, Type.RECOVERY);
    }

    /**
     * Closes the connection pool. Does nothing if no pool has been set, so closing a factory which
     * was never opened does not fail.
     */
    @Override
    public void close() throws IOException {
        if (pool != null) {
            pool.close();
        }
    }

    /**
     * Creates the lock table unless the database metadata already reports a table with that name.
     *
     * @throws UncheckedSQLException if the metadata lookup or the table creation fails
     * @throws UncheckedInterruptedException if the thread is interrupted while waiting for the pool
     */
    private void initializeLockTables() {
        LOG.debug("Creating database tables (if missing) to store table maintenance locks");
        try {
            pool.run(conn -> {
                DatabaseMetaData dbMeta = conn.getMetaData();
                // NOTE(review): the third argument is a name *pattern* for the JDBC driver, and the
                // lookup uses the name exactly as written here - confirm this matches how the
                // target database stores the identifier.
                try (ResultSet tables = dbMeta.getTables(null, null, LOCK_TABLE_NAME, null)) {
                    if (tables.next()) {
                        LOG.debug("Flink maintenance lock table already exists");
                        return true;
                    }
                }

                LOG.info("Creating Flink maintenance lock table {}", LOCK_TABLE_NAME);
                try (PreparedStatement create = conn.prepareStatement(CREATE_LOCK_TABLE_SQL)) {
                    create.execute();
                }
                return true;
            });
        } catch (SQLTimeoutException e) {
            throw new UncheckedSQLException(e, "Cannot initialize JDBC table maintenance lock: Query timed out");
        } catch (SQLNonTransientConnectionException | SQLTransientConnectionException e) {
            throw new UncheckedSQLException(e, "Cannot initialize JDBC table maintenance lock: Connection failed");
        } catch (SQLException e) {
            throw new UncheckedSQLException(e, "Cannot initialize JDBC table maintenance lock");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
        }
    }

    /** Lock represented by a single row of the lock table, identified by lock type and lock id. */
    private static class JdbcLock implements TriggerLockFactory.Lock {
        private final JdbcClientPool pool;
        private final String lockId;
        private final Type type;

        private JdbcLock(JdbcClientPool pool, String lockId, Type type) {
            this.pool = pool;
            this.lockId = lockId;
            this.type = type;
        }

        /**
         * Tries to insert the lock row with a freshly generated instance id.
         *
         * @return {@code false} if a row for this lock already exists or the insert did not affect
         *     exactly one row; {@code true} if the row was inserted, or if the insert reported an
         *     error but the stored row carries the instance id generated by this call
         * @throws UncheckedSQLException if the insert fails and the row is not owned by this call
         * @throws UncheckedInterruptedException if the thread is interrupted
         */
        @Override
        public boolean tryLock() {
            if (isHeld()) {
                LOG.info("Lock is already held for {}", this);
                return false;
            }

            String newInstanceId = UUID.randomUUID().toString();
            try {
                return pool.run(conn -> {
                    try (PreparedStatement sql = conn.prepareStatement(CREATE_LOCK_SQL)) {
                        sql.setString(1, type.key);
                        sql.setString(2, lockId);
                        sql.setString(3, newInstanceId);
                        int count = sql.executeUpdate();
                        LOG.info("Created {} lock with instanceId {} with row count {}", this, newInstanceId, count);
                        return count == 1;
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new UncheckedInterruptedException(e, "Interrupted during tryLock");
            } catch (SQLException e) {
                // The insert reported an error; check whether the row was written anyway.
                String storedInstanceId;
                try {
                    storedInstanceId = instanceId();
                } catch (RuntimeException lookupFailure) {
                    // Keep the original insert failure visible if the verification fails as well.
                    lookupFailure.addSuppressed(e);
                    throw lookupFailure;
                }

                if (newInstanceId.equals(storedInstanceId)) {
                    return true;
                }
                throw new UncheckedSQLException(e, "Failed to create %s lock", this);
            }
        }

        /**
         * Checks whether a row exists for this lock type and lock id, whichever instance created it.
         *
         * @throws UncheckedSQLException if the query fails
         * @throws UncheckedInterruptedException if the thread is interrupted
         */
        @Override
        public boolean isHeld() {
            try {
                return pool.run(conn -> {
                    try (PreparedStatement sql = conn.prepareStatement(GET_LOCK_SQL)) {
                        sql.setString(1, type.key);
                        sql.setString(2, lockId);
                        try (ResultSet rs = sql.executeQuery()) {
                            return rs.next();
                        }
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new UncheckedInterruptedException(e, "Interrupted during isHeld");
            } catch (SQLException e) {
                throw new UncheckedSQLException(e, "Failed to check the state of the lock %s", this);
            }
        }

        /**
         * Deletes the lock row, if there is one. The instance id currently stored is read first and
         * used in the delete condition.
         *
         * @throws UncheckedSQLException if reading or deleting the row fails
         * @throws UncheckedInterruptedException if the thread is interrupted
         */
        @Override
        public void unlock() {
            try {
                String instanceId = instanceId();
                if (instanceId == null) {
                    // No row stored for this lock - nothing to remove.
                    return;
                }

                pool.run(conn -> {
                    try (PreparedStatement sql = conn.prepareStatement(DELETE_LOCK_SQL)) {
                        sql.setString(1, type.key);
                        sql.setString(2, lockId);
                        sql.setString(3, instanceId);
                        int count = sql.executeUpdate();
                        LOG.info("Deleted {} lock with instanceId {} with row count {}", this, instanceId, count);
                    } catch (SQLException e) {
                        throw new UncheckedSQLException(e, "Failed to delete %s lock with instanceId %s", this, instanceId);
                    }
                    return null;
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new UncheckedInterruptedException(e, "Interrupted during unlock");
            } catch (SQLException e) {
                throw new UncheckedSQLException(e, "Failed to remove lock %s", this);
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("type", type).add("lockId", lockId).toString();
        }

        /**
         * Reads the instance id stored for this lock.
         *
         * @return the stored {@code INSTANCE_ID}, or {@code null} if there is no row for this lock
         * @throws UncheckedSQLException if the query fails
         * @throws UncheckedInterruptedException if the thread is interrupted
         */
        private String instanceId() {
            try {
                return pool.run(conn -> {
                    try (PreparedStatement sql = conn.prepareStatement(GET_LOCK_SQL)) {
                        sql.setString(1, type.key);
                        sql.setString(2, lockId);
                        try (ResultSet rs = sql.executeQuery()) {
                            return rs.next() ? rs.getString(1) : null;
                        }
                    } catch (SQLException e) {
                        throw new UncheckedSQLException(e, "Failed to get lock information for %s", type);
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Called from both tryLock() and unlock(), so the message names this lookup itself.
                throw new UncheckedInterruptedException(e, "Interrupted while getting lock information");
            } catch (SQLException e) {
                throw new UncheckedSQLException(e, "Failed to get lock information for %s", type);
            }
        }
    }

    /** Kind of lock; {@code key} is the single-character value stored in the LOCK_TYPE column. */
    private static enum Type {
        MAINTENANCE("m"),
        RECOVERY("r");

        private final String key;

        private Type(String key) {
            this.key = key;
        }
    }
}

