/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.connector.XaSinkProcessorBase;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.sql.CommonDataSource;
import javax.sql.DataSource;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.sql.XADataSource;

/**
 * Sink processor that binds every received item to a prepared JDBC update
 * statement and executes it, batching the updates when the driver reports
 * support for batch updates.
 *
 * <p>Two modes are distinguished through
 * {@code snapshotUtility.usesTransactionLifecycle()}:
 * <ul>
 *   <li>when it returns {@code true} (referred to as "XA mode" in this class)
 *       the data source must be an {@link XADataSource}, its XA resource is
 *       handed to the base class, and any {@link SQLException} is rethrown;</li>
 *   <li>otherwise the processor commits after each processed inbox and, on a
 *       transient {@link SQLException}, backs off and reconnects, leaving the
 *       items in the inbox so that they are retried.</li>
 * </ul>
 *
 * @param <T> type of the items bound to the statement
 */
public final class WriteJdbcP<T>
extends XaSinkProcessorBase {
    // Back-off used between reconnect attempts. NOTE(review): the arguments are
    // presumably (maxSpins, maxYields, minParkNanos, maxParkNanos), i.e. park
    // between 1 and 3 seconds - confirm against BackoffIdleStrategy.
    private static final IdleStrategy IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(3L));
    private final CommonDataSource dataSource;
    private final BiConsumerEx<? super PreparedStatement, ? super T> bindFn;
    private final String updateQuery;
    private final int batchLimit;
    private ILogger logger;
    // Pooled XA connection backing `connection`; null when the connection was
    // obtained from a plain DataSource.
    private XAConnection xaConnection;
    private Connection connection;
    private PreparedStatement statement;
    // Number of consecutive failures; 0 means the connection is considered healthy.
    private int idleCount;
    private boolean supportsBatch;
    // Number of statements added to the current, not yet executed batch.
    private int batchCount;

    private WriteJdbcP(@Nonnull String updateQuery, @Nonnull CommonDataSource dataSource, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> bindFn, boolean exactlyOnce, int batchLimit) {
        super(exactlyOnce ? ProcessingGuarantee.EXACTLY_ONCE : ProcessingGuarantee.AT_LEAST_ONCE);
        this.updateQuery = updateQuery;
        this.dataSource = dataSource;
        this.bindFn = bindFn;
        this.batchLimit = batchLimit;
    }

    /**
     * Creates the meta-supplier for this sink.
     *
     * @param updateQuery        SQL text passed to {@link Connection#prepareStatement(String)}
     * @param dataSourceSupplier supplies the data source; invoked once per
     *                           {@link ProcessorSupplier#init}; must be serializable
     * @param bindFn             binds one item to the prepared statement; must be serializable
     * @param exactlyOnce        selects {@code EXACTLY_ONCE} instead of {@code AT_LEAST_ONCE}
     *                           as the processing guarantee passed to the base class
     * @param batchLimit         maximum number of statements collected before the batch
     *                           is executed; must be positive
     * @param <T>                type of the items written
     * @return a meta-supplier created through
     *         {@link ProcessorMetaSupplier#preferLocalParallelismOne}
     */
    public static <T> ProcessorMetaSupplier metaSupplier(final @Nonnull String updateQuery, final @Nonnull SupplierEx<? extends CommonDataSource> dataSourceSupplier, final @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> bindFn, final boolean exactlyOnce, final int batchLimit) {
        // Report the actual argument name if the supplier is not serializable.
        Util.checkSerializable(dataSourceSupplier, "dataSourceSupplier");
        Util.checkSerializable(bindFn, "bindFn");
        Preconditions.checkPositive(batchLimit, "batchLimit");
        return ProcessorMetaSupplier.preferLocalParallelismOne(new ProcessorSupplier(){
            // Created in init(); all processors returned by get() share this instance.
            private transient CommonDataSource dataSource;

            @Override
            public void init(@Nonnull ProcessorSupplier.Context context) {
                this.dataSource = dataSourceSupplier.get();
            }

            @Override
            @Nonnull
            public Collection<? extends Processor> get(int count) {
                return IntStream.range(0, count)
                        .mapToObj(i -> new WriteJdbcP<T>(updateQuery, this.dataSource, bindFn, exactlyOnce, batchLimit))
                        .collect(Collectors.toList());
            }
        });
    }

    /**
     * Initializes the base class, remembers the logger and opens the first
     * connection. A failure to connect in non-XA mode is only logged here; the
     * next {@link #tryProcess()} / {@link #process} call retries.
     */
    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception {
        super.init(outbox, context);
        // Result intentionally discarded. NOTE(review): presumably forces eager
        // JDBC driver registration before the first connection attempt - confirm.
        DriverManager.getDrivers();
        this.logger = context.logger();
        this.connectAndPrepareStatement();
    }

    @Override
    public boolean tryProcess() {
        if (!this.reconnectIfNecessary()) {
            return false;
        }
        return super.tryProcess();
    }

    /**
     * Binds and executes the update for every item in the inbox. The inbox is
     * cleared only after all items were executed (and, in non-XA mode,
     * committed); on a tolerated failure the items stay in the inbox and are
     * retried after a reconnect.
     *
     * <p>An {@link SQLException} is rethrown when it is, or is caused by, an
     * {@link SQLNonTransientException}, or when running in XA mode.
     */
    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        if (!this.reconnectIfNecessary() || this.snapshotUtility.activeTransaction() == null) {
            return;
        }
        try {
            Iterator<Object> iterator = inbox.iterator();
            while (iterator.hasNext()) {
                // The inbox is untyped; items are trusted to be of type T.
                @SuppressWarnings("unchecked")
                T castItem = (T) iterator.next();
                this.bindFn.accept(this.statement, castItem);
                this.addBatchOrExecute();
            }
            // Flush whatever is left below the batch limit.
            this.executeBatch();
            if (!this.snapshotUtility.usesTransactionLifecycle()) {
                this.connection.commit();
            }
            this.idleCount = 0;
            inbox.clear();
        }
        catch (SQLException e) {
            if (e instanceof SQLNonTransientException || e.getCause() instanceof SQLNonTransientException || this.snapshotUtility.usesTransactionLifecycle()) {
                throw ExceptionUtil.rethrow(e);
            }
            this.logger.warning("Exception during update", e);
            // A non-zero idleCount makes the next call back off and reconnect.
            ++this.idleCount;
        }
    }

    /** Watermarks are consumed without any action. */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    /**
     * Closes the base class and then the JDBC resources. The JDBC resources are
     * released even if {@code super.close()} throws.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            this.closeJdbcResources();
        }
    }

    /**
     * Opens a connection according to the current mode, disables auto-commit,
     * detects batch support and prepares the update statement.
     *
     * @return {@code true} on success; {@code false} if an {@link SQLException}
     *         occurred in non-XA mode (it is logged and {@code idleCount} is incremented)
     * @throws JetException if the data source type does not fit the mode
     */
    private boolean connectAndPrepareStatement() {
        try {
            if (this.snapshotUtility.usesTransactionLifecycle()) {
                if (!(this.dataSource instanceof XADataSource)) {
                    throw new JetException("When using exactly-once, the dataSource must implement " + XADataSource.class.getName());
                }
                this.xaConnection = ((XADataSource)this.dataSource).getXAConnection();
                this.connection = this.xaConnection.getConnection();
                assert (this.idleCount == 0) : "idleCount=" + this.idleCount;
                this.setXaResource(this.xaConnection.getXAResource());
            } else if (this.dataSource instanceof DataSource) {
                this.connection = ((DataSource)this.dataSource).getConnection();
            } else if (this.dataSource instanceof XADataSource) {
                this.logger.warning("Using " + XADataSource.class.getName() + " when no XA transactions are needed");
                // Keep the pooled connection in the field so that it is closed on
                // reconnect and on close() instead of being leaked.
                this.xaConnection = ((XADataSource)this.dataSource).getXAConnection();
                this.connection = this.xaConnection.getConnection();
            } else {
                throw new JetException("The dataSource implements neither " + DataSource.class.getName() + " nor " + XADataSource.class.getName());
            }
            this.connection.setAutoCommit(false);
            this.supportsBatch = this.connection.getMetaData().supportsBatchUpdates();
            this.statement = this.connection.prepareStatement(this.updateQuery);
        }
        catch (SQLException e) {
            if (this.snapshotUtility.usesTransactionLifecycle()) {
                throw ExceptionUtil.rethrow(e);
            }
            this.logger.warning("Exception when connecting and preparing the statement", e);
            ++this.idleCount;
            return false;
        }
        return true;
    }

    /**
     * Executes the already bound statement immediately when batching is not
     * supported; otherwise adds it to the batch and executes the batch once
     * {@code batchLimit} statements were collected.
     */
    private void addBatchOrExecute() throws SQLException {
        if (!this.supportsBatch) {
            this.statement.executeUpdate();
            return;
        }
        this.statement.addBatch();
        // `>=` rather than `==` so the limit cannot be stepped over.
        if (++this.batchCount >= this.batchLimit) {
            this.executeBatch();
        }
    }

    /** Executes the pending batch, if there is one, and resets the batch counter. */
    private void executeBatch() throws SQLException {
        if (this.supportsBatch && this.batchCount > 0) {
            this.statement.executeBatch();
            this.batchCount = 0;
        }
    }

    /**
     * Does nothing when the last operation succeeded ({@code idleCount == 0}).
     * Otherwise backs off, discards the current JDBC resources and tries to
     * connect again.
     *
     * @return {@code true} if a usable connection is available
     */
    private boolean reconnectIfNecessary() {
        if (this.idleCount == 0) {
            return true;
        }
        assert (!this.snapshotUtility.usesTransactionLifecycle()) : "attempt to reconnect in XA mode";
        IDLER.idle(this.idleCount);
        this.closeJdbcResources();
        // The new statement starts with an empty batch; a counter left over from
        // the failed attempt would no longer describe it.
        this.batchCount = 0;
        return this.connectAndPrepareStatement();
    }

    /**
     * Closes the statement, the connection and the pooled XA connection (in that
     * order), logging and ignoring failures, and clears the fields so that no
     * closed resource is referenced afterwards.
     */
    private void closeJdbcResources() {
        this.closeWithLogging(this.statement);
        this.closeWithLogging(this.connection);
        this.closeWithLogging(this.xaConnection);
        this.statement = null;
        this.connection = null;
        this.xaConnection = null;
    }

    /**
     * Closes a pooled connection, logging and ignoring any exception.
     * A separate overload is needed because {@link PooledConnection} is not
     * an {@link AutoCloseable}.
     *
     * @param closeable the connection to close; {@code null} is ignored
     */
    private void closeWithLogging(PooledConnection closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Exception e) {
            this.logger.warning("Exception when closing " + closeable + ", ignoring it: " + e, e);
        }
    }

    /**
     * Closes a resource, logging and ignoring any exception.
     *
     * @param closeable the resource to close; {@code null} is ignored
     */
    private void closeWithLogging(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Exception e) {
            this.logger.warning("Exception when closing " + closeable + ", ignoring it: " + e, e);
        }
    }
}

