/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.connector.flink.sink;

import com.oceanbase.connector.flink.ConnectorOptions;
import com.oceanbase.connector.flink.sink.OceanBaseWriterEvent;
import com.oceanbase.connector.flink.sink.RecordFlusher;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.Record;
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
import com.oceanbase.connector.flink.table.SchemaChangeRecord;
import com.oceanbase.connector.flink.table.TransactionRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OceanBaseWriter<T>
implements SinkWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseWriter.class);
    private final ConnectorOptions options;
    private final SinkWriterMetricGroup metricGroup;
    private final TypeSerializer<T> typeSerializer;
    private final RecordSerializationSchema<T> recordSerializer;
    private final DataChangeRecord.KeyExtractor keyExtractor;
    private final RecordFlusher recordFlusher;
    private final OceanBaseWriterEvent.Listener writerEventListener;
    private final AtomicReference<Record> currentRecord = new AtomicReference();
    private final Map<String, List<DataChangeRecord>> buffer = new HashMap<String, List<DataChangeRecord>>();
    private final Map<String, Map<Object, DataChangeRecord>> reducedBuffer = new HashMap<String, Map<Object, DataChangeRecord>>();
    private final transient ScheduledExecutorService scheduler;
    private final transient ScheduledFuture<?> scheduledFuture;
    private transient int bufferCount = 0;
    private volatile transient Exception flushException = null;
    private volatile transient boolean closed = false;

    public OceanBaseWriter(ConnectorOptions options, Sink.InitContext initContext, TypeSerializer<T> typeSerializer, RecordSerializationSchema<T> recordSerializer, DataChangeRecord.KeyExtractor keyExtractor, RecordFlusher recordFlusher, OceanBaseWriterEvent.Listener writerEventListener) {
        this.options = options;
        this.metricGroup = initContext.metricGroup();
        this.typeSerializer = typeSerializer;
        this.recordSerializer = recordSerializer;
        this.keyExtractor = keyExtractor;
        this.recordFlusher = recordFlusher;
        this.writerEventListener = writerEventListener;
        this.scheduler = options.getSyncWrite() || options.getBufferFlushInterval() == 0L ? null : new ScheduledThreadPoolExecutor(1, (ThreadFactory)new ExecutorThreadFactory("OceanBaseWriter.scheduler"));
        ScheduledFuture<?> scheduledFuture = this.scheduledFuture = this.scheduler == null ? null : this.scheduler.scheduleWithFixedDelay(() -> {
            if (!this.closed) {
                try {
                    OceanBaseWriter oceanBaseWriter = this;
                    synchronized (oceanBaseWriter) {
                        this.flush(false);
                    }
                }
                catch (Exception e) {
                    this.flushException = e;
                }
            }
        }, options.getBufferFlushInterval(), options.getBufferFlushInterval(), TimeUnit.MILLISECONDS);
        if (!options.getSyncWrite() && keyExtractor == null) {
            throw new IllegalArgumentException("When 'sync-write' is not enabled, keyExtractor is required to construct the buffer key.");
        }
        if (writerEventListener != null) {
            writerEventListener.apply(OceanBaseWriterEvent.INITIALIZED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void write(T data, SinkWriter.Context context) throws IOException, InterruptedException {
        this.checkFlushException();
        T copy = this.copyIfNecessary(data);
        Record record = this.recordSerializer.serialize(copy);
        if (record == null) {
            return;
        }
        if (this.options.getSyncWrite() && record instanceof DataChangeRecord || record instanceof SchemaChangeRecord || record instanceof TransactionRecord) {
            while (!this.currentRecord.compareAndSet(null, record)) {
                this.flush(false);
            }
            this.flush(false);
        } else if (record instanceof DataChangeRecord) {
            DataChangeRecord dataChangeRecord = (DataChangeRecord)record;
            Object key = this.keyExtractor.extract(dataChangeRecord);
            if (key == null) {
                Map<String, List<DataChangeRecord>> map = this.buffer;
                synchronized (map) {
                    this.buffer.computeIfAbsent(record.getTableId().identifier(), k -> new ArrayList()).add(dataChangeRecord);
                }
            }
            Map<String, Map<Object, DataChangeRecord>> map = this.reducedBuffer;
            synchronized (map) {
                this.reducedBuffer.computeIfAbsent(record.getTableId().identifier(), k -> new HashMap()).put(key, dataChangeRecord);
            }
            ++this.bufferCount;
            if (this.bufferCount >= this.options.getBufferSize()) {
                this.flush(false);
            }
        } else {
            LOG.info("Discard unsupported record: {}", (Object)record);
        }
        this.metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
    }

    protected void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to OceanBase failed.", this.flushException);
        }
    }

    private T copyIfNecessary(T record) {
        return (T)(this.typeSerializer == null ? record : this.typeSerializer.copy(record));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void flush(boolean endOfInput) throws IOException, InterruptedException {
        this.checkFlushException();
        for (int i = 0; i <= this.options.getMaxRetries(); ++i) {
            try {
                Map<String, Object> map;
                if (!this.buffer.isEmpty()) {
                    map = this.buffer;
                    synchronized (map) {
                        for (Map.Entry<String, List<DataChangeRecord>> entry : this.buffer.entrySet()) {
                            List<DataChangeRecord> recordList = entry.getValue();
                            this.recordFlusher.flush(recordList);
                            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc((long)recordList.size());
                        }
                        this.buffer.clear();
                    }
                }
                if (!this.reducedBuffer.isEmpty()) {
                    map = this.reducedBuffer;
                    synchronized (map) {
                        for (Map.Entry<String, Object> entry : this.reducedBuffer.entrySet()) {
                            Map recordMap = (Map)entry.getValue();
                            this.recordFlusher.flush(new ArrayList<DataChangeRecord>(recordMap.values()));
                            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc((long)recordMap.size());
                        }
                        this.reducedBuffer.clear();
                    }
                }
                this.bufferCount = 0;
                Record record = this.currentRecord.get();
                if (record == null) {
                    return;
                }
                if (record instanceof SchemaChangeRecord) {
                    this.recordFlusher.flush((SchemaChangeRecord)record);
                } else if (record instanceof TransactionRecord) {
                    this.recordFlusher.flush((TransactionRecord)record);
                } else if (record instanceof DataChangeRecord) {
                    this.recordFlusher.flush(Collections.singletonList((DataChangeRecord)record));
                } else {
                    LOG.info("Discard unsupported record: {}", (Object)record);
                }
                this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
                this.currentRecord.compareAndSet(record, null);
                break;
            }
            catch (Exception e) {
                LOG.error("OceanBaseWriter flush error, retry times = {}", (Object)i, (Object)e);
                if (i >= this.options.getMaxRetries()) {
                    throw new IOException(e);
                }
                Thread.sleep(1000L * (long)i);
                continue;
            }
        }
    }

    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (this.bufferCount > 0) {
                try {
                    this.flush(true);
                }
                catch (Exception e) {
                    LOG.warn("Writing records to OceanBase failed", (Throwable)e);
                    throw new RuntimeException("Writing records to OceanBase failed", e);
                }
            }
            if (this.writerEventListener != null) {
                this.writerEventListener.apply(OceanBaseWriterEvent.CLOSING);
            }
            try {
                if (this.recordFlusher != null) {
                    this.recordFlusher.close();
                }
            }
            catch (Exception e) {
                LOG.warn("Close statement executor failed", (Throwable)e);
            }
        }
        this.checkFlushException();
    }
}

