/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.CheckpointMarkImpl;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.CloserReference;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriberAssembler;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.TopicBacklogReader;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.UnboundedReaderImpl;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

public class UnboundedSourceImpl
extends UnboundedSource<SequencedMessage, CheckpointMarkImpl> {
    private final SubscriberOptions subscriberOptions;
    private final SubscriberFactory subscriberFactory;
    private final BacklogReaderFactory readerFactory;
    private final Optional<Partition> partition;

    UnboundedSourceImpl(SubscriberOptions subscriberOptions, SubscriberFactory subscriberFactory, BacklogReaderFactory readerFactory) {
        this.subscriberOptions = subscriberOptions;
        this.subscriberFactory = subscriberFactory;
        this.readerFactory = readerFactory;
        this.partition = Optional.absent();
    }

    private UnboundedSourceImpl(SubscriberOptions subscriberOptions, SubscriberFactory subscriberFactory, BacklogReaderFactory readerFactory, Partition partition) {
        this.subscriberOptions = subscriberOptions;
        this.subscriberFactory = subscriberFactory;
        this.readerFactory = readerFactory;
        this.partition = Optional.of((Object)partition);
    }

    public List<? extends UnboundedSource<SequencedMessage, CheckpointMarkImpl>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        Preconditions.checkState((!this.partition.isPresent() ? 1 : 0) != 0);
        int numPartitions = PartitionLookupUtils.numPartitions((SubscriptionPath)this.subscriberOptions.subscriptionPath());
        return IntStream.range(0, numPartitions).mapToObj(val -> new UnboundedSourceImpl(this.subscriberOptions, this.subscriberFactory, this.readerFactory, Partition.of((long)val))).collect(Collectors.toList());
    }

    public UnboundedSource.UnboundedReader<SequencedMessage> createReader(PipelineOptions options, @Nullable CheckpointMarkImpl checkpointMark) throws IOException {
        Preconditions.checkState((boolean)this.partition.isPresent());
        SubscriberAssembler assembler = new SubscriberAssembler(this.subscriberOptions, (Partition)this.partition.get());
        Offset initialOffset = checkpointMark == null ? assembler.getInitialOffsetReader().read() : checkpointMark.offset;
        SubscriptionPartition subscription = SubscriptionPartition.of(this.subscriberOptions.subscriptionPath(), (Partition)this.partition.get());
        MemoryBufferedSubscriber subscriber = this.subscriberFactory.create(subscription, initialOffset);
        return new UnboundedReaderImpl(this, subscriber, this.readerFactory.create(subscription), CloserReference.of(assembler.newCommitter()), initialOffset);
    }

    public Coder<CheckpointMarkImpl> getCheckpointMarkCoder() {
        return CheckpointMarkImpl.coder();
    }

    public Coder<SequencedMessage> getOutputCoder() {
        return ProtoCoder.of(SequencedMessage.class);
    }

    static interface BacklogReaderFactory
    extends Serializable {
        public TopicBacklogReader create(SubscriptionPartition var1);
    }

    static interface SubscriberFactory
    extends Serializable {
        public MemoryBufferedSubscriber create(SubscriptionPartition var1, Offset var2);
    }
}

