/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

public final class WriteJdbcP<T>
implements Processor {
    private static final IdleStrategy IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
    private static final int BATCH_LIMIT = 50;
    private final SupplierEx<? extends Connection> newConnectionFn;
    private final BiConsumerEx<? super PreparedStatement, ? super T> bindFn;
    private final String updateQuery;
    private ILogger logger;
    private Connection connection;
    private PreparedStatement statement;
    private List<T> itemList = new ArrayList<T>();
    private int idleCount;
    private boolean supportsBatch;
    private int batchCount;

    private WriteJdbcP(@Nonnull String updateQuery, @Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> bindFn) {
        this.updateQuery = updateQuery;
        this.newConnectionFn = newConnectionFn;
        this.bindFn = bindFn;
    }

    public static <T> ProcessorMetaSupplier metaSupplier(@Nonnull String updateQuery, @Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> bindFn) {
        Util.checkSerializable(newConnectionFn, "newConnectionFn");
        Util.checkSerializable(bindFn, "bindFn");
        return ProcessorMetaSupplier.preferLocalParallelismOne(() -> new WriteJdbcP(updateQuery, newConnectionFn, bindFn));
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.logger = context.logger();
        this.connectAndPrepareStatement();
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        inbox.drainTo(this.itemList);
        while (!this.itemList.isEmpty()) {
            if (!this.reconnectIfNecessary()) continue;
            try {
                for (T item : this.itemList) {
                    this.bindFn.accept((PreparedStatement)((PreparedStatement)this.statement), (PreparedStatement)item);
                    this.addBatchOrExecute();
                }
                this.executeBatch();
                this.connection.commit();
                this.itemList.clear();
                this.idleCount = 0;
            }
            catch (Exception e) {
                if (e instanceof SQLNonTransientException || e.getCause() instanceof SQLNonTransientException) {
                    throw ExceptionUtil.rethrow(e);
                }
                this.logger.warning("Exception during update", e.getCause());
                ++this.idleCount;
            }
        }
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    public void close() {
        this.closeWithLogging(this.statement);
        this.closeWithLogging(this.connection);
    }

    private boolean connectAndPrepareStatement() {
        try {
            this.connection = this.newConnectionFn.get();
            this.connection.setAutoCommit(false);
            this.supportsBatch = this.connection.getMetaData().supportsBatchUpdates();
            this.statement = this.connection.prepareStatement(this.updateQuery);
        }
        catch (Exception e) {
            this.logger.warning("Exception during connecting and preparing the statement", (Throwable)e);
            ++this.idleCount;
            return false;
        }
        return true;
    }

    private void addBatchOrExecute() throws SQLException {
        if (!this.supportsBatch) {
            this.statement.executeUpdate();
            return;
        }
        this.statement.addBatch();
        if (++this.batchCount == 50) {
            this.statement.executeBatch();
            this.batchCount = 0;
        }
    }

    private void executeBatch() throws SQLException {
        if (this.supportsBatch) {
            this.statement.executeBatch();
            this.batchCount = 0;
        }
    }

    private boolean reconnectIfNecessary() {
        if (this.idleCount == 0) {
            return true;
        }
        IDLER.idle((long)this.idleCount);
        this.close();
        return this.connectAndPrepareStatement();
    }

    private void closeWithLogging(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Exception e) {
            this.logger.warning("Exception during closing " + closeable, (Throwable)e);
        }
    }
}

