/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.types.Row;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class CommonExecSinkITCase
extends AbstractTestBase {
    private StreamExecutionEnvironment env;
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Before
    public void before() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(4);
    }

    @Test
    public void testStreamRecordTimestampInserterSinkRuntimeProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(true)).source(new TimestampTestSource(rows)).sink(new TableFactoryHarness.SinkBase(){

            @Override
            public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return SinkProvider.of((Sink)TestSink.newBuilder().setWriter((TestSink.DefaultSinkWriter)new TestTimestampWriter(timestamps)).setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).build());
            }
        }).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        CommonExecSinkITCase.assertPlan(tableEnv, sqlStmt, true);
        tableEnv.executeSql(sqlStmt).await();
        CommonExecSinkITCase.assertTimestampResults((SharedReference<List<Long>>)timestamps, rows);
    }

    @Test
    public void testStreamRecordTimestampInserterDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(true)).source(new TimestampTestSource(rows)).sink(new TableFactoryHarness.SinkBase(){

            public DataStreamSinkProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return dataStream -> dataStream.addSink((SinkFunction)new SinkFunction<RowData>(){

                    public void invoke(RowData value, SinkFunction.Context context) {
                        CommonExecSinkITCase.addElement(timestamps, context.timestamp());
                    }
                });
            }
        }).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        CommonExecSinkITCase.assertPlan(tableEnv, sqlStmt, true);
        tableEnv.executeSql(sqlStmt).await();
        Collections.sort((List)timestamps.get());
        CommonExecSinkITCase.assertTimestampResults((SharedReference<List<Long>>)timestamps, rows);
    }

    @Test
    public void testUnifiedSinksAreUsableWithDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference fetched = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(Schema.newBuilder().column("a", (AbstractDataType)DataTypes.INT()).build()).source(new TimestampTestSource(rows)).sink(new TableFactoryHarness.SinkBase(){

            public DataStreamSinkProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return CommonExecSinkITCase.buildRecordTestSinkProvider((SharedReference<List<RowData>>)fetched);
            }
        }).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        tableEnv.executeSql(sqlStmt).await();
        List fetchedRows = ((List)fetched.get()).stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList());
        Assert.assertEquals((long)((Integer)fetchedRows.get(0)).intValue(), (long)1L);
        Assert.assertEquals((long)((Integer)fetchedRows.get(1)).intValue(), (long)2L);
    }

    @Test
    public void testStreamRecordTimestampInserterNotApplied() {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(false)).source(new TimestampTestSource(rows)).sink(new TableFactoryHarness.SinkBase(){

            @Override
            public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return SinkProvider.of((Sink)TestSink.newBuilder().setWriter((TestSink.DefaultSinkWriter)new TestTimestampWriter(timestamps)).setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).build());
            }
        }).build();
        tableEnv.createTable("T1", sourceDescriptor);
        CommonExecSinkITCase.assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
    }

    private static DataStreamSinkProvider buildRecordTestSinkProvider(SharedReference<List<RowData>> fetched) {
        return dataStream -> dataStream.sinkTo((Sink)TestSink.newBuilder().setWriter((TestSink.DefaultSinkWriter)new RecordWriter(fetched)).setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).build());
    }

    private static <T> void addElement(SharedReference<List<T>> elements, T element) {
        elements.applySync(l -> l.add(element));
    }

    private static void assertPlan(StreamTableEnvironment tableEnv, String sql, boolean containsStreamRecordTimestampInserter) {
        Matcher matcher = Matchers.containsString((String)"StreamRecordTimestampInserter(rowtime field: 2");
        if (!containsStreamRecordTimestampInserter) {
            matcher = Matchers.not((Matcher)matcher);
        }
        MatcherAssert.assertThat((Object)tableEnv.explainSql(sql, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN}), (Matcher)matcher);
    }

    private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) {
        Schema.Builder builder = Schema.newBuilder().column("a", "INT").column("b", "STRING").column("ts", "TIMESTAMP_LTZ(3)");
        if (withWatermark) {
            builder.watermark("ts", "ts");
        }
        return builder.build();
    }

    private static void assertTimestampResults(SharedReference<List<Long>> timestamps, List<Row> rows) {
        Assert.assertEquals((long)rows.size(), (long)((List)timestamps.get()).size());
        for (int i = 0; i < rows.size(); ++i) {
            Assert.assertEquals((Object)rows.get(i).getField(2), (Object)Instant.ofEpochMilli((Long)((List)timestamps.get()).get(i)));
        }
    }

    private static class RecordWriter
    extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<RowData>> rows;

        private RecordWriter(SharedReference<List<RowData>> rows) {
            this.rows = rows;
        }

        public void write(RowData element, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.rows, element);
            super.write((Object)element, context);
        }
    }

    private static class TestTimestampWriter
    extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<Long>> timestamps;

        private TestTimestampWriter(SharedReference<List<Long>> timestamps) {
            this.timestamps = timestamps;
        }

        public void write(RowData element, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.timestamps, context.timestamp());
            super.write((Object)element, context);
        }
    }

    private static class TestSource
    implements SourceFunction<RowData> {
        private final List<Row> rows;
        private final DynamicTableSource.DataStructureConverter converter;

        public TestSource(List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
            this.rows = rows;
            this.converter = converter;
        }

        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            this.rows.stream().map(row2 -> (RowData)this.converter.toInternal(row2)).forEach(arg_0 -> ctx.collect(arg_0));
        }

        public void cancel() {
        }
    }

    private static class TimestampTestSource
    extends TableFactoryHarness.ScanSourceBase {
        private final List<Row> rows;

        private TimestampTestSource(List<Row> rows) {
            super(false);
            this.rows = rows;
        }

        @Override
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext context) {
            DynamicTableSource.DataStructureConverter converter = context.createDataStructureConverter(this.getFactoryContext().getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
            return SourceFunctionProvider.of((SourceFunction)new TestSource(this.rows, converter), (boolean)true);
        }
    }
}

