/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.state;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;

public class GrpcStateService
extends BeamFnStateGrpc.BeamFnStateImplBase
implements StateDelegator,
FnService {
    private final ConcurrentLinkedQueue<Inbound> clients;
    private final ConcurrentMap<String, StateRequestHandler> requestHandlers = new ConcurrentHashMap<String, StateRequestHandler>();

    public static GrpcStateService create() {
        return new GrpcStateService();
    }

    private GrpcStateService() {
        this.clients = new ConcurrentLinkedQueue();
    }

    @Override
    public void close() throws Exception {
        Exception thrown = null;
        for (Inbound inbound : this.clients) {
            try {
                if (inbound.outboundObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver)inbound.outboundObserver).isCancelled()) continue;
                inbound.outboundObserver.onCompleted();
            }
            catch (Exception t) {
                if (thrown == null) {
                    thrown = t;
                    continue;
                }
                thrown.addSuppressed(t);
            }
        }
        if (thrown != null) {
            throw thrown;
        }
    }

    public StreamObserver<BeamFnApi.StateRequest> state(StreamObserver<BeamFnApi.StateResponse> responseObserver) {
        Inbound rval = new Inbound(responseObserver);
        this.clients.add(rval);
        return rval;
    }

    @Override
    public StateDelegator.Registration registerForProcessBundleInstructionId(String processBundleInstructionId, StateRequestHandler handler) {
        this.requestHandlers.putIfAbsent(processBundleInstructionId, handler);
        return new Registration(processBundleInstructionId);
    }

    private class Inbound
    implements StreamObserver<BeamFnApi.StateRequest> {
        private final StreamObserver<BeamFnApi.StateResponse> outboundObserver;

        Inbound(StreamObserver<BeamFnApi.StateResponse> outboundObserver) {
            this.outboundObserver = outboundObserver;
        }

        public void onNext(BeamFnApi.StateRequest request) {
            StateRequestHandler handler = GrpcStateService.this.requestHandlers.getOrDefault(request.getInstructionId(), this::handlerNotFound);
            try {
                CompletionStage<BeamFnApi.StateResponse.Builder> result = handler.handle(request);
                result.whenComplete((responseBuilder, t) -> this.outboundObserver.onNext((Object)(t == null ? responseBuilder.setId(request.getId()).build() : this.createErrorResponse(request.getId(), (Throwable)t))));
            }
            catch (Exception e) {
                this.outboundObserver.onNext((Object)this.createErrorResponse(request.getId(), e));
            }
        }

        public void onError(Throwable t) {
            this.outboundObserver.onCompleted();
        }

        public void onCompleted() {
            this.outboundObserver.onCompleted();
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handlerNotFound(BeamFnApi.StateRequest request) {
            CompletableFuture<BeamFnApi.StateResponse.Builder> result = new CompletableFuture<BeamFnApi.StateResponse.Builder>();
            result.complete(BeamFnApi.StateResponse.newBuilder().setError(String.format("Unknown process bundle instruction id '%s'", request.getInstructionId())));
            return result;
        }

        private BeamFnApi.StateResponse createErrorResponse(String id, Throwable t) {
            return BeamFnApi.StateResponse.newBuilder().setId(id).setError(Throwables.getStackTraceAsString((Throwable)t)).build();
        }
    }

    private class Registration
    implements StateDelegator.Registration {
        private final String processBundleInstructionId;

        private Registration(String processBundleInstructionId) {
            this.processBundleInstructionId = processBundleInstructionId;
        }

        @Override
        public void deregister() {
            GrpcStateService.this.requestHandlers.remove(this.processBundleInstructionId);
        }

        @Override
        public void abort() {
            this.deregister();
        }
    }
}

