/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.samza.SamzaPipelineRunner;
import org.apache.beam.runners.samza.SamzaPortablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line driver that hosts a Beam job service for the Samza runner.
 *
 * <p>The driver builds an {@link InMemoryJobService} whose invocations are executed by a
 * {@link SamzaPipelineRunner}, exposes it through a {@link GrpcFnServer}, and then blocks until
 * that server terminates.
 */
public class SamzaJobServerDriver {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);

    /** Name passed to the {@link JobInvoker} constructor. */
    private static final String JOB_INVOKER_NAME = "samza-job-invoker";

    /** Config-override key that {@link #fromOptions} always sets to {@code "true"}. */
    private static final String PORTABLE_MODE_CONFIG_KEY = "beam.override.portable";

    /** Config-override key that {@link #fromOptions} sets to the configured control port. */
    private static final String CONTROL_PORT_CONFIG_KEY = "beam.override.control.port";

    private final @UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions;

    private SamzaJobServerDriver(@UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    /**
     * Parses {@code args} into {@link SamzaPortablePipelineOptions}, builds a driver from them and
     * runs it.
     *
     * @param args command-line arguments handed to {@link PipelineOptionsFactory#fromArgs}
     * @throws Exception whatever {@link #run()} or option parsing throws
     */
    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        SamzaPortablePipelineOptions pipelineOptions =
                PipelineOptionsFactory.fromArgs(args).as(SamzaPortablePipelineOptions.class);
        SamzaJobServerDriver.fromOptions(pipelineOptions).run();
    }

    /**
     * Creates a driver for the given options.
     *
     * <p>As a side effect, the config override stored on {@code pipelineOptions} is replaced by a
     * map containing every previously configured entry plus the portable-mode flag and the control
     * port.
     *
     * @param pipelineOptions options to run the job server with; its config override is updated
     * @return a driver bound to {@code pipelineOptions}
     */
    public static @UnknownKeyFor @NonNull @Initialized SamzaJobServerDriver fromOptions(@UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions) {
        // Copy instead of mutating the existing map in place: the map handed in by the caller may
        // be unmodifiable (put would throw), and the caller may still hold a reference to it.
        final java.util.Map<String, String> existingOverrides = pipelineOptions.getConfigOverride();
        final HashMap<String, String> overrideConfig = new HashMap<>();
        if (existingOverrides != null) {
            overrideConfig.putAll(existingOverrides);
        }
        overrideConfig.put(PORTABLE_MODE_CONFIG_KEY, String.valueOf(true));
        overrideConfig.put(CONTROL_PORT_CONFIG_KEY, String.valueOf(pipelineOptions.getControlPort()));
        pipelineOptions.setConfigOverride(overrideConfig);
        return new SamzaJobServerDriver(pipelineOptions);
    }

    /**
     * Builds the in-memory job service whose invocations run on a {@link SamzaPipelineRunner}.
     *
     * @param pipelineOptions options used for every invocation created by the returned service
     * @return the configured job service
     * @throws IOException declared for callers; propagated from service creation
     */
    private static @UnknownKeyFor @NonNull @Initialized InMemoryJobService createJobService(final @UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions) throws @UnknownKeyFor @NonNull @Initialized IOException {
        // NOTE(review): RunnerApi is not imported in the visible file header (the decompiler could
        // not load it) - presumably org.apache.beam.model.pipeline.v1.RunnerApi; confirm and import.
        JobInvoker jobInvoker = new JobInvoker(JOB_INVOKER_NAME) {

            /**
             * Creates an invocation that runs {@code pipeline} with a new
             * {@link SamzaPipelineRunner}.
             *
             * <p>The {@code options} struct is not consulted here: the job name and the runner
             * configuration both come from the driver-level {@code pipelineOptions}.
             */
            protected @UnknownKeyFor @NonNull @Initialized JobInvocation invokeWithExecutor(
                    // Type-use annotations on a qualified nested type go next to the simple name.
                    RunnerApi.@UnknownKeyFor @NonNull @Initialized Pipeline pipeline,
                    @UnknownKeyFor @NonNull @Initialized Struct options,
                    @Nullable @UnknownKeyFor @Initialized String retrievalToken,
                    @UnknownKeyFor @NonNull @Initialized ListeningExecutorService executorService)
                    throws @UnknownKeyFor @NonNull @Initialized IOException {
                // A random UUID suffix keeps invocation ids distinct for repeated job names.
                String invocationId = String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString());
                SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions);
                JobInfo jobInfo = JobInfo.create(
                        invocationId,
                        pipelineOptions.getJobName(),
                        retrievalToken,
                        PipelineOptionsTranslation.toProto(pipelineOptions));
                return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
            }
        };
        // NOTE(review): the first argument is deliberately null, the session mapping is the
        // identity and the cleanup callback is a no-op. The trailing 10 is presumably the maximum
        // number of retained invocations - confirm against InMemoryJobService.create.
        return InMemoryJobService.create(null, session -> session, stagingSessionToken -> {}, jobInvoker, 10);
    }

    /**
     * Starts the job service on the port supplied by the options' job port and blocks until the
     * underlying server terminates. The server is closed on the way out, whether termination was
     * normal or exceptional.
     *
     * @throws Exception if the service cannot be created or started, if waiting is interrupted, or
     *     if closing the server fails
     */
    public void run() throws @UnknownKeyFor @NonNull @Initialized Exception {
        InMemoryJobService service = SamzaJobServerDriver.createJobService(this.pipelineOptions);
        GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
                GrpcFnServer.allocatePortAndCreateFor(
                        service, ServerFactory.createWithPortSupplier(this.pipelineOptions::getJobPort));
        try {
            // Inside the try so that any failure after the port is allocated still closes the server.
            LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
            jobServiceGrpcFnServer.getServer().awaitTermination();
        }
        finally {
            LOG.info("JobServer closing");
            jobServiceGrpcFnServer.close();
        }
    }
}

