/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnStateGrpcClientCache {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamFnStateGrpcClientCache.class);
    private final @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized BeamFnStateClient> cache;
    private final @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory;
    private final @UnknownKeyFor @NonNull @Initialized OutboundObserverFactory outboundObserverFactory;
    private final @UnknownKeyFor @NonNull @Initialized IdGenerator idGenerator;

    public BeamFnStateGrpcClientCache(@UnknownKeyFor @NonNull @Initialized IdGenerator idGenerator, @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory, @UnknownKeyFor @NonNull @Initialized OutboundObserverFactory outboundObserverFactory) {
        this.idGenerator = idGenerator;
        this.channelFactory = channelFactory.withDirectExecutor();
        this.outboundObserverFactory = outboundObserverFactory;
        this.cache = new HashMap<Endpoints.ApiServiceDescriptor, BeamFnStateClient>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized @UnknownKeyFor @NonNull @Initialized BeamFnStateClient forApiServiceDescriptor(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor) throws @UnknownKeyFor @NonNull @Initialized IOException {
        BeamFnStateClient rval;
        Map<Endpoints.ApiServiceDescriptor, BeamFnStateClient> map = this.cache;
        synchronized (map) {
            rval = this.cache.get(apiServiceDescriptor);
        }
        if (rval == null) {
            rval = new GrpcStateClient(apiServiceDescriptor);
            map = this.cache;
            synchronized (map) {
                this.cache.put(apiServiceDescriptor, rval);
            }
        }
        return rval;
    }

    private class GrpcStateClient
    implements BeamFnStateClient {
        private final @UnknownKeyFor @NonNull @Initialized Object lock = new Object();
        private final // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor;
        private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized CompletableFuture<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse>> outstandingRequests;
        private final @UnknownKeyFor @NonNull @Initialized StreamObserver<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest> outboundObserver;
        private final @UnknownKeyFor @NonNull @Initialized ManagedChannel channel;
        private @UnknownKeyFor @NonNull @Initialized RuntimeException closed;
        private @UnknownKeyFor @NonNull @Initialized boolean errorDuringConstruction;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private GrpcStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
            this.apiServiceDescriptor = apiServiceDescriptor;
            this.outstandingRequests = new HashMap<String, CompletableFuture<BeamFnApi.StateResponse>>();
            this.channel = BeamFnStateGrpcClientCache.this.channelFactory.forDescriptor(apiServiceDescriptor);
            this.errorDuringConstruction = false;
            this.outboundObserver = BeamFnStateGrpcClientCache.this.outboundObserverFactory.outboundObserverFor(arg_0 -> ((BeamFnStateGrpc.BeamFnStateStub)BeamFnStateGrpc.newStub((Channel)this.channel)).state(arg_0), (StreamObserver)new InboundObserver());
            Object object = this.lock;
            synchronized (object) {
                if (this.errorDuringConstruction) {
                    this.outboundObserver.onCompleted();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized CompletableFuture<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse> handle(// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest.Builder requestBuilder) {
            requestBuilder.setId(BeamFnStateGrpcClientCache.this.idGenerator.getId());
            BeamFnApi.StateRequest request = requestBuilder.build();
            CompletableFuture<BeamFnApi.StateResponse> response = new CompletableFuture<BeamFnApi.StateResponse>();
            Object object = this.lock;
            synchronized (object) {
                if (this.closed != null) {
                    response.completeExceptionally(this.closed);
                    return response;
                }
                this.outstandingRequests.put(request.getId(), response);
            }
            LOG.debug("Sending StateRequest {}", (Object)request);
            this.outboundObserver.onNext((Object)request);
            return response;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeAndCleanUp(@UnknownKeyFor @NonNull @Initialized RuntimeException cause) {
            Object object = this.lock;
            synchronized (object) {
                if (this.closed != null) {
                    return;
                }
                this.closed = cause;
                Map map = BeamFnStateGrpcClientCache.this.cache;
                synchronized (map) {
                    BeamFnStateGrpcClientCache.this.cache.remove(this.apiServiceDescriptor);
                }
                if (!this.outstandingRequests.isEmpty()) {
                    LOG.error("BeamFnState failed, clearing outstanding requests {}", this.outstandingRequests);
                    for (CompletableFuture completableFuture : this.outstandingRequests.values()) {
                        completableFuture.completeExceptionally(cause);
                    }
                    this.outstandingRequests.clear();
                }
                if (this.outboundObserver == null) {
                    this.errorDuringConstruction = true;
                } else {
                    this.outboundObserver.onCompleted();
                }
            }
        }

        private class InboundObserver
        implements StreamObserver<BeamFnApi.StateResponse> {
            private InboundObserver() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onNext(// Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse value) {
                CompletableFuture responseFuture;
                LOG.debug("Received StateResponse {}", (Object)value);
                Object object = GrpcStateClient.this.lock;
                synchronized (object) {
                    responseFuture = (CompletableFuture)GrpcStateClient.this.outstandingRequests.remove(value.getId());
                }
                if (responseFuture == null) {
                    LOG.warn("Dropped unknown StateResponse {}", (Object)value);
                    return;
                }
                if (value.getError().isEmpty()) {
                    responseFuture.complete(value);
                } else {
                    responseFuture.completeExceptionally(new IllegalStateException(value.getError()));
                }
            }

            public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t) {
                GrpcStateClient.this.closeAndCleanUp(t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t));
            }

            public void onCompleted() {
                GrpcStateClient.this.closeAndCleanUp(new RuntimeException("Server hanged up."));
            }
        }
    }
}

