/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.HashMap;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.samza.SamzaJobInvocation;
import org.apache.beam.runners.samza.SamzaPortablePipelineOptions;
import org.apache.beam.sdk.expansion.service.ExpansionServer;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaJobServerDriver {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
    private final @UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions;

    protected SamzaJobServerDriver(@UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        SamzaPortablePipelineOptions pipelineOptions = (SamzaPortablePipelineOptions)PipelineOptionsFactory.fromArgs((String[])args).as(SamzaPortablePipelineOptions.class);
        SamzaJobServerDriver.fromOptions(pipelineOptions).run();
    }

    public static @UnknownKeyFor @NonNull @Initialized SamzaJobServerDriver fromOptions(@UnknownKeyFor @NonNull @Initialized SamzaPortablePipelineOptions pipelineOptions) {
        HashMap<String, String> overrideConfig = pipelineOptions.getConfigOverride() != null ? pipelineOptions.getConfigOverride() : new HashMap<String, String>();
        overrideConfig.put("beam.override.portable", String.valueOf(true));
        overrideConfig.put("beam.override.control.port", String.valueOf(pipelineOptions.getControlPort()));
        overrideConfig.put("beam.override.fs.token.path", pipelineOptions.getFsTokenPath());
        pipelineOptions.setConfigOverride(overrideConfig);
        return new SamzaJobServerDriver(pipelineOptions);
    }

    private @UnknownKeyFor @NonNull @Initialized InMemoryJobService createJobService() throws @UnknownKeyFor @NonNull @Initialized IOException {
        JobInvoker jobInvoker = new JobInvoker("samza-job-invoker"){

            protected @UnknownKeyFor @NonNull @Initialized JobInvocation invokeWithExecutor(// Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized RunnerApi.Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized Struct options, @Nullable @UnknownKeyFor @Initialized String retrievalToken, @UnknownKeyFor @NonNull @Initialized ListeningExecutorService executorService) throws @UnknownKeyFor @NonNull @Initialized IOException {
                return new SamzaJobInvocation(pipeline, SamzaJobServerDriver.this.pipelineOptions);
            }
        };
        return InMemoryJobService.create(null, session -> session, stagingSessionToken -> {}, (JobInvoker)jobInvoker, (int)10);
    }

    private @UnknownKeyFor @NonNull @Initialized ExpansionServer createExpansionService(@UnknownKeyFor @NonNull @Initialized String host, @UnknownKeyFor @NonNull @Initialized int expansionPort) throws @UnknownKeyFor @NonNull @Initialized IOException {
        if (host == null) {
            host = InetAddress.getLoopbackAddress().getHostName();
        }
        ExpansionServer expansionServer = ExpansionServer.create((ExpansionService)new ExpansionService(), (String)host, (int)expansionPort);
        LOG.info("Java ExpansionService started on {}:{}", (Object)expansionServer.getHost(), (Object)expansionServer.getPort());
        return expansionServer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws @UnknownKeyFor @NonNull @Initialized Exception {
        InMemoryJobService service = this.createJobService();
        GrpcFnServer jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)ServerFactory.createWithPortSupplier(this.pipelineOptions::getJobPort));
        String jobServerUrl = jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl();
        LOG.info("JobServer started on {}", (Object)jobServerUrl);
        URI uri = new URI(jobServerUrl);
        ExpansionServer expansionServer = this.createExpansionService(uri.getHost(), this.pipelineOptions.getExpansionPort());
        try {
            jobServiceGrpcFnServer.getServer().awaitTermination();
        }
        finally {
            LOG.info("JobServer closing");
            jobServiceGrpcFnServer.close();
            if (expansionServer != null) {
                try {
                    expansionServer.close();
                    LOG.info("Expansion stopped on {}:{}", (Object)expansionServer.getHost(), (Object)expansionServer.getPort());
                }
                catch (Exception e) {
                    LOG.error("Error while closing the Expansion Service.", (Throwable)e);
                }
            }
        }
    }
}

