/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableInstant;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit {@link TestRule} that lets a test exchange "start" and "success" signals with a pipeline
 * through two Pubsub topics.
 *
 * <p>For each test the rule opens a Pubsub client and creates a result topic and a start topic
 * before the test body runs, and runs its teardown afterwards. Pipelines publish to those topics
 * via {@link #signalStart()} and {@code signalSuccessWhen(...)}; the test blocks on them via
 * {@link #waitForStart(Duration)} and {@link #waitForSuccess(Duration)}.
 */
public class TestPubsubSignal
implements TestRule {
    private static final Logger LOG = LoggerFactory.getLogger(TestPubsubSignal.class);
    // Name component handed to TestPubsub.createTopicName when building the result topic name.
    private static final String RESULT_TOPIC_NAME = "result";
    // Payload published when the success predicate holds; waitForSuccess expects exactly this text.
    private static final String RESULT_SUCCESS_MESSAGE = "SUCCESS";
    // Name component handed to TestPubsub.createTopicName when building the start topic name.
    private static final String START_TOPIC_NAME = "start";
    // Payload published by the start-signal transform; waitForStart expects exactly this text.
    private static final String START_SIGNAL_MESSAGE = "START SIGNAL";
    // Both attribute names are passed as null when the Pubsub client is created.
    private static final String NO_ID_ATTRIBUTE = null;
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;
    // Pubsub client for the currently running test; null whenever no test is being evaluated.
    PubsubClient pubsub;
    // Options used to build the client and to look up the project that owns the topics.
    private TestPubsubOptions pipelineOptions;
    // Path of the result topic; assigned during initialization once the topic has been created.
    @Nullable
    private PubsubClient.TopicPath resultTopicPath = null;
    // Path of the start topic; assigned during initialization once the topic has been created.
    @Nullable
    private PubsubClient.TopicPath startTopicPath = null;

    /**
     * Builds a rule instance configured from the standard testing pipeline options.
     *
     * @return a new {@code TestPubsubSignal} whose options are
     *     {@link TestPipeline#testingPipelineOptions()} viewed as {@link TestPubsubOptions}
     */
    public static TestPubsubSignal create() {
        return new TestPubsubSignal(
            TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class));
    }

    /**
     * Stores the options later used to create the Pubsub client and resolve the project.
     * Instances are obtained through {@link #create()}.
     *
     * @param options pipeline options backing this rule
     */
    private TestPubsubSignal(TestPubsubOptions options) {
        pipelineOptions = options;
    }

    /**
     * Wraps the test so that the Pubsub client and topics are set up before it runs and torn
     * down afterwards.
     *
     * <p>If both the test (or the setup) and the teardown fail, the original failure is the one
     * that propagates; the teardown failure is attached to it as a suppressed exception so it is
     * not lost and does not hide the real cause of the test failure.
     *
     * @param base the statement representing the test body
     * @param description the test description, used for naming topics and in error messages
     * @return a statement that performs setup, runs {@code base}, then tears down
     * @throws AssertionError (from the returned statement) if a client from a previous test is
     *     still open
     */
    @Override
    public Statement apply(final Statement base, final Description description) {
        return new Statement() {

            @Override
            public void evaluate() throws Throwable {
                if (pubsub != null) {
                    throw new AssertionError("Pubsub client was not shutdown in previous test. Topic path is'" + resultTopicPath + "'. Current test: " + description.getDisplayName());
                }
                Throwable failure = null;
                try {
                    initializePubsub(description);
                    base.evaluate();
                } catch (Throwable t) {
                    // Remember the primary failure so the cleanup below cannot replace it.
                    failure = t;
                    throw t;
                } finally {
                    if (failure == null) {
                        // Nothing failed so far: a teardown problem is the failure to report.
                        tearDown();
                    } else {
                        try {
                            tearDown();
                        } catch (Throwable cleanupFailure) {
                            failure.addSuppressed(cleanupFailure);
                        }
                    }
                }
            }
        };
    }

    /**
     * Opens the Pubsub client and creates the result and start topics for the given test.
     *
     * <p>Each topic path field is assigned immediately after its own topic has been created.
     * A non-null field therefore means "this topic exists and needs cleanup", even when a later
     * step of the initialization fails.
     *
     * @param description the test description used to derive the topic names
     * @throws IOException if the client cannot be created or a topic cannot be created
     */
    private void initializePubsub(Description description) throws IOException {
        pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);

        String project = pipelineOptions.getProject();
        PubsubClient.TopicPath resultTopicPathTmp =
            PubsubClient.topicPathFromName(project, TestPubsub.createTopicName(description, RESULT_TOPIC_NAME));
        PubsubClient.TopicPath startTopicPathTmp =
            PubsubClient.topicPathFromName(project, TestPubsub.createTopicName(description, START_TOPIC_NAME));

        pubsub.createTopic(resultTopicPathTmp);
        // Record right away: if creating the start topic fails, the result topic is still known.
        resultTopicPath = resultTopicPathTmp;

        pubsub.createTopic(startTopicPathTmp);
        startTopicPath = startTopicPathTmp;
    }

    /**
     * Deletes the topics created for the test and closes the Pubsub client.
     *
     * <p>Both the result topic and the start topic are deleted. Deletion of the start topic is
     * attempted even if deleting the result topic fails, and the client is always closed and all
     * per-test state reset. Does nothing when no client is open.
     *
     * @throws IOException if deleting a topic or closing the client fails
     */
    private void tearDown() throws IOException {
        if (pubsub == null) {
            return;
        }
        try {
            try {
                if (resultTopicPath != null) {
                    pubsub.deleteTopic(resultTopicPath);
                }
            } finally {
                // Previously never deleted, which leaked one topic per test run.
                if (startTopicPath != null) {
                    pubsub.deleteTopic(startTopicPath);
                }
            }
        } finally {
            pubsub.close();
            pubsub = null;
            resultTopicPath = null;
            startTopicPath = null;
        }
    }

    /**
     * Creates a transform that publishes the start signal to the start topic.
     *
     * <p>The start topic path is read at the time this method is called.
     *
     * @return a root transform producing {@link PDone}
     */
    public PTransform<PBegin, PDone> signalStart() {
        final PubsubClient.TopicPath topic = startTopicPath;
        return new PublishStart(topic);
    }

    /**
     * Creates a transform that publishes a success message to the result topic once
     * {@code successPredicate} holds for the set of elements seen so far.
     *
     * <p>The result topic path is read at the time this method is called.
     *
     * @param coder coder for the observed elements
     * @param formatter function rendering an element as text
     * @param successPredicate predicate evaluated against the set of elements seen so far
     * @return a transform to apply to the collection being observed
     */
    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(Coder<T> coder, SerializableFunction<T, String> formatter, SerializableFunction<Set<T>, Boolean> successPredicate) {
        final PubsubClient.TopicPath topic = resultTopicPath;
        return new PublishSuccessWhen<T>(coder, formatter, successPredicate, topic);
    }

    /**
     * Convenience overload of {@code signalSuccessWhen} that formats elements with
     * {@link Object#toString()}.
     *
     * @param coder coder for the observed elements
     * @param successPredicate predicate evaluated against the set of elements seen so far
     * @return a transform to apply to the collection being observed
     */
    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) {
        SerializableFunction<T, String> toStringFormatter = Object::toString;
        return signalSuccessWhen(coder, toStringFormatter, successPredicate);
    }

    /**
     * Subscribes to the start topic and returns a supplier that blocks until the start signal
     * arrives.
     *
     * <p>The subscription is created eagerly, when this method is called; polling happens when
     * the returned supplier is first evaluated. After that evaluation finishes, whether it
     * succeeded or failed, the subscription is deleted on a best-effort basis so it does not
     * outlive the test. The supplier should therefore be treated as single-use.
     *
     * <p>NOTE(review): the polling duration, in whole seconds, is also passed as the third
     * argument of {@code createSubscription}; confirm that the durations used by callers are
     * acceptable values for that argument.
     *
     * @param duration how long the supplier polls before giving up
     * @return a memoizing supplier; evaluating it throws {@link AssertionError} if no signal
     *     arrives in time, {@link IllegalStateException} if the received message is not the
     *     start signal, or {@link RuntimeException} wrapping an {@link IOException}
     * @throws IOException if the subscription cannot be created
     */
    public Supplier<Void> waitForStart(Duration duration) throws IOException {
        PubsubClient.SubscriptionPath startSubscriptionPath = PubsubClient.subscriptionPathFromName(this.pipelineOptions.getProject(), "start-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
        this.pubsub.createSubscription(this.startTopicPath, startSubscriptionPath, (int)duration.getStandardSeconds());
        return Suppliers.memoize(() -> {
            try {
                String result = this.pollForResultForDuration(startSubscriptionPath, duration);
                Preconditions.checkState(START_SIGNAL_MESSAGE.equals(result));
                return null;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            finally {
                // Best-effort cleanup: a failure here is logged, never allowed to hide the outcome.
                try {
                    this.pubsub.deleteSubscription(startSubscriptionPath);
                }
                catch (IOException | RuntimeException cleanupError) {
                    LOG.warn("Leaked Pubsub subscription {}", startSubscriptionPath, cleanupError);
                }
            }
        });
    }

    /**
     * Subscribes to the result topic and blocks until a result message arrives.
     *
     * <p>The temporary subscription is deleted on a best-effort basis before this method
     * returns or throws, so it does not outlive the call.
     *
     * <p>NOTE(review): the polling duration, in whole seconds, is also passed as the third
     * argument of {@code createSubscription}; confirm that the durations used by callers are
     * acceptable values for that argument.
     *
     * @param duration how long to poll before giving up
     * @throws IOException if the subscription cannot be created or polling fails
     * @throws AssertionError if no message arrives in time, or if the received message is not
     *     the success message (the received text becomes the error detail)
     */
    public void waitForSuccess(Duration duration) throws IOException {
        PubsubClient.SubscriptionPath resultSubscriptionPath = PubsubClient.subscriptionPathFromName(this.pipelineOptions.getProject(), "result-subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong()));
        this.pubsub.createSubscription(this.resultTopicPath, resultSubscriptionPath, (int)duration.getStandardSeconds());
        String result;
        try {
            result = this.pollForResultForDuration(resultSubscriptionPath, duration);
        }
        finally {
            // Best-effort cleanup: a failure here is logged, never allowed to hide the outcome.
            try {
                this.pubsub.deleteSubscription(resultSubscriptionPath);
            }
            catch (IOException | RuntimeException cleanupError) {
                LOG.warn("Leaked Pubsub subscription {}", resultSubscriptionPath, cleanupError);
            }
        }
        if (!RESULT_SUCCESS_MESSAGE.equals(result)) {
            throw new AssertionError((Object)result);
        }
    }

    /**
     * Polls a subscription until one message is received or the duration elapses.
     *
     * <p>Each attempt pulls at most one message. A pull that returns no messages, or that fails
     * with a gRPC {@link StatusRuntimeException}, is followed by a 500 ms pause and another
     * attempt for as long as the deadline has not passed. At least one attempt is always made.
     * The received message is acknowledged before its payload is returned.
     *
     * @param signalSubscriptionPath subscription to pull from
     * @param duration maximum time to keep polling
     * @return the payload of the received message, decoded as UTF-8
     * @throws IOException if the Pubsub client reports an I/O failure
     * @throws AssertionError if no message was received before the deadline
     */
    private String pollForResultForDuration(PubsubClient.SubscriptionPath signalSubscriptionPath, Duration duration) throws IOException {
        DateTime endPolling = DateTime.now().plus(duration.getMillis());
        do {
            try {
                List<PubsubClient.IncomingMessage> pulled = this.pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false);
                // An empty response is "nothing yet", not a result: keep polling instead of
                // falling through and indexing into an empty list.
                if (!pulled.isEmpty()) {
                    this.pubsub.acknowledge(signalSubscriptionPath, pulled.stream().map(PubsubClient.IncomingMessage::ackId).collect(Collectors.toList()));
                    return pulled.get(0).message().getData().toStringUtf8();
                }
            }
            catch (StatusRuntimeException e) {
                // Compare status codes: Status instances carry descriptions and causes and do
                // not define value equality, so equals() against the constant is unreliable.
                if (e.getStatus().getCode() != Status.Code.DEADLINE_EXCEEDED) {
                    LOG.warn("(Will retry) Error while polling {} for signal: {}", (Object)signalSubscriptionPath, (Object)e.getStatus());
                }
            }
            this.sleep(500L);
        } while (DateTime.now().isBefore(endPolling));
        throw new AssertionError((Object)String.format("Did not receive signal on %s in %ss", signalSubscriptionPath, duration.getStandardSeconds()));
    }

    /**
     * Pauses the current thread for the given number of milliseconds.
     *
     * @param t pause length in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *     interrupted while sleeping; the thread's interrupt flag is restored first so callers
     *     further up the stack can still observe the interruption
     */
    private void sleep(long t) {
        try {
            Thread.sleep(t);
        }
        catch (InterruptedException ex) {
            // Thread.sleep clears the flag when it throws; put it back before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /**
     * Stateful {@link DoFn} that accumulates every value it sees in a bag state cell and, after
     * each element, evaluates the success predicate against the set of values seen so far.
     *
     * <p>Outputs the success message when the predicate returns true, and a
     * {@code "FAILURE: ..."} message when evaluating the predicate (or emitting the success
     * message) throws. Nothing is output when the predicate returns false.
     */
    static class StatefulPredicateCheck<T>
    extends DoFn<KV<String, ? extends T>, String> {
        private final SerializableFunction<T, String> formatter;
        private SerializableFunction<Set<T>, Boolean> successPredicate;
        private static final String SEEN_EVENTS = "seenEvents";
        /** Bag state holding every value observed so far. */
        @DoFn.StateId(SEEN_EVENTS)
        private final StateSpec<BagState<T>> seenEvents;

        /**
         * @param coder coder used for the values kept in state
         * @param formatter function rendering a value as text
         * @param successPredicate predicate evaluated against the set of values seen so far
         */
        StatefulPredicateCheck(Coder<T> coder, SerializableFunction<T, String> formatter, SerializableFunction<Set<T>, Boolean> successPredicate) {
            this.formatter = formatter;
            this.successPredicate = successPredicate;
            this.seenEvents = StateSpecs.bag(coder);
        }

        /**
         * Records the incoming value, then re-evaluates the predicate over everything seen.
         *
         * @param context process context supplying the element and receiving the output
         * @param seenEventsState the bag state cell bound to {@code "seenEvents"}
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context, @DoFn.StateId(SEEN_EVENTS) BagState<T> seenEventsState) {
            T latest = context.element().getValue();
            seenEventsState.add(latest);
            Set<T> observedSoFar = ImmutableSet.copyOf(seenEventsState.read());
            try {
                boolean satisfied = successPredicate.apply(observedSoFar);
                if (satisfied) {
                    context.output(TestPubsubSignal.RESULT_SUCCESS_MESSAGE);
                }
            }
            catch (Throwable e) {
                // Deliberately broad: any failure raised by the predicate is reported as a message.
                context.output("FAILURE: " + e.getMessage());
            }
        }
    }

    static class PublishSuccessWhen<T>
    extends PTransform<PCollection<? extends T>, POutput> {
        private final Coder<T> coder;
        private final SerializableFunction<T, String> formatter;
        private final SerializableFunction<Set<T>, Boolean> successPredicate;
        private final PubsubClient.TopicPath resultTopicPath;

        PublishSuccessWhen(Coder<T> coder, SerializableFunction<T, String> formatter, SerializableFunction<Set<T>, Boolean> successPredicate, PubsubClient.TopicPath resultTopicPath) {
            this.coder = coder;
            this.formatter = formatter;
            this.successPredicate = successPredicate;
            this.resultTopicPath = resultTopicPath;
        }

        public POutput expand(PCollection<? extends T> input) {
            return ((PCollection)((PCollection)((PCollection)input.apply((PTransform)Window.into((WindowFn)new GlobalWindows()))).apply((PTransform)WithKeys.of((Object)"dummyKey"))).apply("checkAllEventsForSuccess", (PTransform)ParDo.of(new StatefulPredicateCheck<T>(this.coder, this.formatter, this.successPredicate)))).apply("publishSuccess", PubsubIO.writeStrings().to(this.resultTopicPath.getPath()));
        }
    }

    static class PublishStart
    extends PTransform<PBegin, PDone> {
        private final PubsubClient.TopicPath startTopicPath;

        PublishStart(PubsubClient.TopicPath startTopicPath) {
            this.startTopicPath = startTopicPath;
        }

        public PDone expand(PBegin input) {
            return (PDone)((PCollection)input.apply("Start signal", (PTransform)Create.of((Object)TestPubsubSignal.START_SIGNAL_MESSAGE, (Object[])new String[0]))).apply(PubsubIO.writeStrings().to(this.startTopicPath.getPath()));
        }
    }
}

