/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaPipelineOptionsValidator;
import org.apache.beam.runners.samza.SamzaPipelineResult;
import org.apache.beam.runners.samza.SamzaRunnerOverrideConfigs;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.runners.samza.translation.PViewToIdMapper;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.SamzaPipelineTranslator;
import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
import org.apache.beam.runners.samza.translation.SamzaTransformOverrides;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.PipelineDotRenderer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaRunner
extends PipelineRunner<SamzaPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
    private static final String SAMZA_WORKER_ID = "samza_py_worker_id";
    private GrpcFnServer<FnApiControlClientPoolService> fnControlServer;
    private GrpcFnServer<GrpcDataService> fnDataServer;
    private GrpcFnServer<GrpcStateService> fnStateServer;
    private ControlClientPool controlClientPool;
    private JobBundleFactory jobBundleFactory;
    private ExecutorService dataExecutor;
    private final SamzaPipelineOptions options;

    public static SamzaRunner fromOptions(PipelineOptions opts) {
        SamzaPipelineOptions samzaOptions = SamzaPipelineOptionsValidator.validate(opts);
        return new SamzaRunner(samzaOptions);
    }

    public SamzaRunner(SamzaPipelineOptions options) {
        this.options = options;
    }

    private static void closeAutoClosable(AutoCloseable closeable) {
        try {
            AutoCloseable closer = closeable;
            Throwable throwable = null;
            if (closer != null) {
                if (throwable != null) {
                    try {
                        closer.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    closer.close();
                }
            }
        }
        catch (Exception e) {
            LOG.error("Failed to close {}. Ignore since this is shutdown process...", (Object)closeable.getClass().getSimpleName(), (Object)e);
        }
    }

    private void setUpContextManager(StreamGraph streamGraph, final SamzaExecutionContext executionContext) {
        streamGraph.withContextManager(new ContextManager(){

            public void init(Config config, TaskContext context) {
                if (executionContext.getMetricsContainer() == null) {
                    MetricsRegistryMap metricsRegistry = (MetricsRegistryMap)context.getSamzaContainerContext().metricsRegistry;
                    executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
                }
                if (SamzaRunnerOverrideConfigs.isPortableMode(SamzaRunner.this.options) && SamzaRunner.this.jobBundleFactory == null) {
                    try {
                        long waitTimeoutMs = SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(SamzaRunner.this.options);
                        InstructionRequestHandler instructionHandler = SamzaRunner.this.controlClientPool.getSource().take(SamzaRunner.SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
                        EnvironmentFactory environmentFactory = environment -> RemoteEnvironment.forHandler((RunnerApi.Environment)environment, (InstructionRequestHandler)instructionHandler);
                        SamzaRunner.this.jobBundleFactory = SingleEnvironmentInstanceJobBundleFactory.create((EnvironmentFactory)environmentFactory, (GrpcFnServer)SamzaRunner.this.fnDataServer, (GrpcFnServer)SamzaRunner.this.fnStateServer, (IdGenerator)IdGenerators.incrementingLongs());
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Running samza in Beam portable mode but failed to create job bundle factory", e);
                    }
                    executionContext.setJobBundleFactory(SamzaRunner.this.jobBundleFactory);
                }
                context.setUserContext((Object)executionContext);
            }

            public void close() {
                SamzaRunner.closeAutoClosable((AutoCloseable)SamzaRunner.this.fnControlServer);
                SamzaRunner.this.fnControlServer = null;
                SamzaRunner.closeAutoClosable((AutoCloseable)SamzaRunner.this.fnDataServer);
                SamzaRunner.this.fnDataServer = null;
                SamzaRunner.closeAutoClosable((AutoCloseable)SamzaRunner.this.fnStateServer);
                SamzaRunner.this.fnStateServer = null;
                if (SamzaRunner.this.dataExecutor != null) {
                    SamzaRunner.this.dataExecutor.shutdown();
                    SamzaRunner.this.dataExecutor = null;
                }
                SamzaRunner.this.controlClientPool = null;
                SamzaRunner.closeAutoClosable((AutoCloseable)SamzaRunner.this.jobBundleFactory);
                SamzaRunner.this.jobBundleFactory = null;
            }
        });
    }

    private void setUpFnApiServer() {
        this.controlClientPool = MapControlClientPool.create();
        this.dataExecutor = Executors.newCachedThreadPool();
        try {
            this.fnControlServer = GrpcFnServer.allocatePortAndCreateFor((FnService)FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)this.controlClientPool.getSink(), () -> SAMZA_WORKER_ID), (ServerFactory)ServerFactory.createWithPortSupplier(() -> SamzaRunnerOverrideConfigs.getFnControlPort(this.options)));
            this.fnDataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcDataService.create((ExecutorService)this.dataExecutor, (OutboundObserverFactory)OutboundObserverFactory.serverDirect()), (ServerFactory)ServerFactory.createDefault());
            this.fnStateServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcStateService.create(), (ServerFactory)ServerFactory.createDefault());
        }
        catch (Exception e) {
            LOG.error("Failed to set up fn api servers", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
        SamzaExecutionContext executionContext = new SamzaExecutionContext();
        this.setUpFnApiServer();
        ConfigBuilder configBuilder = new ConfigBuilder(this.options);
        SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder);
        ApplicationRunner runner = ApplicationRunner.fromConfig((Config)configBuilder.build());
        StreamApplication app = (streamGraph, config) -> {
            this.setUpContextManager(streamGraph, executionContext);
            SamzaPortablePipelineTranslator.translate(pipeline, new PortableTranslationContext(streamGraph, this.options));
        };
        SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
        runner.run(app);
        return result;
    }

    public SamzaPipelineResult run(final Pipeline pipeline) {
        MetricsEnvironment.setMetricsSupported((boolean)true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Pre-processed Beam pipeline:\n{}", (Object)PipelineDotRenderer.toDotString(pipeline));
        }
        pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Post-processed Beam pipeline:\n{}", (Object)PipelineDotRenderer.toDotString(pipeline));
        }
        final PValue dummySource = (PValue)pipeline.apply("Dummy Input Source", (PTransform)Create.of((Object)"dummy", (Object[])new String[0]));
        final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
        ConfigBuilder configBuilder = new ConfigBuilder(this.options);
        SamzaPipelineTranslator.createConfig(pipeline, this.options, idMap, configBuilder);
        ApplicationRunner runner = ApplicationRunner.fromConfig((Config)configBuilder.build());
        final SamzaExecutionContext executionContext = new SamzaExecutionContext();
        StreamApplication app = new StreamApplication(){

            public void init(StreamGraph streamGraph, Config config) {
                SamzaRunner.this.setUpContextManager(streamGraph, executionContext);
                SamzaPipelineTranslator.translate(pipeline, new TranslationContext(streamGraph, idMap, SamzaRunner.this.options, dummySource));
            }
        };
        SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
        runner.run(app);
        return result;
    }
}

