/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.samza.SamzaJobInvocation;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaJobServerDriver {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
    private final ServerConfiguration config;

    private SamzaJobServerDriver(ServerConfiguration config) {
        this.config = config;
    }

    public static void main(String[] args) throws Exception {
        ServerConfiguration configuration = new ServerConfiguration();
        CmdLineParser parser = new CmdLineParser((Object)configuration);
        try {
            parser.parseArgument(args);
            SamzaJobServerDriver.fromConfig(configuration).run();
        }
        catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), (Object)e);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
        catch (Exception e) {
            LOG.error("Hit exception with SamzaJobServer. Exiting...", (Throwable)e);
            throw e;
        }
    }

    public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
        return new SamzaJobServerDriver(config);
    }

    private static InMemoryJobService createJobService(final int controlPort) throws IOException {
        JobInvoker jobInvoker = new JobInvoker(){

            public JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) throws IOException {
                SamzaPipelineOptions samzaPipelineOptions = (SamzaPipelineOptions)PipelineOptionsTranslation.fromProto((Struct)options).as(SamzaPipelineOptions.class);
                HashMap<String, String> overrideConfig = samzaPipelineOptions.getConfigOverride() != null ? samzaPipelineOptions.getConfigOverride() : new HashMap<String, String>();
                overrideConfig.put("beam.override.portable", String.valueOf(true));
                overrideConfig.put("beam.override.control.port", String.valueOf(controlPort));
                samzaPipelineOptions.setConfigOverride(overrideConfig);
                String invocationId = String.format("%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
                return new SamzaJobInvocation(pipeline, samzaPipelineOptions, invocationId);
            }
        };
        return InMemoryJobService.create(null, session -> {
            try {
                return BeamFileSystemArtifactStagingService.generateStagingSessionToken((String)session, (String)"/tmp/beam-artifact-staging");
            }
            catch (Exception exn) {
                throw new RuntimeException(exn);
            }
        }, stagingSessionToken -> {}, (JobInvoker)jobInvoker);
    }

    private void run() throws Exception {
        InMemoryJobService service = SamzaJobServerDriver.createJobService(this.config.controlPort);
        GrpcFnServer jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)ServerFactory.createWithPortSupplier(() -> this.config.jobPort));
        LOG.info("JobServer started on {}", (Object)jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
        try {
            jobServiceGrpcFnServer.getServer().awaitTermination();
        }
        finally {
            LOG.info("JobServer closing");
            jobServiceGrpcFnServer.close();
        }
    }

    private static class ServerConfiguration {
        @Option(name="--job-port", usage="The job service port. (Default: 11440)")
        private int jobPort = 11440;
        @Option(name="--control-port", usage="The FnControl port. (Default: 11441)")
        private int controlPort = 11441;

        private ServerConfiguration() {
        }
    }
}

