/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.pubsub.publisher;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PubSubPublisherClient} that wraps a single Google Pub/Sub {@link Publisher}
 * bound to one topic.
 * <p>
 * The publisher is created with message ordering enabled. Results of publish operations
 * are handed back on a vert.x context so that callers can consume the returned future
 * from vert.x code.
 */
final class PubSubPublisherClientImpl
implements PubSubPublisherClient {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublisherClientImpl.class);
    /** Status code reported when the publisher cannot be created or a publish attempt fails. */
    private static final int HTTP_UNAVAILABLE = 503;
    /** Maximum time to wait for the publisher to terminate when closing this client. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private final Vertx vertx;
    private final Publisher publisher;

    /**
     * Creates a client that publishes to the given topic of the given project.
     *
     * @param vertx The vert.x instance used for blocking shutdown work and for obtaining
     *              the context on which publish results are delivered.
     * @param projectId The identifier of the Google Cloud project.
     * @param topic The name of the topic to publish to.
     * @param credentialsProvider The credentials provider to configure on the publisher,
     *                            or {@code null} to leave the builder's credentials untouched.
     * @throws NullPointerException if vertx, projectId or topic are {@code null}.
     * @throws ClientErrorException if the publisher cannot be built because of an I/O problem.
     *         NOTE(review): the exception carries status 503, which is a server-error
     *         code - confirm that ClientErrorException accepts it.
     */
    PubSubPublisherClientImpl(Vertx vertx, String projectId, String topic, CredentialsProvider credentialsProvider) throws ClientErrorException {
        this.vertx = Objects.requireNonNull(vertx);
        Objects.requireNonNull(projectId);
        Objects.requireNonNull(topic);
        try {
            final TopicName topicName = TopicName.of(projectId, topic);
            final Publisher.Builder builder = Publisher.newBuilder(topicName).setEnableMessageOrdering(true);
            if (credentialsProvider != null) {
                builder.setCredentialsProvider(credentialsProvider);
            }
            this.publisher = builder.build();
        }
        catch (IOException e) {
            LOG.warn("error initializing publisher client", e);
            throw new ClientErrorException(HTTP_UNAVAILABLE, "failed to create publisher for Pub/Sub", e);
        }
    }

    /**
     * Shuts down the wrapped publisher.
     * <p>
     * The shutdown is submitted via {@code Vertx.executeBlocking} because waiting for
     * termination blocks the calling thread for up to {@value #SHUTDOWN_TIMEOUT_SECONDS}
     * seconds. This method does not wait for the submitted task to finish.
     */
    @Override
    public void close() {
        this.vertx.executeBlocking(() -> {
            try {
                this.publisher.shutdown();
                final boolean terminated = this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (!terminated) {
                    LOG.debug("timed out waiting for shut down of publisher");
                }
            }
            catch (InterruptedException e) {
                LOG.debug("interrupted while waiting for shut down of publisher", e);
                // restore the interrupt flag for whoever owns this thread
                Thread.currentThread().interrupt();
            }
            return null;
        });
    }

    /**
     * Publishes a message using the wrapped publisher.
     *
     * @param pubsubMessage The message to publish.
     * @return A future that is completed with the message identifier returned by the
     *         publisher, or failed with a {@link ServerErrorException} (status 503)
     *         wrapping the cause if publishing fails. The future is completed on the
     *         vert.x context obtained when this method was invoked.
     * @throws NullPointerException if the message is {@code null}.
     */
    @Override
    public Future<String> publish(PubsubMessage pubsubMessage) {
        Objects.requireNonNull(pubsubMessage);
        final Promise<String> result = Promise.promise();
        final Context context = this.vertx.getOrCreateContext();
        final ApiFuture<String> messageIdFuture = this.publisher.publish(pubsubMessage);
        // The callback itself runs on whatever thread completes the ApiFuture (direct executor);
        // it only re-dispatches to the captured vert.x context before touching the promise.
        ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {

            @Override
            public void onSuccess(String messageId) {
                context.runOnContext(ok -> result.complete(messageId));
            }

            @Override
            public void onFailure(Throwable t) {
                context.runOnContext(ok -> {
                    LOG.debug("error publishing messages to Pub/Sub", t);
                    result.fail(new ServerErrorException(HTTP_UNAVAILABLE, t));
                });
            }
        }, MoreExecutors.directExecutor());
        return result.future();
    }
}

