package org.streampipes.wrapper.flink;

import java.io.Serializable;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.wrapper.distributed.runtime.DistributedRuntime;
import org.streampipes.wrapper.flink.consumer.JmsConsumer;
import org.streampipes.wrapper.flink.converter.JsonToMapFormat;
import org.streampipes.wrapper.flink.logger.StatisticLogger;
import org.streampipes.wrapper.params.binding.BindingParams;

/* loaded from: input_file:org/streampipes/wrapper/flink/FlinkRuntime.class */
public abstract class FlinkRuntime<B extends BindingParams<I>, I extends InvocableStreamPipesEntity> extends DistributedRuntime<B, I> implements Runnable, Serializable {
    private static final long serialVersionUID = 1;
    protected TimeCharacteristic streamTimeCharacteristic;
    protected FlinkDeploymentConfig config;
    private boolean debug;
    private StreamExecutionEnvironment env;

    @Deprecated
    public FlinkRuntime(B b) {
        this((BindingParams) b, true);
    }

    @Deprecated
    public FlinkRuntime(B b, FlinkDeploymentConfig flinkDeploymentConfig) {
        this(b, flinkDeploymentConfig, false);
    }

    public FlinkRuntime(B b, boolean z) {
        super(b);
        if (z) {
            this.config = new FlinkDeploymentConfig("", "localhost", 6123);
        } else {
            this.config = getDeploymentConfig();
        }
        this.debug = z;
    }

    private FlinkRuntime(B b, FlinkDeploymentConfig flinkDeploymentConfig, boolean z) {
        super(b);
        this.config = flinkDeploymentConfig;
        this.debug = z;
    }

    protected abstract FlinkDeploymentConfig getDeploymentConfig();

    protected abstract void appendExecutionConfig(DataStream<Map<String, Object>>... dataStreamArr);

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.env.execute(this.bindingParams.getGraph().getElementId());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setStreamTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.streamTimeCharacteristic = timeCharacteristic;
    }

    private SourceFunction<String> getStream1Source() {
        return getStreamSource(0);
    }

    private SourceFunction<String> getStream2Source() {
        return getStreamSource(1);
    }

    private SourceFunction<String> getStreamSource(int i) {
        SpDataStream spDataStream;
        if (this.bindingParams.getGraph().getInputStreams().size() - 1 < i || (spDataStream = (SpDataStream) this.bindingParams.getGraph().getInputStreams().get(i)) == null) {
            return null;
        }
        KafkaTransportProtocol transportProtocol = spDataStream.getEventGrounding().getTransportProtocol();
        return transportProtocol instanceof KafkaTransportProtocol ? getKafkaConsumer(transportProtocol) : getJmsConsumer((JmsTransportProtocol) transportProtocol);
    }

    private SourceFunction<String> getJmsConsumer(JmsTransportProtocol jmsTransportProtocol) {
        return new JmsConsumer(jmsTransportProtocol);
    }

    private SourceFunction<String> getKafkaConsumer(KafkaTransportProtocol kafkaTransportProtocol) {
        return kafkaTransportProtocol.getTopicDefinition() instanceof SimpleTopicDefinition ? new FlinkKafkaConsumer010(kafkaTransportProtocol.getTopicDefinition().getActualTopicName(), new SimpleStringSchema(), getProperties(kafkaTransportProtocol)) : new FlinkKafkaConsumer010(Pattern.compile(replaceWildcardWithPatternFormat(kafkaTransportProtocol.getTopicDefinition().getActualTopicName())), new SimpleStringSchema(), getProperties(kafkaTransportProtocol));
    }

    public void prepareRuntime() throws SpRuntimeException {
        if (this.debug) {
            this.env = StreamExecutionEnvironment.createLocalEnvironment();
        } else {
            this.env = StreamExecutionEnvironment.createRemoteEnvironment(this.config.getHost(), this.config.getPort(), new String[]{this.config.getJarFile()});
        }
        appendEnvironmentConfig(this.env);
        SourceFunction<String> stream1Source = getStream1Source();
        if (stream1Source == null) {
            throw new SpRuntimeException("At least one source must be defined for a flink sepa");
        }
        SingleOutputStreamOperator flatMap = this.env.addSource(stream1Source).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph()));
        SourceFunction<String> stream2Source = getStream2Source();
        if (stream2Source != null) {
            appendExecutionConfig(flatMap, this.env.addSource(stream2Source).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph())));
        } else {
            appendExecutionConfig(flatMap);
        }
    }

    public void postDiscard() throws SpRuntimeException {
    }

    public void bindRuntime() throws SpRuntimeException {
        try {
            prepareRuntime();
            new Thread(this).start();
            if (!this.debug) {
                FlinkJobController flinkJobController = new FlinkJobController(this.config.getHost(), this.config.getPort());
                boolean z = false;
                int i = 0;
                do {
                    try {
                        i++;
                        Thread.sleep(1000L);
                        flinkJobController.findJobId(flinkJobController.getJobManagerGateway(), this.bindingParams.getGraph().getElementId());
                        z = true;
                    } catch (Exception e) {
                    }
                    if (z) {
                        break;
                    }
                } while (i < 60);
                if (i == 60) {
                    throw new SpRuntimeException("Error: Timeout reached when trying to connect to Flink Job Controller");
                }
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            throw new SpRuntimeException(e2.getMessage());
        }
    }

    public void discardRuntime() throws SpRuntimeException {
        FlinkJobController flinkJobController = new FlinkJobController(this.config.getHost(), this.config.getPort());
        try {
            if (flinkJobController.deleteJob(flinkJobController.findJobId(flinkJobController.getJobManagerGateway(), this.bindingParams.getGraph().getElementId()))) {
            } else {
                throw new SpRuntimeException("Could not stop Flink Job");
            }
        } catch (Exception e) {
            throw new SpRuntimeException("Could not find Flink Job Manager, is it running?");
        }
    }

    public void appendEnvironmentConfig(StreamExecutionEnvironment streamExecutionEnvironment) {
        if (this.streamTimeCharacteristic != null) {
            streamExecutionEnvironment.setStreamTimeCharacteristic(this.streamTimeCharacteristic);
            streamExecutionEnvironment.setParallelism(1);
        }
    }
}
