/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors;
import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.CreateTableHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.SplittingIterable;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TwoLevelMessageConverterCache;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageApiWritesShardedRecords<DestinationT, ElementT>
extends PTransform<PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
    // Resolves the table, schema and side inputs for each destination key.
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    // Forwarded to CreateTableHelpers.possiblyCreateTable when a destination is first seen.
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    // Forwarded to CreateTableHelpers.possiblyCreateTable; NOTE(review): presumably optional - confirm.
    private final String kmsKey;
    // Source of the DatasetService used for stream creation and appends; also handed to the flush/finalize DoFn.
    private final BigQueryServices bqServices;
    // Forwarded to CreateTableHelpers.possiblyCreateTable.
    private final Coder<DestinationT> destinationCoder;
    // Shared pool on which append clients are closed/unpinned asynchronously (see runAsyncIgnoreFailure).
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
    // JVM-wide cache of append clients keyed by write-stream name. An entry expires 5 minutes after
    // its last access; whenever an entry is removed (expiry or explicit invalidate) its client is
    // closed on closeWriterExecutor, and any failure of that close is not propagated.
    private static final Cache<String, BigQueryServices.StreamAppendClient> APPEND_CLIENTS = CacheBuilder.newBuilder().expireAfterAccess(5L, TimeUnit.MINUTES).removalListener(removal -> {
        BigQueryServices.StreamAppendClient streamAppendClient = (BigQueryServices.StreamAppendClient)removal.getValue();
        StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
    }).build();

    /**
     * Submits {@code task} to {@code executor} and returns without waiting for it.
     *
     * <p>This is a best-effort helper used for closing and unpinning append clients: an exception
     * thrown by the task is never propagated to the caller. It is, however, logged, so that failed
     * cleanups (and the resource leaks they may cause) leave a trace instead of vanishing.
     *
     * @param executor the executor the task is submitted to
     * @param task the cleanup action to run; it may throw any {@link Exception}
     */
    private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
        executor.submit(() -> {
            try {
                task.run();
            }
            catch (Exception e) {
                // Deliberately not rethrown: callers rely on this being fire-and-forget.
                LOG.warn("Exception happened while executing async task. Ignoring.", e);
            }
        });
    }

    /**
     * Creates the transform. All arguments are stored as-is; nothing is validated or contacted here.
     *
     * @param dynamicDestinations resolves table, schema and side inputs per destination key
     * @param createDisposition passed to {@code CreateTableHelpers.possiblyCreateTable}
     * @param kmsKey passed to {@code CreateTableHelpers.possiblyCreateTable}
     * @param bqServices used to obtain the dataset service and given to the flush/finalize step
     * @param destinationCoder passed to {@code CreateTableHelpers.possiblyCreateTable}
     */
    public StorageApiWritesShardedRecords(StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, BigQueryIO.Write.CreateDisposition createDisposition, String kmsKey, BigQueryServices bqServices, Coder<DestinationT> destinationCoder) {
        // Service handle first, then everything that describes the destinations.
        this.bqServices = bqServices;
        this.dynamicDestinations = dynamicDestinations;
        this.destinationCoder = destinationCoder;
        this.createDisposition = createDisposition;
        this.kmsKey = kmsKey;
    }

    /**
     * Builds the write pipeline in four stages:
     *
     * <ol>
     *   <li>"Write Records": a {@code WriteRecordsDoFn} (with the dynamic destinations' side inputs)
     *       that emits {@code KV<String, Operation>} pairs keyed by stream name.
     *   <li>A windowing configuration that fires repeatedly, one second of processing time after the
     *       first element of each pane, discarding fired panes.
     *   <li>"maxFlushPosition": a per-key {@code Max.naturalOrder} combine seeded with
     *       {@code Operation(-1, false)}.
     *   <li>"Flush and finalize writes": a {@code StorageApiFlushAndFinalizeDoFn}.
     * </ol>
     *
     * @param input shard-keyed batches of serialized rows
     * @return the output of the flush/finalize step
     * @throws RuntimeException if the schema registry has no schema for {@code Operation}
     */
    public PCollection<Void> expand(PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>> input) {
        String operationName = input.getName() + "/" + this.getName();

        // Stage 1: append rows to write streams.
        PTransform writeRecords = (PTransform)ParDo.of((DoFn)new WriteRecordsDoFn(operationName)).withSideInputs(this.dynamicDestinations.getSideInputs());
        PCollection written = (PCollection)input.apply("Write Records", writeRecords);

        // The Operation values need a schema-derived coder before they can be combined.
        SchemaCoder operationCoder;
        try {
            Class<StorageApiFlushAndFinalizeDoFn.Operation> operationClass = StorageApiFlushAndFinalizeDoFn.Operation.class;
            SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
            operationCoder = SchemaCoder.of((Schema)registry.getSchema(operationClass), (TypeDescriptor)TypeDescriptor.of(operationClass), (SerializableFunction)registry.getToRowFunction(operationClass), (SerializableFunction)registry.getFromRowFunction(operationClass));
        }
        catch (NoSuchSchemaException e) {
            throw new RuntimeException(e);
        }
        PCollection keyedOperations = (PCollection)written.setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)operationCoder));

        // Stage 2: re-trigger every second of processing time, discarding fired panes.
        PTransform oneSecondPanes = (PTransform)Window.configure().triggering((Trigger)Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds((long)1L)))).discardingFiredPanes();
        PCollection windowed = (PCollection)keyedOperations.apply(oneSecondPanes);

        // Stage 3: keep only the largest Operation per stream name.
        StorageApiFlushAndFinalizeDoFn.Operation seed = new StorageApiFlushAndFinalizeDoFn.Operation(-1L, false);
        PTransform maxPerStream = (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)Max.naturalOrder((Comparable)seed));
        PCollection maxPositions = (PCollection)windowed.apply("maxFlushPosition", maxPerStream);

        // Stage 4: flush and finalize.
        PTransform flushAndFinalize = (PTransform)ParDo.of((DoFn)new StorageApiFlushAndFinalizeDoFn(this.bqServices));
        return (PCollection)maxPositions.apply("Flush and finalize writes", flushAndFinalize);
    }

    class WriteRecordsDoFn
    extends DoFn<KV<ShardedKey<DestinationT>, Iterable<byte[]>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> {
        // Rows queued for append; incremented by the row count of every batch as it is registered.
        private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, (String)"recordsAppended");
        // Incremented each time getOrCreateStream creates a new write stream.
        private final Counter streamsCreated = Metrics.counter(WriteRecordsDoFn.class, (String)"streamsCreated");
        // Incremented on every invocation of the append error callback.
        private final Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, (String)"appendFailures");
        // Incremented when an append fails with OUT_OF_RANGE or ALREADY_EXISTS.
        private final Counter appendOffsetFailures = Metrics.counter(WriteRecordsDoFn.class, (String)"appendOffsetFailures");
        // Incremented by the row count of every batch whose append succeeded.
        private final Counter flushesScheduled = Metrics.counter(WriteRecordsDoFn.class, (String)"flushesScheduled");
        // Wall-clock milliseconds spent in one process() call.
        private final Distribution appendLatencyDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendLatencyDistributionMs");
        // Row count of each batch produced by splitting an element.
        private final Distribution appendSizeDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendSizeDistribution");
        // Number of batches each element was split into.
        private final Distribution appendSplitDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendSplitDistribution");
        // Cache of message converters, looked up per destination in process().
        private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
        // Destination -> resolved table; replaced with an empty map at the start of every bundle.
        private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();
        // Lazily created by getDatasetService and closed in onTeardown; transient, so it is not serialized with the DoFn.
        @Nullable
        private transient BigQueryServices.DatasetService datasetServiceInternal = null;
        // State: name of the write stream currently in use; "" or unset means none.
        @DoFn.StateId(value="streamName")
        private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();
        // State: offset at which the next batch will be appended to the current stream.
        @DoFn.StateId(value="streamOffset")
        private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

        /**
         * Creates the DoFn together with its message-converter cache.
         *
         * @param operationName name handed to the {@code TwoLevelMessageConverterCache} constructor
         */
        public WriteRecordsDoFn(String operationName) {
            TwoLevelMessageConverterCache<DestinationT, ElementT> converterCache = new TwoLevelMessageConverterCache<>(operationName);
            this.messageConverters = converterCache;
        }

        /**
         * Starts every bundle with an empty destination-to-table map, so tables resolved during an
         * earlier bundle are looked up again.
         */
        @DoFn.StartBundle
        public void startBundle() throws IOException {
            Map<DestinationT, TableDestination> emptyDestinations = Maps.newHashMap();
            this.destinations = emptyDestinations;
        }

        /**
         * Returns the write stream recorded in state, creating one when none is recorded.
         *
         * <p>When {@code streamName} holds null or the empty string, a new {@code BUFFERED} write
         * stream is created for {@code tableId}; its name is written to {@code streamName},
         * {@code streamOffset} is reset to 0 and the {@code streamsCreated} counter is incremented.
         *
         * @param tableId table the stream is created for
         * @param streamName state cell holding the current stream name
         * @param streamOffset state cell holding the next append offset
         * @param datasetService service used to create the stream
         * @return the name of the existing or newly created stream
         */
        String getOrCreateStream(String tableId, ValueState<String> streamName, ValueState<Long> streamOffset, BigQueryServices.DatasetService datasetService) throws IOException, InterruptedException {
            String existing = streamName.read();
            if (!Strings.isNullOrEmpty(existing)) {
                return existing;
            }
            String created = datasetService.createWriteStream(tableId, WriteStream.Type.BUFFERED).getName();
            streamName.write(created);
            streamOffset.write(0L);
            this.streamsCreated.inc();
            return created;
        }

        /**
         * Returns this DoFn's dataset service, obtaining it from {@code bqServices} on first use and
         * remembering it in {@code datasetServiceInternal} afterwards.
         *
         * @param pipelineOptions options viewed as {@code BigQueryOptions} when the service is created
         * @return the cached or newly created dataset service
         */
        private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
            BigQueryServices.DatasetService service = this.datasetServiceInternal;
            if (service == null) {
                BigQueryOptions bigQueryOptions = (BigQueryOptions)pipelineOptions.as(BigQueryOptions.class);
                service = StorageApiWritesShardedRecords.this.bqServices.getDatasetService(bigQueryOptions);
                this.datasetServiceInternal = service;
            }
            return service;
        }

        /**
         * Closes the lazily created dataset service, if there is one, and forgets it.
         *
         * @throws RuntimeException wrapping whatever {@code close()} throws; in that case the field
         *     is left pointing at the service
         */
        @DoFn.Teardown
        public void onTeardown() {
            if (this.datasetServiceInternal == null) {
                // Nothing was ever opened by this instance.
                return;
            }
            try {
                this.datasetServiceInternal.close();
                this.datasetServiceInternal = null;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Appends the rows of one shard-keyed element to a write stream of the destination table.
         *
         * <p>Outline of what the body does:
         * <ol>
         *   <li>Resolves the destination table (cached per bundle in {@code destinations}),
         *       possibly creating it through {@code CreateTableHelpers.possiblyCreateTable}.
         *   <li>Splits the element's serialized rows into {@code ProtoRows} batches via
         *       {@code SplittingIterable} and registers one {@code RetryManager} operation per batch.
         *   <li>Assigns every batch a stream name, an append client and an offset taken from the
         *       {@code streamOffset} state, advancing that state by the batch's row count.
         *   <li>Runs the retry manager; each successful batch emits an {@code Operation} keyed by
         *       stream name for the downstream flush step.
         *   <li>Unpins all append clients, whether or not the run succeeded.
         * </ol>
         *
         * <p>NOTE(review): this is CFR-decompiled code and the decompiler emitted the warning
         * "Removed try catching itself - possible behaviour change" for this method. The
         * catch-Throwable/unpin/rethrow block followed by an identical unpin loop presumably
         * corresponds to a single try/finally in the original source - confirm against upstream.
         *
         * @param c process context; used for side-input access and table creation
         * @param pipelineOptions options used to obtain the dataset service
         * @param element the sharded destination key and its serialized rows
         * @param streamName state holding the name of the stream currently written to
         * @param streamOffset state holding the next append offset on that stream
         * @param o receiver for {@code (stream name, Operation)} pairs
         * @throws Exception whatever {@code retryManager.run} or the setup steps throw
         */
        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, PipelineOptions pipelineOptions, @DoFn.Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element, @DoFn.AlwaysFetched @DoFn.StateId(value="streamName") ValueState<String> streamName, @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") ValueState<Long> streamOffset, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> o) throws Exception {
            // Per-batch bookkeeping carried through the retry manager: which stream/client the batch
            // is appended to, at which offset, how many rows it holds and how often it was (re)initialized.
            class AppendRowsContext
            extends RetryManager.Operation.Context<AppendRowsResponse> {
                final ShardedKey<DestinationT> key;
                String streamName = "";
                BigQueryServices.StreamAppendClient client = null;
                long offset = -1L;
                long numRows = 0L;
                long tryIteration = 0L;

                AppendRowsContext(ShardedKey<DestinationT> key) {
                    this.key = key;
                }

                public String toString() {
                    return "Context: key=" + this.key + " streamName=" + this.streamName + " offset=" + this.offset + " numRows=" + this.numRows + " tryIteration: " + this.tryIteration;
                }
            }
            StorageApiWritesShardedRecords.this.dynamicDestinations.setSideInputAccessorFromProcessContext(c);
            // Resolve the table once per destination and bundle; getTable() must not return null.
            TableDestination tableDestination = this.destinations.computeIfAbsent(((ShardedKey)element.getKey()).getKey(), dest -> {
                TableDestination tableDestination1 = StorageApiWritesShardedRecords.this.dynamicDestinations.getTable(dest);
                Preconditions.checkArgument((tableDestination1 != null ? 1 : 0) != 0, (String)"DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", (Object)StorageApiWritesShardedRecords.this.dynamicDestinations, (Object)dest);
                Supplier schemaSupplier = () -> StorageApiWritesShardedRecords.this.dynamicDestinations.getSchema(dest);
                return CreateTableHelpers.possiblyCreateTable(c, tableDestination1, (Supplier<TableSchema>)schemaSupplier, StorageApiWritesShardedRecords.this.createDisposition, StorageApiWritesShardedRecords.this.destinationCoder, StorageApiWritesShardedRecords.this.kmsKey, StorageApiWritesShardedRecords.this.bqServices);
            });
            String tableId = tableDestination.getTableUrn();
            BigQueryServices.DatasetService datasetService = this.getDatasetService(pipelineOptions);
            StorageApiDynamicDestinations.MessageConverter messageConverter = this.messageConverters.get(((ShardedKey)element.getKey()).getKey(), StorageApiWritesShardedRecords.this.dynamicDestinations, datasetService);
            Descriptors.Descriptor descriptor = messageConverter.getSchemaDescriptor();
            // NOTE(review): oneMb is never read; the same literal is passed to SplittingIterable below,
            // presumably as a per-batch size limit in bytes - confirm against SplittingIterable.
            long oneMb = 0x100000L;
            SplittingIterable messages = new SplittingIterable((Iterable)element.getValue(), 0x100000L);
            // (Re)binds the given contexts to a stream. With isFailure=true the stored stream name is
            // blanked first, which makes getOrCreateStream create a fresh stream (and reset the offset).
            BiConsumer<Iterable, Boolean> initializeContexts = (contexts, isFailure) -> {
                try {
                    if (isFailure.booleanValue()) {
                        streamName.write((Object)"");
                    }
                    String stream = this.getOrCreateStream(tableId, streamName, streamOffset, datasetService);
                    BigQueryServices.StreamAppendClient appendClient = (BigQueryServices.StreamAppendClient)APPEND_CLIENTS.get((Object)stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
                    for (AppendRowsContext context : contexts) {
                        context.streamName = stream;
                        // One pin per context; each is released by a matching unpin later.
                        appendClient.pin();
                        context.client = appendClient;
                        // Reserve [offset, offset + numRows) for this batch by advancing the state.
                        context.offset = (Long)streamOffset.read();
                        ++context.tryIteration;
                        streamOffset.write((Object)(context.offset + context.numRows));
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };
            // Drops the cached client for the current stream name (which triggers its asynchronous
            // close via the cache's removal listener) and unpins/forgets the client of every context.
            Consumer<Iterable> clearClients = contexts -> {
                APPEND_CLIENTS.invalidate(streamName.read());
                for (AppendRowsContext context : contexts) {
                    if (context.client == null) continue;
                    StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
                    context.client = null;
                }
            };
            // Start of the latency measurement reported at the end of the method.
            Instant now = Instant.now();
            ArrayList contexts2 = Lists.newArrayList();
            // NOTE(review): arguments are presumably initial backoff, max backoff and max retries -
            // confirm against RetryManager's constructor.
            RetryManager retryManager = new RetryManager(Duration.standardSeconds((long)1L), Duration.standardSeconds((long)10L), 1000);
            int numSplits = 0;
            for (ProtoRows protoRows : messages) {
                ++numSplits;
                // The append itself: look the client up by the context's stream name (recreating it
                // if it is no longer cached) and append this batch at the context's offset.
                Function<AppendRowsContext, ApiFuture> run = context -> {
                    try {
                        BigQueryServices.StreamAppendClient appendClient = (BigQueryServices.StreamAppendClient)APPEND_CLIENTS.get((Object)context.streamName, () -> datasetService.getStreamAppendClient(context.streamName, descriptor));
                        return appendClient.appendRows(context.offset, protoRows);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                };
                // Error callback. Clients of ALL contexts of this element are cleared, not just the
                // failed ones. Both the offset-error branch and the fall-through return
                // RETRY_ALL_OPERATIONS; they differ only in the extra work done for offset errors.
                Function onError = failedContexts -> {
                    AppendRowsContext failedContext = (AppendRowsContext)Preconditions.checkNotNull((Object)((AppendRowsContext)Iterables.getFirst((Iterable)failedContexts, null)));
                    Status.Code statusCode = Status.fromThrowable((Throwable)failedContext.getError()).getCode();
                    LOG.error("Got error " + failedContext.getError() + " closing " + failedContext.streamName);
                    clearClients.accept(contexts2);
                    this.appendFailures.inc();
                    if (statusCode.equals((Object)Status.Code.OUT_OF_RANGE) || statusCode.equals((Object)Status.Code.ALREADY_EXISTS)) {
                        this.appendOffsetFailures.inc();
                        LOG.warn("Append to " + failedContext + " failed with " + failedContext.getError() + " Will retry with a new stream");
                        // Emit an Operation for the old stream up to the last offset before the failed
                        // batch. NOTE(review): the 'true' flag presumably requests finalization of
                        // the stream - confirm in StorageApiFlushAndFinalizeDoFn.Operation.
                        o.output((Object)KV.of((Object)failedContext.streamName, (Object)new StorageApiFlushAndFinalizeDoFn.Operation(failedContext.offset - 1L, true)));
                        // Move the failed contexts onto a new stream with fresh offsets.
                        initializeContexts.accept((Iterable)failedContexts, true);
                        return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                    }
                    return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                };
                // Success callback: emit an Operation positioned at the last row of this batch.
                Consumer<AppendRowsContext> onSuccess = context -> {
                    o.output((Object)KV.of((Object)context.streamName, (Object)new StorageApiFlushAndFinalizeDoFn.Operation(context.offset + context.numRows - 1L, false)));
                    this.flushesScheduled.inc((long)protoRows.getSerializedRowsCount());
                };
                AppendRowsContext context2 = new AppendRowsContext((ShardedKey)element.getKey());
                context2.numRows = protoRows.getSerializedRowsCount();
                contexts2.add(context2);
                retryManager.addOperation(run, onError, onSuccess, context2);
                this.recordsAppended.inc((long)protoRows.getSerializedRowsCount());
                this.appendSizeDistribution.update(context2.numRows);
            }
            // Offsets are assigned only now, after every batch's row count is known.
            initializeContexts.accept(contexts2, false);
            try {
                // NOTE(review): meaning of the boolean argument is not visible here - confirm in RetryManager.
                retryManager.run(true);
            }
            catch (Throwable throwable) {
                // Failure path: release every pin that is still held, then propagate.
                for (AppendRowsContext context3 : contexts2) {
                    if (context3.client == null) continue;
                    StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, context3.client::unpin);
                }
                throw throwable;
            }
            // Success path: same pin release as above.
            for (AppendRowsContext context4 : contexts2) {
                if (context4.client == null) continue;
                StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, context4.client::unpin);
            }
            this.appendSplitDistribution.update((long)numSplits);
            java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
            this.appendLatencyDistribution.update(timeElapsed.toMillis());
        }

        /**
         * Cleans up the write stream recorded in state when the window expires.
         *
         * <p>If a non-empty stream name is stored, an {@code Operation} positioned at the last
         * reserved offset (stored offset minus one, treating an unset offset as 0) is emitted with
         * its boolean flag set to {@code true}, and the cached append client for that stream is
         * invalidated. If no stream name is stored, nothing happens.
         *
         * <p>The stream name is read directly from state rather than through
         * {@code MoreObjects.firstNonNull(value, null)}: that call throws
         * {@link NullPointerException} when the state is unset, which would make the null/empty
         * check below unreachable for exactly the case it exists to handle.
         *
         * @param streamName state holding the name of the current stream; may be unset
         * @param streamOffset state holding the next append offset; may be unset
         * @param o receiver for the {@code (stream name, Operation)} pair
         */
        @DoFn.OnWindowExpiration
        public void onWindowExpiration(@DoFn.AlwaysFetched @DoFn.StateId(value="streamName") ValueState<String> streamName, @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") ValueState<Long> streamOffset, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> o) {
            String stream = streamName.read();
            if (Strings.isNullOrEmpty(stream)) {
                // No stream was ever created for this key, so there is nothing to clean up.
                return;
            }
            long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
            o.output(KV.of(stream, new StorageApiFlushAndFinalizeDoFn.Operation(nextOffset - 1L, true)));
            // Evicting the entry makes the cache's removal listener close the client.
            APPEND_CLIENTS.invalidate(stream);
        }
    }

    private static interface ThrowingRunnable {
        public void run() throws Exception;
    }
}

