/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PerServerPublisherCache;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

class PubsubLiteSink
extends DoFn<PubSubMessage, Void> {
    private final @UnknownKeyFor @NonNull @Initialized PublisherOptions options;
    @GuardedBy(value="this")
    private transient @UnknownKeyFor @NonNull @Initialized PublisherOrError publisherOrError;
    @GuardedBy(value="this")
    private transient @UnknownKeyFor @NonNull @Initialized int outstanding;
    @GuardedBy(value="this")
    private transient @UnknownKeyFor @NonNull @Initialized Deque<@UnknownKeyFor @NonNull @Initialized StatusException> errorsSinceLastFinish;
    private static final @UnknownKeyFor @NonNull @Initialized Executor executor = Executors.newCachedThreadPool();

    PubsubLiteSink(@UnknownKeyFor @NonNull @Initialized PublisherOptions options) {
        this.options = options;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @DoFn.Setup
    public void setup() throws @UnknownKeyFor @NonNull @Initialized StatusException {
        Publisher<PublishMetadata> publisher = this.options.usesCache() ? PerServerPublisherCache.PUBLISHER_CACHE.get(this.options) : this.options.getPublisher();
        PubsubLiteSink pubsubLiteSink = this;
        synchronized (pubsubLiteSink) {
            this.outstanding = 0;
            this.errorsSinceLastFinish = new ArrayDeque<StatusException>();
            this.publisherOrError = PublisherOrError.ofPublisher(publisher);
        }
        final Consumer<Throwable> onFailure = t -> {
            PubsubLiteSink pubsubLiteSink = this;
            synchronized (pubsubLiteSink) {
                this.publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical((Throwable)t));
            }
        };
        publisher.addListener(new ApiService.Listener(){

            public void failed(// Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized ApiService.State s, @UnknownKeyFor @NonNull @Initialized Throwable t) {
                onFailure.accept(t);
            }
        }, MoreExecutors.directExecutor());
        if (!this.options.usesCache()) {
            publisher.startAsync();
        }
    }

    private synchronized void decrementOutstanding() {
        --this.outstanding;
        ((Object)((Object)this)).notify();
    }

    @DoFn.ProcessElement
    public synchronized void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized PubSubMessage message) throws @UnknownKeyFor @NonNull @Initialized StatusException {
        ++this.outstanding;
        if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
            throw this.publisherOrError.error();
        }
        ApiFuture future = this.publisherOrError.publisher().publish(Message.fromProto((PubSubMessage)message));
        final Consumer<Throwable> onFailure = t -> {
            PubsubLiteSink pubsubLiteSink = this;
            synchronized (pubsubLiteSink) {
                this.decrementOutstanding();
                this.errorsSinceLastFinish.push(ExtractStatus.toCanonical((Throwable)t));
            }
        };
        ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)new ApiFutureCallback<PublishMetadata>(){

            public void onSuccess(@UnknownKeyFor @NonNull @Initialized PublishMetadata publishMetadata) {
                PubsubLiteSink.this.decrementOutstanding();
            }

            public void onFailure(@UnknownKeyFor @NonNull @Initialized Throwable t) {
                onFailure.accept(t);
            }
        }, (Executor)executor);
    }

    @DoFn.FinishBundle
    public synchronized void finishBundle() throws @UnknownKeyFor @NonNull @Initialized StatusException, @UnknownKeyFor @NonNull @Initialized IOException, @UnknownKeyFor @NonNull @Initialized InterruptedException {
        while (this.outstanding > 0) {
            ((Object)((Object)this)).wait();
        }
        if (!this.errorsSinceLastFinish.isEmpty()) {
            StatusException canonical = this.errorsSinceLastFinish.pop();
            while (!this.errorsSinceLastFinish.isEmpty()) {
                canonical.addSuppressed((Throwable)this.errorsSinceLastFinish.pop());
            }
            throw canonical;
        }
        if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
            throw this.publisherOrError.error();
        }
    }
}

