/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.values.sink;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkFunction;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkHelper;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * {@link DataSink} implementation of the "values" connector.
 *
 * <p>Depending on the configured {@link SinkApi}, events are consumed either by the nested
 * {@code ValuesSink} (Sink V2) or by a {@link ValuesDataSinkFunction} (SinkFunction). The two
 * boolean switches {@code materializedInMemory} and {@code print} are handed through unchanged to
 * whichever implementation is selected.
 */
@Internal
public class ValuesDataSink implements DataSink, Serializable {

    /** Forwarded to the sink implementation; enables applying data changes to {@link ValuesDatabase}. */
    private final boolean materializedInMemory;

    /** Forwarded to the sink implementation; enables printing each event to standard output. */
    private final boolean print;

    /** Selects which Flink sink API backs {@link #getEventSinkProvider()}. */
    private final SinkApi sinkApi;

    /** Selects which metadata applier {@link #getMetadataApplier()} returns. */
    private final boolean errorOnSchemaChange;

    /**
     * Creates the sink.
     *
     * @param materializedInMemory whether data change events are applied to {@link ValuesDatabase}
     * @param print whether each event is printed to standard output
     * @param sinkApi which Flink sink API to use
     * @param errorOnSchemaChange whether the error-raising metadata applier is used
     */
    public ValuesDataSink(boolean materializedInMemory, boolean print, SinkApi sinkApi, boolean errorOnSchemaChange) {
        this.materializedInMemory = materializedInMemory;
        this.print = print;
        this.sinkApi = sinkApi;
        this.errorOnSchemaChange = errorOnSchemaChange;
    }

    /**
     * Builds the event sink provider for the configured API.
     *
     * @return a {@link FlinkSinkProvider} wrapping a {@code ValuesSink} when the API is
     *     {@link SinkApi#SINK_V2}; a {@link FlinkSinkFunctionProvider} wrapping a
     *     {@link ValuesDataSinkFunction} for any other value (including {@code null})
     */
    public EventSinkProvider getEventSinkProvider() {
        // Enum constants are singletons, so an identity check matches equals() and is null-safe.
        if (sinkApi != SinkApi.SINK_V2) {
            return FlinkSinkFunctionProvider.of(new ValuesDataSinkFunction(materializedInMemory, print));
        }
        return FlinkSinkProvider.of(new ValuesSink(materializedInMemory, print));
    }

    /**
     * Builds the metadata applier.
     *
     * @return a {@code ValuesDatabase.ErrorOnChangeMetadataApplier} when {@code errorOnSchemaChange}
     *     is set, otherwise a {@code ValuesDatabase.ValuesMetadataApplier}
     */
    public MetadataApplier getMetadataApplier() {
        return errorOnSchemaChange
                ? new ValuesDatabase.ErrorOnChangeMetadataApplier()
                : new ValuesDatabase.ValuesMetadataApplier();
    }

    /** The Flink sink API used to consume events. */
    public enum SinkApi {
        SINK_FUNCTION,
        SINK_V2
    }

    /**
     * Sink V2 writer that tracks per-table schemas, optionally applies data changes to
     * {@link ValuesDatabase} and optionally prints every event.
     */
    private static class ValuesSinkWriter implements SinkWriter<Event> {
        private final boolean materializedInMemory;
        private final boolean print;
        private final int subtaskIndex;
        private final int numSubtasks;

        /** Latest known schema of every table seen by this writer. */
        private final Map<TableId, Schema> schemaMaps = new HashMap<>();

        /** Field getters derived from the latest schema of every table; used when printing. */
        private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps = new HashMap<>();

        /**
         * @param materializedInMemory whether data change events are applied to {@link ValuesDatabase}
         * @param print whether each event is printed to standard output
         * @param subtaskIndex index of this subtask, used as the print prefix
         * @param numSubtasks total number of subtasks; the prefix is only emitted when this is > 1
         */
        public ValuesSinkWriter(boolean materializedInMemory, boolean print, int subtaskIndex, int numSubtasks) {
            this.materializedInMemory = materializedInMemory;
            this.print = print;
            this.subtaskIndex = subtaskIndex;
            this.numSubtasks = numSubtasks;
        }

        /**
         * Processes a single event.
         *
         * <p>Schema change events refresh the cached schema and field getters of their table. Data
         * change events are applied to {@link ValuesDatabase} when {@code materializedInMemory} is
         * set. Afterwards, when {@code print} is set, the event is rendered through
         * {@link ValuesDataSinkHelper#convertEventToStr} and written to standard output.
         *
         * @param event the event to process; the print path casts it to {@link ChangeEvent}
         * @param context writer context (unused)
         * @throws RuntimeException if a non-create schema change arrives for a table with no cached schema
         */
        public void write(Event event, SinkWriter.Context context) {
            if (event instanceof SchemaChangeEvent) {
                handleSchemaChange((SchemaChangeEvent) event);
            } else if (materializedInMemory && event instanceof DataChangeEvent) {
                ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
            }

            if (!print) {
                return;
            }

            // Only tag the output with the subtask index when running in parallel.
            String prefix;
            if (numSubtasks > 1) {
                prefix = subtaskIndex + "> ";
            } else {
                prefix = "";
            }
            TableId printedTable = ((ChangeEvent) event).tableId();
            List<RecordData.FieldGetter> getters = fieldGetterMaps.get(printedTable);
            System.out.println(prefix + ValuesDataSinkHelper.convertEventToStr(event, getters));
        }

        /** Updates the cached schema and field getters of the table addressed by a schema change. */
        private void handleSchemaChange(SchemaChangeEvent change) {
            TableId table = change.tableId();
            if (change instanceof CreateTableEvent) {
                // A create event carries the complete schema, so it replaces whatever was cached.
                cacheSchema(table, ((CreateTableEvent) change).getSchema());
                return;
            }
            if (!schemaMaps.containsKey(table)) {
                throw new RuntimeException("schema of " + table + " is not existed.");
            }
            // Any other schema change is applied on top of the previously cached schema.
            cacheSchema(table, SchemaUtils.applySchemaChangeEvent(schemaMaps.get(table), change));
        }

        /** Stores the schema first, then the field getters derived from it. */
        private void cacheSchema(TableId table, Schema latest) {
            schemaMaps.put(table, latest);
            fieldGetterMaps.put(table, SchemaUtils.createFieldGetters(latest));
        }

        /** No-op: this writer does not buffer anything. */
        public void flush(boolean endOfInput) {
        }

        /** No-op: this writer performs no cleanup on close. */
        public void close() {
        }
    }

    /** Sink V2 entry point that creates a {@code ValuesSinkWriter} per subtask. */
    private static class ValuesSink implements Sink<Event> {
        private final boolean materializedInMemory;
        private final boolean print;

        /**
         * @param materializedInMemory forwarded to every writer created by this sink
         * @param print forwarded to every writer created by this sink
         */
        public ValuesSink(boolean materializedInMemory, boolean print) {
            this.materializedInMemory = materializedInMemory;
            this.print = print;
        }

        /**
         * Creates the writer for one subtask.
         *
         * @param context supplies the subtask id and the number of parallel subtasks
         * @return a new writer configured with this sink's switches and the subtask coordinates
         */
        public SinkWriter<Event> createWriter(Sink.InitContext context) {
            int subtask = context.getSubtaskId();
            int parallelism = context.getNumberOfParallelSubtasks();
            return new ValuesSinkWriter(materializedInMemory, print, subtask, parallelism);
        }
    }
}

