/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.control.HarnessMonitoringInfosInstructionHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.logging.LoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClientFactory;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FnHarness {
    private static final @UnknownKeyFor @NonNull @Initialized String HARNESS_ID = "HARNESS_ID";
    private static final @UnknownKeyFor @NonNull @Initialized String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final @UnknownKeyFor @NonNull @Initialized String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final @UnknownKeyFor @NonNull @Initialized String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
    private static final @UnknownKeyFor @NonNull @Initialized String PIPELINE_OPTIONS_FILE = "PIPELINE_OPTIONS_FILE";
    private static final @UnknownKeyFor @NonNull @Initialized String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final @UnknownKeyFor @NonNull @Initialized String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(FnHarness.class);

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor getApiServiceDescriptor(@UnknownKeyFor @NonNull @Initialized String descriptor) throws // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized TextFormat.ParseException {
        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        TextFormat.merge((CharSequence)descriptor, (Message.Builder)apiServiceDescriptorBuilder);
        return apiServiceDescriptorBuilder.build();
    }

    public static @UnknownKeyFor @NonNull @Initialized String removeNestedKey(@UnknownKeyFor @NonNull @Initialized String jsonString, @UnknownKeyFor @NonNull @Initialized String keyToRemove) throws @UnknownKeyFor @NonNull @Initialized Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode rootNode = mapper.readTree(jsonString);
        FnHarness.removeKeyRecursively(rootNode, keyToRemove);
        return mapper.writeValueAsString((Object)rootNode);
    }

    private static void removeKeyRecursively(@UnknownKeyFor @NonNull @Initialized JsonNode node, @UnknownKeyFor @NonNull @Initialized String keyToRemove) {
        if (node.isObject()) {
            Iterator iterator = node.fields();
            while (iterator.hasNext()) {
                Map.Entry field = (Map.Entry)iterator.next();
                if (((String)field.getKey()).equals(keyToRemove)) {
                    iterator.remove();
                    continue;
                }
                FnHarness.removeKeyRecursively((JsonNode)field.getValue(), keyToRemove);
            }
        }
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        FnHarness.main(System::getenv);
    }

    @VisibleForTesting
    public static void main(@UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> environmentVarGetter) throws @UnknownKeyFor @NonNull @Initialized Exception {
        JvmInitializers.runOnStartup();
        System.out.format("SDK Fn Harness started%n", new Object[0]);
        System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
        System.out.format("Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
        String id = environmentVarGetter.apply(HARNESS_ID);
        String pipelineOptionsJson = environmentVarGetter.apply(PIPELINE_OPTIONS);
        try {
            Path filePath;
            String pipelineOptionsPath = environmentVarGetter.apply(PIPELINE_OPTIONS_FILE);
            System.out.format("Pipeline Options File %s%n", pipelineOptionsPath);
            if (pipelineOptionsPath != null && Files.exists(filePath = Paths.get(pipelineOptionsPath, new String[0]), new LinkOption[0])) {
                System.out.format("Pipeline Options File %s exists. Overriding existing options.%n", pipelineOptionsPath);
                pipelineOptionsJson = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
            }
        }
        catch (Exception e) {
            System.out.format("Problem loading pipeline options from file: %s%n", e.getMessage());
        }
        System.out.format("Pipeline options %s%n", pipelineOptionsJson);
        pipelineOptionsJson = FnHarness.removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
        PipelineOptions options = PipelineOptionsTranslation.fromJson((String)pipelineOptionsJson);
        Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null ? null : FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
        String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES);
        ImmutableSet runnerCapabilites = runnerCapabilitesOrNull == null ? Collections.emptySet() : ImmutableSet.copyOf((Object[])runnerCapabilitesOrNull.split("\\s+"));
        FnHarness.main(id, options, (Set<String>)runnerCapabilites, loggingApiServiceDescriptor, controlApiServiceDescriptor, statusApiServiceDescriptor);
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, @javax.annotation.Nullable // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @Nullable @Initialized Endpoints.ApiServiceDescriptor statusApiServiceDescriptor) throws @UnknownKeyFor @NonNull @Initialized Exception {
        ManagedChannelFactory channelFactory = ExperimentalOptions.hasExperiment((PipelineOptions)options, (String)"beam_fn_api_epoll") ? ManagedChannelFactory.createEpoll() : ManagedChannelFactory.createDefault();
        OutboundObserverFactory outboundObserverFactory = HarnessStreamObserverFactories.fromOptions(options);
        FnHarness.main(id, options, runnerCapabilities, loggingApiServiceDescriptor, controlApiServiceDescriptor, statusApiServiceDescriptor, channelFactory, outboundObserverFactory, Caches.fromOptions(options));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilites, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor statusApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory, @UnknownKeyFor @NonNull @Initialized OutboundObserverFactory outboundObserverFactory, final @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> processWideCache) throws @UnknownKeyFor @NonNull @Initialized Exception {
        channelFactory = channelFactory.withInterceptors((List)ImmutableList.of((Object)AddHarnessIdInterceptor.create((String)id)));
        IdGenerator idGenerator = IdGenerators.decrementingLongs();
        ShortIdMap metricsShortIds = new ShortIdMap();
        UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
        ((ExecutorOptions)options.as(ExecutorOptions.class)).setScheduledExecutorService((ScheduledExecutorService)executorService);
        CompletableFuture samplerTerminationFuture = new CompletableFuture();
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(options, System::currentTimeMillis, message -> {
            String errMsg = "FATAL ERROR: Timeout occurred! Exiting JVM. Details:" + message;
            samplerTerminationFuture.completeExceptionally(new RuntimeException(errMsg));
        });
        DataSampler dataSampler = DataSampler.create(options);
        try (LoggingClient logging = LoggingClientFactory.createAndStart(options, loggingApiServiceDescriptor, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0));){
            LOG.info("Fn Harness started");
            FileSystems.setDefaultPipelineOptions((PipelineOptions)options);
            CoderTranslation.verifyModelCodersRegistered();
            EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers = new EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>>(BeamFnApi.InstructionRequest.RequestCase.class);
            ManagedChannel channel = channelFactory.forDescriptor(controlApiServiceDescriptor);
            BeamFnControlGrpc.BeamFnControlStub controlStub = BeamFnControlGrpc.newStub((Channel)channel);
            final BeamFnControlGrpc.BeamFnControlBlockingStub blockingControlStub = BeamFnControlGrpc.newBlockingStub((Channel)channel);
            BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(options, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0), outboundObserverFactory);
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(idGenerator, channelFactory, outboundObserverFactory);
            FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler((ExecutorService)executorService);
            Function<String, BeamFnApi.ProcessBundleDescriptor> getProcessBundleDescriptor = new Function<String, BeamFnApi.ProcessBundleDescriptor>(){
                private static final @UnknownKeyFor @NonNull @Initialized String PROCESS_BUNDLE_DESCRIPTORS = "ProcessBundleDescriptors";
                private final @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor> cache;
                {
                    this.cache = Caches.subCache(processWideCache, PROCESS_BUNDLE_DESCRIPTORS, new Object[0]);
                }

                @Override
                public // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor apply(@UnknownKeyFor @NonNull @Initialized String id) {
                    return this.cache.computeIfAbsent(id, this::loadDescriptor);
                }

                private // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor loadDescriptor(@UnknownKeyFor @NonNull @Initialized String id) {
                    return blockingControlStub.getProcessBundleDescriptor(BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder().setProcessBundleDescriptorId(id).build());
                }
            };
            MetricsEnvironment.setProcessWideContainer((MetricsContainer)MetricsContainerImpl.createProcessWideContainer());
            ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(options, runnerCapabilites, getProcessBundleDescriptor, beamFnDataMultiplexer, beamFnStateGrpcClientCache, finalizeBundleHandler, metricsShortIds, executionStateSampler, processWideCache, dataSampler);
            BeamFnStatusClient beamFnStatusClient = null;
            if (statusApiServiceDescriptor != null) {
                beamFnStatusClient = new BeamFnStatusClient(statusApiServiceDescriptor, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0), processBundleHandler.getBundleProcessorCache(), options, processWideCache);
            }
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, request -> BeamFnApi.InstructionResponse.newBuilder().setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()));
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.FINALIZE_BUNDLE, finalizeBundleHandler::finalizeBundle);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, processBundleHandler::processBundle);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_PROGRESS, processBundleHandler::progress);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_SPLIT, processBundleHandler::trySplit);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.MONITORING_INFOS, request -> BeamFnApi.InstructionResponse.newBuilder().setMonitoringInfos(BeamFnApi.MonitoringInfosMetadataResponse.newBuilder().putAllMonitoringInfo(metricsShortIds.get((List<String>)request.getMonitoringInfos().getMonitoringInfoIdList()))));
            HarnessMonitoringInfosInstructionHandler processWideHandler = new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS, processWideHandler::harnessMonitoringInfos);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.SAMPLE_DATA, request -> dataSampler == null ? BeamFnApi.InstructionResponse.newBuilder().setSampleData(BeamFnApi.SampleDataResponse.newBuilder()) : dataSampler.handleDataSampleRequest((BeamFnApi.InstructionRequest)request));
            JvmInitializers.runBeforeProcessing((PipelineOptions)options);
            LOG.info("Entering instruction processing loop");
            BeamFnControlClient control = new BeamFnControlClient((BeamFnControlGrpc.BeamFnControlStub)controlStub.withExecutor(MoreExecutors.directExecutor()), outboundObserverFactory, (Executor)executorService, handlers);
            if (((SdkHarnessOptions)options.as(SdkHarnessOptions.class)).getEnableLogViaFnApi()) {
                CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture(), samplerTerminationFuture).get();
            } else {
                CompletableFuture.anyOf(control.terminationFuture(), samplerTerminationFuture).get();
            }
            if (beamFnStatusClient != null) {
                beamFnStatusClient.close();
            }
            processBundleHandler.shutdown();
        }
        catch (Exception e) {
            LOG.error("Shutting down harness due to exception", (Throwable)e);
            e.printStackTrace();
        }
        finally {
            LOG.info("Shutting SDK harness down.");
            executionStateSampler.stop();
            executorService.shutdown();
        }
    }
}

