/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.environment;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ProcessEnvironment;
import org.apache.beam.runners.fnexecution.environment.ProcessManager;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.RemoteEnvironmentOptions;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EnvironmentFactory} that creates a {@link RemoteEnvironment} by starting the command
 * described by a {@code RunnerApi.ProcessPayload} through a {@link ProcessManager} and then
 * waiting for the started worker to show up on the control client source.
 */
public class ProcessEnvironmentFactory
implements EnvironmentFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentFactory.class);

    /** How long a single wait for the worker's control client lasts before liveness is re-checked. */
    private static final Duration CLIENT_POLL_TIMEOUT = Duration.ofSeconds(5L);

    private final ProcessManager processManager;
    private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
    private final IdGenerator idGenerator;
    private final ControlClientPool.Source clientSource;
    private final PipelineOptions pipelineOptions;

    /**
     * Creates a factory from the given collaborators.
     *
     * @param processManager used to start and stop worker processes
     * @param provisioningServiceServer server whose URL is passed to the worker as
     *     {@code --provision_endpoint}
     * @param clientSource source from which the worker's {@link InstructionRequestHandler} is taken
     * @param idGenerator id generator retained by the factory
     * @param pipelineOptions options from which the semi-persistent directory is read
     * @return a new factory instance
     */
    public static ProcessEnvironmentFactory create(ProcessManager processManager, GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, ControlClientPool.Source clientSource, IdGenerator idGenerator, PipelineOptions pipelineOptions) {
        return new ProcessEnvironmentFactory(processManager, provisioningServiceServer, idGenerator, clientSource, pipelineOptions);
    }

    private ProcessEnvironmentFactory(ProcessManager processManager, GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, IdGenerator idGenerator, ControlClientPool.Source clientSource, PipelineOptions pipelineOptions) {
        this.processManager = processManager;
        this.provisioningServiceServer = provisioningServiceServer;
        this.idGenerator = idGenerator;
        this.clientSource = clientSource;
        this.pipelineOptions = pipelineOptions;
    }

    /**
     * Starts the process described by {@code environment} and blocks until its control client is
     * available.
     *
     * <p>The process is started with {@code --id} and {@code --provision_endpoint} arguments, plus
     * {@code --semi_persist_dir} when that option is set. While waiting, the process liveness is
     * re-checked before every poll of the client source.
     *
     * @param environment environment whose URN must be the PROCESS environment URN and whose
     *     payload is parsed as a {@code RunnerApi.ProcessPayload}
     * @param workerId id passed to the process and used to look up its control client
     * @return the remote environment wrapping the started process
     * @throws IllegalStateException if {@code environment} is not a PROCESS environment
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting (the interrupt flag is restored first)
     * @throws Exception any failure from starting the process or waiting for it; the process is
     *     asked to stop before the failure is rethrown, and a failure to stop it is attached as a
     *     suppressed exception
     */
    @Override
    public RemoteEnvironment createEnvironment(RunnerApi.Environment environment, String workerId) throws Exception {
        Preconditions.checkState(
            environment.getUrn().equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.PROCESS)),
            "The passed environment does not contain a ProcessPayload.");
        RunnerApi.ProcessPayload processPayload = RunnerApi.ProcessPayload.parseFrom(environment.getPayload());
        String executable = processPayload.getCommand();
        String provisionEndpoint = this.provisioningServiceServer.getApiServiceDescriptor().getUrl();
        String semiPersistDir = this.pipelineOptions.as(RemoteEnvironmentOptions.class).getSemiPersistDir();

        ImmutableList.Builder<String> argsBuilder = ImmutableList.<String>builder()
            .add(String.format("--id=%s", workerId))
            .add(String.format("--provision_endpoint=%s", provisionEndpoint));
        if (semiPersistDir != null) {
            argsBuilder.add(String.format("--semi_persist_dir=%s", semiPersistDir));
        }

        LOG.debug("Creating Process for worker ID {}", workerId);
        InstructionRequestHandler instructionHandler = null;
        try {
            ProcessManager.RunningProcess process = this.processManager.startProcess(workerId, executable, argsBuilder.build(), processPayload.getEnvMap());
            // Poll for the worker's control client, aborting as soon as the process has died.
            while (instructionHandler == null) {
                try {
                    process.isAliveOrThrow();
                    instructionHandler = this.clientSource.take(workerId, CLIENT_POLL_TIMEOUT);
                }
                catch (TimeoutException timeoutEx) {
                    // A timeout only means the worker has not connected yet; keep waiting.
                    LOG.info("Still waiting for startup of environment '{}' for worker id {}", processPayload.getCommand(), workerId);
                }
                catch (InterruptedException interruptEx) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(interruptEx);
                }
            }
        }
        catch (Exception e) {
            // Best-effort cleanup: stop the process, but always surface the original failure.
            // The rethrow must stay outside the inner try, otherwise the inner catch would
            // capture e itself and e.addSuppressed(e) would fail with self-suppression.
            try {
                this.processManager.stopProcess(workerId);
            }
            catch (Exception processKillException) {
                e.addSuppressed(processKillException);
            }
            throw e;
        }
        return ProcessEnvironment.create(this.processManager, environment, workerId, instructionHandler);
    }

    /** Provider that builds {@link ProcessEnvironmentFactory} instances with fixed pipeline options. */
    public static class Provider
    implements EnvironmentFactory.Provider {
        private final PipelineOptions pipelineOptions;

        /**
         * @param options pipeline options handed to every factory this provider creates
         */
        public Provider(PipelineOptions options) {
            this.pipelineOptions = options;
        }

        /**
         * Creates a {@link ProcessEnvironmentFactory} backed by a fresh {@link ProcessManager}.
         *
         * <p>Only {@code provisioningServiceServer}, {@code clientPool} and {@code idGenerator} are
         * used; the control, logging and retrieval servers are accepted to satisfy the interface.
         */
        @Override
        public EnvironmentFactory createEnvironmentFactory(GrpcFnServer<FnApiControlClientPoolService> controlServiceServer, GrpcFnServer<GrpcLoggingService> loggingServiceServer, GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer, GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, ControlClientPool clientPool, IdGenerator idGenerator) {
            return ProcessEnvironmentFactory.create(ProcessManager.create(), provisioningServiceServer, clientPool.getSource(), idGenerator, this.pipelineOptions);
        }
    }
}

