/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.samza.SamzaPipelineRunner;
import org.apache.beam.runners.samza.SamzaPortablePipelineOptions;
import org.apache.beam.sdk.expansion.service.ExpansionServer;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaJobServerDriver {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
    private final SamzaPortablePipelineOptions pipelineOptions;

    protected SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public static void main(String[] args) throws Exception {
        SamzaPortablePipelineOptions pipelineOptions = (SamzaPortablePipelineOptions)PipelineOptionsFactory.fromArgs((String[])args).as(SamzaPortablePipelineOptions.class);
        SamzaJobServerDriver.fromOptions(pipelineOptions).run();
    }

    public static SamzaJobServerDriver fromOptions(SamzaPortablePipelineOptions pipelineOptions) {
        HashMap<String, String> overrideConfig = pipelineOptions.getConfigOverride() != null ? pipelineOptions.getConfigOverride() : new HashMap<String, String>();
        overrideConfig.put("beam.override.portable", String.valueOf(true));
        overrideConfig.put("beam.override.control.port", String.valueOf(pipelineOptions.getControlPort()));
        overrideConfig.put("beam.override.fs.token.path", pipelineOptions.getFsTokenPath());
        pipelineOptions.setConfigOverride(overrideConfig);
        return new SamzaJobServerDriver(pipelineOptions);
    }

    private InMemoryJobService createJobService() throws IOException {
        JobInvoker jobInvoker = new JobInvoker("samza-job-invoker"){

            protected JobInvocation invokeWithExecutor(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken, ListeningExecutorService executorService) throws IOException {
                SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(SamzaJobServerDriver.this.pipelineOptions);
                String invocationId = String.format("%s_%s", SamzaJobServerDriver.this.pipelineOptions.getJobName(), UUID.randomUUID().toString());
                JobInfo jobInfo = JobInfo.create((String)invocationId, (String)SamzaJobServerDriver.this.pipelineOptions.getJobName(), (String)retrievalToken, (Struct)PipelineOptionsTranslation.toProto((PipelineOptions)SamzaJobServerDriver.this.pipelineOptions));
                return new JobInvocation(jobInfo, executorService, pipeline, (PortablePipelineRunner)pipelineRunner);
            }
        };
        return InMemoryJobService.create(null, session -> session, stagingSessionToken -> {}, (JobInvoker)jobInvoker, (int)10);
    }

    private ExpansionServer createExpansionService(String host, int expansionPort) throws IOException {
        if (host == null) {
            host = InetAddress.getLoopbackAddress().getHostName();
        }
        ExpansionServer expansionServer = ExpansionServer.create((ExpansionService)new ExpansionService(), (String)host, (int)expansionPort);
        LOG.info("Java ExpansionService started on {}:{}", (Object)expansionServer.getHost(), (Object)expansionServer.getPort());
        return expansionServer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws Exception {
        InMemoryJobService service = this.createJobService();
        GrpcFnServer jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)ServerFactory.createWithPortSupplier(this.pipelineOptions::getJobPort));
        String jobServerUrl = jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl();
        LOG.info("JobServer started on {}", (Object)jobServerUrl);
        URI uri = new URI(jobServerUrl);
        ExpansionServer expansionServer = this.createExpansionService(uri.getHost(), this.pipelineOptions.getExpansionPort());
        try {
            jobServiceGrpcFnServer.getServer().awaitTermination();
        }
        finally {
            LOG.info("JobServer closing");
            jobServiceGrpcFnServer.close();
            if (expansionServer != null) {
                try {
                    expansionServer.close();
                    LOG.info("Expansion stopped on {}:{}", (Object)expansionServer.getHost(), (Object)expansionServer.getPort());
                }
                catch (Exception e) {
                    LOG.error("Error while closing the Expansion Service.", (Throwable)e);
                }
            }
        }
    }
}

