/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.spec_store;

import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
import org.apache.gobblin.runtime.spec_store.MysqlSpecStore;

/**
 * A {@link MysqlSpecStore} that plugs in its own SQL statement set
 * ({@link SpecificSqlStatementsWithUpdate}) and implements spec updates by executing the
 * store's prepared update statement.
 *
 * <p>An update that touches zero rows is reported to the caller as an {@link IOException}.
 */
public class MysqlSpecStoreWithUpdate extends MysqlSpecStore {
    /**
     * Plain {@code INSERT} template; {@code %s} stands in for the table name. The statement
     * contains no update-on-conflict clause.
     */
    protected static final String INSERT_STATEMENT_WITHOUT_UPDATE = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    /**
     * Creates the store by delegating to the {@link MysqlSpecStore} constructor.
     *
     * @param config    store configuration, passed through to the superclass
     * @param specSerDe serializer used to turn specs into bytes
     * @throws IOException if the superclass constructor fails
     */
    public MysqlSpecStoreWithUpdate(Config config, SpecSerDe specSerDe) throws IOException {
        super(config, specSerDe);
    }

    /**
     * Supplies the statement set used by this store.
     *
     * @return a new {@link SpecificSqlStatementsWithUpdate}
     */
    @Override
    protected MysqlBaseSpecStore.SqlStatements createSqlStatements() {
        return new SpecificSqlStatementsWithUpdate();
    }

    /**
     * Updates {@code spec} using {@link Long#MAX_VALUE} as the modification watermark.
     * NOTE(review): presumably the maximum watermark makes the watermark condition always
     * pass — confirm against the update statement defined in the parent class.
     *
     * @param spec the spec to update
     * @return the same {@code spec} instance
     * @throws IOException if no row was updated or the underlying statement execution fails
     */
    @Override
    public Spec updateSpecImpl(Spec spec) throws IOException {
        this.updateSpecImpl(spec, Long.MAX_VALUE);
        return spec;
    }

    /**
     * Updates {@code spec} by binding and executing the store's prepared update statement.
     *
     * @param spec              the spec to update; it is cast to {@link FlowSpec} when the
     *                          statement parameters are bound
     * @param modifiedWatermark watermark value bound as the last statement parameter
     * @return the same {@code spec} instance
     * @throws IOException if the statement affected zero rows, or if
     *                     {@code withPreparedStatement} reports a failure
     */
    @Override
    public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws IOException {
        // NOTE(review): the trailing `true` is presumably a commit flag — confirm against
        // withPreparedStatement in the parent class.
        this.withPreparedStatement(this.sqlStatements.updateStatement, statement -> {
            ((SpecificSqlStatementsWithUpdate)this.sqlStatements).completeUpdatePreparedStatement((PreparedStatement)statement, spec, modifiedWatermark);
            int updatedRows = statement.executeUpdate();
            // Zero affected rows means the statement's conditions matched nothing.
            if (updatedRows == 0) {
                throw new IOException("Spec does not exist or concurrent update happens, please check current spec and update again");
            }
            return null;
        }, true);
        return spec;
    }

    /**
     * Statement set for {@link MysqlSpecStoreWithUpdate}: swaps in the plain insert template and
     * knows how to bind the parameters of the update statement.
     */
    protected class SpecificSqlStatementsWithUpdate extends MysqlSpecStore.SpecificSqlStatements {
        // NOTE(review): the explicit outer-instance argument looks like decompiler output
        // (see the CFR header of this file) — confirm against the superclass constructor.
        protected SpecificSqlStatementsWithUpdate() {
            super(MysqlSpecStoreWithUpdate.this);
        }

        /**
         * Binds the four parameters of the update statement, in order: the serialized spec as a
         * blob, the serialized spec decoded as a UTF-8 string, the spec URI, and the watermark.
         * NOTE(review): the order presumably mirrors the placeholders of the parent's update
         * statement — confirm there.
         *
         * @param statement         the prepared update statement to populate
         * @param spec              the spec being updated; must be a {@link FlowSpec}
         * @param modifiedWatermark watermark value bound as the fourth parameter
         * @throws SQLException       if a parameter cannot be set on {@code statement}
         * @throws ClassCastException if {@code spec} is not a {@link FlowSpec}
         */
        public void completeUpdatePreparedStatement(PreparedStatement statement, Spec spec, long modifiedWatermark) throws SQLException {
            FlowSpec flowSpec = (FlowSpec)spec;
            URI specUri = flowSpec.getUri();
            // Serialize once and reuse the bytes for both parameters: this avoids a second
            // serialization pass and guarantees both values come from the same byte sequence.
            byte[] serializedSpec = MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec);
            int i = 0;
            statement.setBlob(++i, new ByteArrayInputStream(serializedSpec));
            statement.setString(++i, new String(serializedSpec, Charsets.UTF_8));
            statement.setString(++i, specUri.toString());
            statement.setLong(++i, modifiedWatermark);
        }

        /**
         * @return {@link MysqlSpecStoreWithUpdate#INSERT_STATEMENT_WITHOUT_UPDATE}, the insert
         *         template with the table name still unfilled
         */
        @Override
        protected String getTablelessInsertStatement() {
            return MysqlSpecStoreWithUpdate.INSERT_STATEMENT_WITHOUT_UPDATE;
        }
    }
}

