/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunnerOverrideConfigs;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.context.ApplicationContainerContext;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaExecutionContext
implements ApplicationContainerContext {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaExecutionContext.class);
    private static final @UnknownKeyFor @NonNull @Initialized String SAMZA_WORKER_ID = "samza_py_worker_id";
    private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options;
    private @UnknownKeyFor @NonNull @Initialized SamzaMetricsContainer metricsContainer;
    private @UnknownKeyFor @NonNull @Initialized JobBundleFactory jobBundleFactory;
    private @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized FnApiControlClientPoolService> fnControlServer;
    private @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcDataService> fnDataServer;
    private @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcStateService> fnStateServer;
    private @UnknownKeyFor @NonNull @Initialized ControlClientPool controlClientPool;
    private @UnknownKeyFor @NonNull @Initialized ExecutorService dataExecutor;
    private @UnknownKeyFor @NonNull @Initialized IdGenerator idGenerator = IdGenerators.incrementingLongs();

    public SamzaExecutionContext(@UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options) {
        this.options = options;
    }

    public @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions getPipelineOptions() {
        return this.options;
    }

    public @UnknownKeyFor @NonNull @Initialized SamzaMetricsContainer getMetricsContainer() {
        return this.metricsContainer;
    }

    void setMetricsContainer(@UnknownKeyFor @NonNull @Initialized SamzaMetricsContainer metricsContainer) {
        this.metricsContainer = metricsContainer;
    }

    public @UnknownKeyFor @NonNull @Initialized JobBundleFactory getJobBundleFactory() {
        return this.jobBundleFactory;
    }

    void setJobBundleFactory(@UnknownKeyFor @NonNull @Initialized JobBundleFactory jobBundleFactory) {
        this.jobBundleFactory = jobBundleFactory;
    }

    public void start() {
        Preconditions.checkState((this.getJobBundleFactory() == null ? 1 : 0) != 0, (Object)"jobBundleFactory has been created!");
        if (SamzaRunnerOverrideConfigs.isPortableMode(this.options)) {
            try {
                this.controlClientPool = MapControlClientPool.create();
                this.dataExecutor = Executors.newCachedThreadPool();
                this.fnControlServer = GrpcFnServer.allocatePortAndCreateFor((FnService)FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)this.controlClientPool.getSink(), () -> SAMZA_WORKER_ID), (ServerFactory)ServerFactory.createWithPortSupplier(() -> SamzaRunnerOverrideConfigs.getFnControlPort(this.options)));
                LOG.info("Started control server on port {}", (Object)this.fnControlServer.getServer().getPort());
                this.fnDataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcDataService.create((PipelineOptions)this.options, (ExecutorService)this.dataExecutor, (OutboundObserverFactory)OutboundObserverFactory.serverDirect()), (ServerFactory)ServerFactory.createDefault());
                LOG.info("Started data server on port {}", (Object)this.fnDataServer.getServer().getPort());
                this.fnStateServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcStateService.create(), (ServerFactory)ServerFactory.createDefault());
                LOG.info("Started state server on port {}", (Object)this.fnStateServer.getServer().getPort());
                long waitTimeoutMs = SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(this.options);
                LOG.info("Control client wait timeout config: " + waitTimeoutMs);
                InstructionRequestHandler instructionHandler = this.controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
                EnvironmentFactory environmentFactory = (environment, workerId) -> RemoteEnvironment.forHandler((RunnerApi.Environment)environment, (InstructionRequestHandler)instructionHandler);
                this.jobBundleFactory = SingleEnvironmentInstanceJobBundleFactory.create((EnvironmentFactory)environmentFactory, this.fnDataServer, this.fnStateServer, (IdGenerator)this.idGenerator);
                LOG.info("Started job bundle factory");
            }
            catch (Exception e) {
                throw new RuntimeException("Running samza in Beam portable mode but failed to create job bundle factory", e);
            }
            this.setJobBundleFactory(this.jobBundleFactory);
        }
    }

    public void stop() {
        SamzaExecutionContext.closeAutoClosable(this.fnControlServer, "controlServer");
        this.fnControlServer = null;
        SamzaExecutionContext.closeAutoClosable(this.fnDataServer, "dataServer");
        this.fnDataServer = null;
        SamzaExecutionContext.closeAutoClosable(this.fnStateServer, "stateServer");
        this.fnStateServer = null;
        if (this.dataExecutor != null) {
            this.dataExecutor.shutdown();
            this.dataExecutor = null;
        }
        this.controlClientPool = null;
        SamzaExecutionContext.closeAutoClosable((AutoCloseable)this.jobBundleFactory, "jobBundle");
        this.jobBundleFactory = null;
    }

    private static void closeAutoClosable(@UnknownKeyFor @NonNull @Initialized AutoCloseable closeable, @UnknownKeyFor @NonNull @Initialized String name) {
        try (AutoCloseable closer = closeable;){
            LOG.info("Closed {}", (Object)name);
        }
        catch (Exception e) {
            LOG.error("Failed to close {}. Ignore since this is shutdown process...", (Object)closeable.getClass().getSimpleName(), (Object)e);
        }
    }

    public class Factory
    implements ApplicationContainerContextFactory<SamzaExecutionContext> {
        public @UnknownKeyFor @NonNull @Initialized SamzaExecutionContext create(@UnknownKeyFor @NonNull @Initialized ExternalContext externalContext, @UnknownKeyFor @NonNull @Initialized JobContext jobContext, @UnknownKeyFor @NonNull @Initialized ContainerContext containerContext) {
            MetricsRegistryMap metricsRegistry = (MetricsRegistryMap)containerContext.getContainerMetricsRegistry();
            SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
            return SamzaExecutionContext.this;
        }
    }
}

