/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.samza.SamzaPipelineRunner;
import org.apache.beam.runners.samza.SamzaPortablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaJobServerDriver {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
    private final SamzaPortablePipelineOptions pipelineOptions;

    private SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public static void main(String[] args) throws Exception {
        SamzaPortablePipelineOptions pipelineOptions = (SamzaPortablePipelineOptions)PipelineOptionsFactory.fromArgs((String[])args).as(SamzaPortablePipelineOptions.class);
        SamzaJobServerDriver.fromOptions(pipelineOptions).run();
    }

    public static SamzaJobServerDriver fromOptions(SamzaPortablePipelineOptions pipelineOptions) {
        HashMap<String, String> overrideConfig = pipelineOptions.getConfigOverride() != null ? pipelineOptions.getConfigOverride() : new HashMap<String, String>();
        overrideConfig.put("beam.override.portable", String.valueOf(true));
        overrideConfig.put("beam.override.control.port", String.valueOf(pipelineOptions.getControlPort()));
        pipelineOptions.setConfigOverride(overrideConfig);
        return new SamzaJobServerDriver(pipelineOptions);
    }

    private static InMemoryJobService createJobService(final SamzaPortablePipelineOptions pipelineOptions) throws IOException {
        JobInvoker jobInvoker = new JobInvoker("samza-job-invoker"){

            protected JobInvocation invokeWithExecutor(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken, ListeningExecutorService executorService) throws IOException {
                String invocationId = String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString());
                SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions);
                JobInfo jobInfo = JobInfo.create((String)invocationId, (String)pipelineOptions.getJobName(), (String)retrievalToken, (Struct)PipelineOptionsTranslation.toProto((PipelineOptions)pipelineOptions));
                return new JobInvocation(jobInfo, executorService, pipeline, (PortablePipelineRunner)pipelineRunner);
            }
        };
        return InMemoryJobService.create(null, session -> {
            try {
                return BeamFileSystemArtifactStagingService.generateStagingSessionToken((String)session, (String)"/tmp/beam-artifact-staging");
            }
            catch (Exception exn) {
                throw new RuntimeException(exn);
            }
        }, stagingSessionToken -> {}, (JobInvoker)jobInvoker);
    }

    public void run() throws Exception {
        InMemoryJobService service = SamzaJobServerDriver.createJobService(this.pipelineOptions);
        GrpcFnServer jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)ServerFactory.createWithPortSupplier(this.pipelineOptions::getJobPort));
        LOG.info("JobServer started on {}", (Object)jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
        try {
            jobServiceGrpcFnServer.getServer().awaitTermination();
        }
        finally {
            LOG.info("JobServer closing");
            jobServiceGrpcFnServer.close();
        }
    }
}

