/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.logwriter;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RacCommitLogWriterFlushStrategy
implements LogWriterFlushStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class);
    private final Map<String, CommitLogWriterFlushStrategy> flushStrategies = new HashMap<String, CommitLogWriterFlushStrategy>();
    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
    private final JdbcConfiguration jdbcConfiguration;
    private final OracleConnectorConfig connectorConfig;
    private final Set<String> hosts;

    public RacCommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig, OracleStreamingChangeEventSourceMetrics streamingMetrics) {
        this.jdbcConfiguration = jdbcConfig;
        this.streamingMetrics = streamingMetrics;
        this.connectorConfig = connectorConfig;
        this.hosts = connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet());
        this.recreateRacNodeFlushStrategies();
    }

    @Override
    public void close() {
        this.closeRacNodeFlushStrategies();
        this.flushStrategies.clear();
    }

    @Override
    public String getHost() {
        return String.join((CharSequence)", ", this.hosts);
    }

    @Override
    public void flush(Scn currentScn) throws InterruptedException {
        Instant startTime = Instant.now();
        if (this.flushStrategies.isEmpty()) {
            throw new DebeziumException("No RAC node addresses supplied or currently connected");
        }
        boolean recreateConnections = false;
        for (Map.Entry<String, CommitLogWriterFlushStrategy> entry : this.flushStrategies.entrySet()) {
            CommitLogWriterFlushStrategy strategy = entry.getValue();
            try {
                strategy.flush(currentScn);
            }
            catch (Exception e) {
                LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", (Object)strategy.getHost(), (Object)e);
                recreateConnections = true;
            }
        }
        if (recreateConnections) {
            this.recreateRacNodeFlushStrategies();
            LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically.");
            Metronome metronome = Metronome.sleeper((Duration)Duration.ofSeconds(3L), (Clock)Clock.SYSTEM);
            try {
                metronome.pause();
            }
            catch (InterruptedException e) {
                LOGGER.warn("The LGWR buffer wait was interrupted.");
                throw e;
            }
        }
        LOGGER.trace("LGWR flush took {} to complete.", (Object)Duration.between(startTime, Instant.now()));
    }

    private void recreateRacNodeFlushStrategies() {
        this.closeRacNodeFlushStrategies();
        this.flushStrategies.clear();
        for (String hostName : this.hosts) {
            try {
                String[] parts = this.parseHostName(hostName);
                this.flushStrategies.put(hostName, this.createHostFlushStrategy(parts[0], Integer.parseInt(parts[1]), parts[2]));
            }
            catch (SQLException e) {
                throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", (Throwable)e);
            }
        }
    }

    private String[] parseHostName(String hostName) {
        String[] parts = new String[3];
        String[] colonParts = hostName.split(":");
        parts[0] = colonParts[0];
        if (colonParts[1].contains("/")) {
            int slashIndex = colonParts[1].indexOf(47);
            parts[1] = colonParts[1].substring(0, slashIndex);
            parts[2] = colonParts[1].substring(slashIndex + 1);
            return parts;
        }
        parts[1] = colonParts[1];
        parts[2] = null;
        return parts;
    }

    private CommitLogWriterFlushStrategy createHostFlushStrategy(String hostName, Integer port, String sid) throws SQLException {
        Configuration.Builder jdbcConfigBuilder = (Configuration.Builder)((Configuration.Builder)this.jdbcConfiguration.edit().with(JdbcConfiguration.HOSTNAME, hostName)).with(JdbcConfiguration.PORT, (Object)port);
        if (!Strings.isNullOrBlank((String)sid)) {
            jdbcConfigBuilder = (Configuration.Builder)jdbcConfigBuilder.with(JdbcConfiguration.DATABASE, sid);
        }
        JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt((Configuration)jdbcConfigBuilder.build());
        LOGGER.debug("Creating flush connection to RAC node '{}'", (Object)hostName);
        return new CommitLogWriterFlushStrategy(this.connectorConfig, jdbcHostConfig);
    }

    private void closeRacNodeFlushStrategies() {
        for (CommitLogWriterFlushStrategy strategy : this.flushStrategies.values()) {
            try {
                strategy.close();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close RAC connection to node '{}'", (Object)strategy.getHost(), (Object)e);
                this.streamingMetrics.incrementWarningCount();
            }
        }
    }
}

