/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.flink;

import java.util.List;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.flink.DataStreamCallback;
import org.apache.camel.component.flink.FlinkEndpoint;
import org.apache.camel.support.DefaultProducer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataStreamFlinkProducer
extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(DataStreamFlinkProducer.class);
    private volatile boolean environmentConfigured = false;

    public DataStreamFlinkProducer(FlinkEndpoint endpoint) {
        super((Endpoint)endpoint);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(Exchange exchange) throws Exception {
        DataStream ds = this.resolveDataStream(exchange);
        if (!this.environmentConfigured && ds != null) {
            DataStreamFlinkProducer dataStreamFlinkProducer = this;
            synchronized (dataStreamFlinkProducer) {
                if (!this.environmentConfigured) {
                    this.configureStreamExecutionEnvironment(ds);
                    this.environmentConfigured = true;
                }
            }
        }
        DataStreamCallback dataStreamCallback = this.resolveDataStreamCallback(exchange);
        Object body = exchange.getIn().getBody();
        Object result = body instanceof List ? dataStreamCallback.onDataStream(ds, ((List)body).toArray(new Object[0])) : dataStreamCallback.onDataStream(ds, body);
        this.collectResults(exchange, result);
    }

    public FlinkEndpoint getEndpoint() {
        return (FlinkEndpoint)super.getEndpoint();
    }

    protected void collectResults(Exchange exchange, Object result) {
        if (result instanceof DataStream) {
            if (this.getEndpoint().isCollect()) {
                throw new IllegalArgumentException("collect mode not supported for Flink DataStreams.");
            }
            exchange.getIn().setBody(result);
            exchange.getIn().setHeader("CamelFlinkDataStream", result);
        } else {
            exchange.getIn().setBody(result);
        }
    }

    protected DataStream resolveDataStream(Exchange exchange) {
        if (exchange.getIn().getHeader("CamelFlinkDataStream") != null) {
            return (DataStream)exchange.getIn().getHeader("CamelFlinkDataStream");
        }
        if (this.getEndpoint().getDataStream() != null) {
            return this.getEndpoint().getDataStream();
        }
        throw new IllegalArgumentException("No DataStream defined");
    }

    protected DataStreamCallback resolveDataStreamCallback(Exchange exchange) {
        if (exchange.getIn().getHeader("CamelFlinkDataStreamCallback") != null) {
            return (DataStreamCallback)exchange.getIn().getHeader("CamelFlinkDataStreamCallback");
        }
        if (this.getEndpoint().getDataStreamCallback() != null) {
            return this.getEndpoint().getDataStreamCallback();
        }
        throw new IllegalArgumentException("Cannot resolve DataStream callback.");
    }

    protected void configureStreamExecutionEnvironment(DataStream dataStream) {
        RuntimeExecutionMode mode;
        if (dataStream == null) {
            LOG.debug("No DataStream provided, skipping environment configuration");
            return;
        }
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        if (this.getEndpoint().getExecutionMode() != null) {
            try {
                mode = RuntimeExecutionMode.valueOf((String)this.getEndpoint().getExecutionMode());
                env.setRuntimeMode(mode);
                LOG.info("Set Flink runtime execution mode to: {}", (Object)mode);
            }
            catch (IllegalArgumentException e) {
                LOG.warn("Invalid execution mode '{}'. Valid values are: STREAMING, BATCH, AUTOMATIC", (Object)this.getEndpoint().getExecutionMode());
            }
        }
        if (this.getEndpoint().getParallelism() != null) {
            env.setParallelism(this.getEndpoint().getParallelism().intValue());
            LOG.info("Set Flink parallelism to: {}", (Object)this.getEndpoint().getParallelism());
        }
        if (this.getEndpoint().getMaxParallelism() != null) {
            env.setMaxParallelism(this.getEndpoint().getMaxParallelism().intValue());
            LOG.info("Set Flink max parallelism to: {}", (Object)this.getEndpoint().getMaxParallelism());
        }
        if (this.getEndpoint().getCheckpointInterval() != null && this.getEndpoint().getCheckpointInterval() > 0L) {
            env.enableCheckpointing(this.getEndpoint().getCheckpointInterval().longValue());
            LOG.info("Enabled checkpointing with interval: {} ms", (Object)this.getEndpoint().getCheckpointInterval());
            if (this.getEndpoint().getCheckpointingMode() != null) {
                try {
                    mode = CheckpointingMode.valueOf((String)this.getEndpoint().getCheckpointingMode());
                    env.getCheckpointConfig().setCheckpointingMode((CheckpointingMode)mode);
                    LOG.info("Set checkpointing mode to: {}", (Object)mode);
                }
                catch (IllegalArgumentException e) {
                    LOG.warn("Invalid checkpointing mode '{}'. Valid values are: EXACTLY_ONCE, AT_LEAST_ONCE", (Object)this.getEndpoint().getCheckpointingMode());
                }
            }
            if (this.getEndpoint().getCheckpointTimeout() != null) {
                env.getCheckpointConfig().setCheckpointTimeout(this.getEndpoint().getCheckpointTimeout().longValue());
                LOG.info("Set checkpoint timeout to: {} ms", (Object)this.getEndpoint().getCheckpointTimeout());
            }
            if (this.getEndpoint().getMinPauseBetweenCheckpoints() != null) {
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(this.getEndpoint().getMinPauseBetweenCheckpoints().longValue());
                LOG.info("Set min pause between checkpoints to: {} ms", (Object)this.getEndpoint().getMinPauseBetweenCheckpoints());
            }
        }
        LOG.debug("StreamExecutionEnvironment configuration completed");
    }
}

