/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.Serializable;
import java.time.Duration;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.operator.CombinedCompactorSource;
import org.apache.paimon.flink.source.operator.MultiTablesReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CombinedAwareBatchSource
extends CombinedCompactorSource<Tuple2<Split, String>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CombinedAwareBatchSource.class);
    private final Map<String, String> tableOptions;

    public CombinedAwareBatchSource(CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, Map<String, String> tableOptions) {
        super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
        this.tableOptions = tableOptions;
    }

    public SourceReader<Tuple2<Split, String>, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    public static DataStream<RowData> buildSource(StreamExecutionEnvironment env, String name, TypeInformation<RowData> typeInfo, CatalogLoader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, Map<String, String> tableOptions, Duration partitionIdleTime) {
        CombinedAwareBatchSource source = new CombinedAwareBatchSource(catalogLoader, includingPattern, excludingPattern, databasePattern, tableOptions);
        TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(new TypeInformation[]{new JavaTypeInfo<Split>(Split.class), BasicTypeInfo.STRING_TYPE_INFO});
        return env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), name, (TypeInformation)tupleTypeInfo).forceNonParallel().partitionCustom((Partitioner & Serializable)(key, numPartitions) -> key % numPartitions, (KeySelector & Serializable)split -> ((DataSplit)split.f0).bucket()).transform(name, typeInfo, (OneInputStreamOperator)new MultiTablesReadOperator(catalogLoader, false, partitionIdleTime));
    }

    private class Reader
    extends AbstractNonCoordinatedSourceReader<Tuple2<Split, String>> {
        private MultiTableScanBase<Tuple2<Split, String>> tableScan;

        private Reader() {
        }

        @Override
        public void start() {
            super.start();
            this.tableScan = new MultiAwareBucketTableScan(CombinedAwareBatchSource.this.catalogLoader, CombinedAwareBatchSource.this.includingPattern, CombinedAwareBatchSource.this.excludingPattern, CombinedAwareBatchSource.this.databasePattern, CombinedAwareBatchSource.this.isStreaming, CombinedAwareBatchSource.this.tableOptions);
        }

        public InputStatus pollNext(ReaderOutput<Tuple2<Split, String>> readerOutput) throws Exception {
            MultiTableScanBase.ScanResult scanResult = this.tableScan.scanTable(readerOutput);
            if (scanResult == MultiTableScanBase.ScanResult.FINISHED) {
                return InputStatus.END_OF_INPUT;
            }
            if (scanResult == MultiTableScanBase.ScanResult.IS_EMPTY) {
                LOGGER.info("No file were collected for the table of aware-bucket");
            }
            return InputStatus.END_OF_INPUT;
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (this.tableScan != null) {
                this.tableScan.close();
            }
        }
    }
}

