/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.extensions.gcp.util.CustomHttpErrors;
import org.apache.beam.sdk.extensions.gcp.util.LatencyRecordingHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOMetadata;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Histogram;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BigQueryServicesImpl
implements BigQueryServices {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
    private static final Duration POLLING_LOG_GAP = Duration.standardMinutes((long)10L);
    private static final int MAX_RPC_RETRIES = 9;
    private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds((long)1L);
    private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds((long)1L);
    private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withMaxRetries(9).withInitialBackoff(INITIAL_RPC_BACKOFF);
    private static final String QUOTA_EXCEEDED = "quotaExceeded";
    static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND = (SerializableFunction & Serializable)input -> {
        ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
        return !errorExtractor.itemNotFound(input);
    };
    static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = (SerializableFunction & Serializable)input -> true;

    BigQueryServicesImpl() {
    }

    @Override
    public BigQueryServices.JobService getJobService(BigQueryOptions options) {
        return new JobServiceImpl(options);
    }

    @Override
    public BigQueryServices.DatasetService getDatasetService(BigQueryOptions options) {
        return new DatasetServiceImpl(options, null);
    }

    @Override
    public BigQueryServices.DatasetService getDatasetService(BigQueryOptions options, Histogram requestLatencies) {
        return new DatasetServiceImpl(options, requestLatencies);
    }

    @Override
    public BigQueryServices.StorageClient getStorageClient(BigQueryOptions options) throws IOException {
        return new StorageClientImpl(options);
    }

    private static BackOff createDefaultBackoff() {
        return BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)DEFAULT_BACKOFF_FACTORY.backoff());
    }

    @VisibleForTesting
    static <T> T executeWithRetries(AbstractGoogleClientRequest<T> request, String errorMessage, Sleeper sleeper, BackOff backoff, SerializableFunction<IOException, Boolean> shouldRetry) throws IOException, InterruptedException {
        IOException lastException = null;
        while (true) {
            try {
                return (T)request.execute();
            }
            catch (IOException e) {
                lastException = e;
                if (((Boolean)shouldRetry.apply((Object)e)).booleanValue()) {
                    LOG.info("Ignore the error and retry the request.", (Throwable)e);
                    if (BigQueryServicesImpl.nextBackOff(sleeper, backoff)) continue;
                }
                throw new IOException(errorMessage, lastException);
            }
            break;
        }
    }

    private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
        try {
            return BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static Bigquery.Builder newBigQueryClient(BigQueryOptions options, @Nullable Histogram requestLatencies) {
        RetryHttpRequestInitializer httpRequestInitializer = new RetryHttpRequestInitializer((Collection)ImmutableList.of((Object)404));
        httpRequestInitializer.setCustomErrors(BigQueryServicesImpl.createBigQueryClientCustomErrors());
        httpRequestInitializer.setWriteTimeout(options.getHTTPWriteTimeout().intValue());
        ImmutableList.Builder initBuilder = ImmutableList.builder();
        Credentials credential = options.getGcpCredential();
        initBuilder.add(credential == null ? new NullCredentialInitializer() : new HttpCredentialsAdapter(credential));
        initBuilder.add((Object)httpRequestInitializer);
        if (requestLatencies != null) {
            initBuilder.add((Object)new LatencyRecordingHttpRequestInitializer(requestLatencies));
        }
        ChainingHttpRequestInitializer chainInitializer = new ChainingHttpRequestInitializer((HttpRequestInitializer[])Iterables.toArray((Iterable)initBuilder.build(), HttpRequestInitializer.class));
        return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(), (HttpRequestInitializer)chainInitializer).setApplicationName(options.getAppName()).setGoogleClientRequestInitializer((GoogleClientRequestInitializer)options.getGoogleApiTrace());
    }

    public static CustomHttpErrors createBigQueryClientCustomErrors() {
        CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
        builder.addErrorForCodeAndUrlContains(403, "/tables?", "The GCP project is most likely exceeding the rate limit on bigquery.tables.list, please find the instructions to increase this limit at: https://cloud.google.com/service-infrastructure/docs/rate-limiting#configure");
        return builder.build();
    }

    private static class BoundedExecutorService
    implements ExecutorService {
        private final ExecutorService executor;
        private final Semaphore semaphore;
        private final int parallelism;

        BoundedExecutorService(ExecutorService executor, int parallelism) {
            this.executor = executor;
            this.parallelism = parallelism;
            this.semaphore = new Semaphore(parallelism);
        }

        @Override
        public void shutdown() {
            this.executor.shutdown();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public List<Runnable> shutdownNow() {
            List<Runnable> runnables = this.executor.shutdownNow();
            BoundedExecutorService boundedExecutorService = this;
            synchronized (boundedExecutorService) {
                if (this.semaphore.availablePermits() <= this.parallelism) {
                    this.semaphore.release(Integer.MAX_VALUE - this.parallelism);
                }
            }
            return runnables;
        }

        @Override
        public boolean isShutdown() {
            return this.executor.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return this.executor.isTerminated();
        }

        @Override
        public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
            return this.executor.awaitTermination(l, timeUnit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            return this.executor.submit(new SemaphoreCallable<T>(callable));
        }

        @Override
        public <T> Future<T> submit(Runnable runnable, T t) {
            return this.executor.submit(new SemaphoreRunnable(runnable), t);
        }

        @Override
        public Future<?> submit(Runnable runnable) {
            return this.executor.submit(new SemaphoreRunnable(runnable));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection) throws InterruptedException {
            return this.executor.invokeAll(collection.stream().map(x$0 -> new SemaphoreCallable(x$0)).collect(Collectors.toList()));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit) throws InterruptedException {
            return this.executor.invokeAll(collection.stream().map(x$0 -> new SemaphoreCallable(x$0)).collect(Collectors.toList()), l, timeUnit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> collection) throws InterruptedException, ExecutionException {
            return this.executor.invokeAny(collection.stream().map(x$0 -> new SemaphoreCallable(x$0)).collect(Collectors.toList()));
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.executor.invokeAny(collection.stream().map(x$0 -> new SemaphoreCallable(x$0)).collect(Collectors.toList()), l, timeUnit);
        }

        @Override
        public void execute(Runnable runnable) {
            this.executor.execute(new SemaphoreRunnable(runnable));
        }

        private class SemaphoreCallable<V>
        implements Callable<V> {
            private final Callable<V> callable;

            SemaphoreCallable(Callable<V> callable) {
                this.callable = callable;
            }

            @Override
            public V call() throws Exception {
                BoundedExecutorService.this.semaphore.acquire();
                try {
                    V v = this.callable.call();
                    return v;
                }
                finally {
                    BoundedExecutorService.this.semaphore.release();
                }
            }
        }

        private class SemaphoreRunnable
        implements Runnable {
            private final Runnable runnable;

            SemaphoreRunnable(Runnable runnable) {
                this.runnable = runnable;
            }

            @Override
            public void run() {
                try {
                    BoundedExecutorService.this.semaphore.acquire();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException("semaphore acquisition interrupted. task canceled.");
                }
                try {
                    this.runnable.run();
                }
                finally {
                    BoundedExecutorService.this.semaphore.release();
                }
            }
        }
    }

    static class StorageClientImpl
    implements BigQueryServices.StorageClient {
        private static final HeaderProvider USER_AGENT_HEADER_PROVIDER = FixedHeaderProvider.create((String[])new String[]{"user-agent", "Apache_Beam_Java/" + ReleaseInfo.getReleaseInfo().getVersion()});
        private final BigQueryReadClient client;

        private StorageClientImpl(BigQueryOptions options) throws IOException {
            BigQueryReadSettings settings = ((BigQueryReadSettings.Builder)((BigQueryReadSettings.Builder)BigQueryReadSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)options.getGcpCredential()))).setTransportChannelProvider((TransportChannelProvider)BigQueryReadSettings.defaultGrpcTransportProviderBuilder().setHeaderProvider(USER_AGENT_HEADER_PROVIDER).build())).build();
            this.client = BigQueryReadClient.create((BigQueryReadSettings)settings);
        }

        @Override
        public ReadSession createReadSession(CreateReadSessionRequest request) {
            return this.client.createReadSession(request);
        }

        @Override
        public BigQueryServices.BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request) {
            return new BigQueryServerStreamImpl<ReadRowsResponse>(this.client.readRowsCallable().call((Object)request));
        }

        @Override
        public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) {
            return this.client.splitReadStream(request);
        }

        @Override
        public void close() {
            this.client.close();
        }
    }

    static class BigQueryServerStreamImpl<T>
    implements BigQueryServices.BigQueryServerStream<T> {
        private final ServerStream<T> serverStream;

        public BigQueryServerStreamImpl(ServerStream<T> serverStream) {
            this.serverStream = serverStream;
        }

        @Override
        public Iterator<T> iterator() {
            return this.serverStream.iterator();
        }

        @Override
        public void cancel() {
            this.serverStream.cancel();
        }
    }

    @VisibleForTesting
    static class DatasetServiceImpl
    implements BigQueryServices.DatasetService {
        private static final FluentBackoff INSERT_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis((long)200L)).withMaxRetries(5);
        private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.standardSeconds((long)1L)).withMaxRetries(13);
        private final ApiErrorExtractor errorExtractor;
        private final Bigquery client;
        private final PipelineOptions options;
        private final long maxRowsPerBatch;
        private final long maxRowBatchSize;
        private final Counter throttlingMsecs = Metrics.counter(DatasetServiceImpl.class, (String)"throttling-msecs");
        private ExecutorService executor;
        private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int)TimeUnit.MINUTES.toMillis(5L);

        @VisibleForTesting
        DatasetServiceImpl(Bigquery client, PipelineOptions options) {
            BigQueryOptions bqOptions = (BigQueryOptions)options.as(BigQueryOptions.class);
            this.errorExtractor = new ApiErrorExtractor();
            this.client = client;
            this.options = options;
            this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
            this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
            this.executor = null;
        }

        @VisibleForTesting
        DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {
            BigQueryOptions bqOptions = (BigQueryOptions)options.as(BigQueryOptions.class);
            this.errorExtractor = new ApiErrorExtractor();
            this.client = client;
            this.options = options;
            this.maxRowsPerBatch = maxRowsPerBatch;
            this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
            this.executor = null;
        }

        private DatasetServiceImpl(BigQueryOptions bqOptions, @Nullable Histogram requestLatencies) {
            this.errorExtractor = new ApiErrorExtractor();
            this.client = BigQueryServicesImpl.newBigQueryClient(bqOptions, requestLatencies).build();
            this.options = bqOptions;
            this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
            this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
            this.executor = null;
        }

        @Override
        public @Nullable Table getTable(TableReference tableRef) throws IOException, InterruptedException {
            return this.getTable(tableRef, null);
        }

        @Override
        public @Nullable Table getTable(TableReference tableRef, List<String> selectedFields) throws IOException, InterruptedException {
            return this.getTable(tableRef, selectedFields, BigQueryServicesImpl.createDefaultBackoff(), Sleeper.DEFAULT);
        }

        @VisibleForTesting
        @Nullable Table getTable(TableReference ref, @Nullable List<String> selectedFields, BackOff backoff, Sleeper sleeper) throws IOException, InterruptedException {
            Bigquery.Tables.Get get = this.client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
            if (selectedFields != null && !selectedFields.isEmpty()) {
                get.setSelectedFields(String.join((CharSequence)",", selectedFields));
            }
            try {
                return (Table)BigQueryServicesImpl.executeWithRetries(get, String.format("Unable to get table: %s, aborting after %d retries.", ref.getTableId(), 9), sleeper, backoff, DONT_RETRY_NOT_FOUND);
            }
            catch (IOException e) {
                if (this.errorExtractor.itemNotFound(e)) {
                    return null;
                }
                throw e;
            }
        }

        @Override
        public void createTable(Table table) throws InterruptedException, IOException {
            LOG.info("Trying to create BigQuery table: {}", (Object)BigQueryHelpers.toTableSpec(table.getTableReference()));
            ExponentialBackOff backoff = new ExponentialBackOff.Builder().setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS).build();
            this.tryCreateTable(table, (BackOff)backoff, Sleeper.DEFAULT);
        }

        @VisibleForTesting
        @Nullable Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper) throws IOException {
            boolean retry = false;
            while (true) {
                try {
                    return (Table)this.client.tables().insert(table.getTableReference().getProjectId(), table.getTableReference().getDatasetId(), table).execute();
                }
                catch (IOException e) {
                    block8: {
                        ApiErrorExtractor extractor = new ApiErrorExtractor();
                        if (extractor.itemAlreadyExists(e)) {
                            return null;
                        }
                        if (extractor.rateLimited(e)) {
                            try {
                                if (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) {
                                    if (retry) continue;
                                    LOG.info("Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes", new Object[]{table.getTableReference().getProjectId(), table.getTableReference().getDatasetId(), table.getTableReference().getTableId(), (double)TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0});
                                    retry = true;
                                    continue;
                                }
                                break block8;
                            }
                            catch (InterruptedException e1) {
                                Thread.currentThread().interrupt();
                                throw e;
                            }
                            continue;
                        }
                    }
                    throw e;
                }
                break;
            }
        }

        @Override
        public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
            BigQueryServicesImpl.executeWithRetries(this.client.tables().delete(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), String.format("Unable to delete table: %s, aborting after %d retries.", tableRef.getTableId(), 9), Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff(), ALWAYS_RETRY);
        }

        @Override
        public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
            return this.isTableEmpty(tableRef, BigQueryServicesImpl.createDefaultBackoff(), Sleeper.DEFAULT);
        }

        @VisibleForTesting
        boolean isTableEmpty(TableReference tableRef, BackOff backoff, Sleeper sleeper) throws IOException, InterruptedException {
            TableDataList dataList = (TableDataList)BigQueryServicesImpl.executeWithRetries(this.client.tabledata().list(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), String.format("Unable to list table data: %s, aborting after %d retries.", tableRef.getTableId(), 9), sleeper, backoff, DONT_RETRY_NOT_FOUND);
            return dataList.getRows() == null || dataList.getRows().isEmpty();
        }

        @Override
        public Dataset getDataset(String projectId, String datasetId) throws IOException, InterruptedException {
            return (Dataset)BigQueryServicesImpl.executeWithRetries(this.client.datasets().get(projectId, datasetId), String.format("Unable to get dataset: %s, aborting after %d retries.", datasetId, 9), Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff(), DONT_RETRY_NOT_FOUND);
        }

        @Override
        public void createDataset(String projectId, String datasetId, @Nullable String location, @Nullable String description, @Nullable Long defaultTableExpirationMs) throws IOException, InterruptedException {
            this.createDataset(projectId, datasetId, location, description, defaultTableExpirationMs, Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff());
        }

        private void createDataset(String projectId, String datasetId, @Nullable String location, @Nullable String description, @Nullable Long defaultTableExpirationMs, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException {
            Throwable lastException;
            DatasetReference datasetRef = new DatasetReference().setProjectId(projectId).setDatasetId(datasetId);
            Dataset dataset = new Dataset().setDatasetReference(datasetRef);
            if (location != null) {
                dataset.setLocation(location);
            }
            if (description != null) {
                dataset.setFriendlyName(description);
                dataset.setDescription(description);
            }
            if (defaultTableExpirationMs != null) {
                dataset.setDefaultTableExpirationMs(defaultTableExpirationMs);
            }
            do {
                try {
                    this.client.datasets().insert(projectId, dataset).execute();
                    return;
                }
                catch (GoogleJsonResponseException e) {
                    if (this.errorExtractor.itemAlreadyExists((IOException)((Object)e))) {
                        return;
                    }
                    LOG.info("Ignore the error and retry creating the dataset.", (Throwable)e);
                    lastException = e;
                }
                catch (IOException e) {
                    LOG.info("Ignore the error and retry creating the dataset.", (Throwable)e);
                    lastException = e;
                }
            } while (BigQueryServicesImpl.nextBackOff(sleeper, backoff));
            throw new IOException(String.format("Unable to create dataset: %s, aborting after %d .", datasetId, 9), lastException);
        }

        @Override
        public void deleteDataset(String projectId, String datasetId) throws IOException, InterruptedException {
            BigQueryServicesImpl.executeWithRetries(this.client.datasets().delete(projectId, datasetId), String.format("Unable to delete table: %s, aborting after %d retries.", datasetId, 9), Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff(), ALWAYS_RETRY);
        }

        @VisibleForTesting
        <T> long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, @Nullable List<String> insertIdList, BackOff backoff, FluentBackoff rateLimitBackoffFactory, Sleeper sleeper, InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<T>> failedInserts, ErrorContainer<T> errorContainer, boolean skipInvalidRows, boolean ignoreUnkownValues, boolean ignoreInsertIds) throws IOException, InterruptedException {
            Preconditions.checkNotNull((Object)ref, (Object)"ref");
            if (this.executor == null) {
                this.executor = new BoundedExecutorService(((GcsOptions)this.options.as(GcsOptions.class)).getExecutorService(), ((BigQueryOptions)this.options.as(BigQueryOptions.class)).getInsertBundleParallelism());
            }
            if (insertIdList != null && rowList.size() != insertIdList.size()) {
                throw new AssertionError((Object)"If insertIdList is not null it needs to have at least as many elements as rowList");
            }
            long retTotalDataSize = 0L;
            ArrayList<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<TableDataInsertAllResponse.InsertErrors>();
            List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
            List<String> idsToPublish = null;
            if (!ignoreInsertIds) {
                idsToPublish = insertIdList;
            }
            while (true) {
                long nextBackoffMillis;
                int i;
                ArrayList<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<ValueInSingleWindow<TableRow>>();
                ArrayList<String> retryIds = idsToPublish != null ? new ArrayList<String>() : null;
                int strideIndex = 0;
                ArrayList<Object> rows = new ArrayList<Object>();
                long dataSize = 0L;
                ArrayList<Future<List>> futures = new ArrayList<Future<List>>();
                ArrayList<Integer> strideIndices = new ArrayList<Integer>();
                AtomicLong maxThrottlingMsec = new AtomicLong();
                for (i = 0; i < rowsToPublish.size(); ++i) {
                    TableRow row = (TableRow)rowsToPublish.get(i).getValue();
                    TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
                    if (idsToPublish != null) {
                        out.setInsertId(idsToPublish.get(i));
                    }
                    out.setJson(row.getUnknownKeys());
                    rows.add(out);
                    try {
                    }
                    catch (Exception ex) {
                        throw new RuntimeException("Failed to convert the row to JSON", ex);
                    }
                    if ((dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row)) < this.maxRowBatchSize && (long)rows.size() < this.maxRowsPerBatch && i != rowsToPublish.size() - 1) continue;
                    TableDataInsertAllRequest content = new TableDataInsertAllRequest();
                    content.setRows(rows);
                    content.setSkipInvalidRows(Boolean.valueOf(skipInvalidRows));
                    content.setIgnoreUnknownValues(Boolean.valueOf(ignoreUnkownValues));
                    Bigquery.Tabledata.InsertAll insert = this.client.tabledata().insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content);
                    futures.add(this.executor.submit(() -> {
                        BackOff backoff1 = BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)rateLimitBackoffFactory.backoff());
                        long totalBackoffMillis = 0L;
                        while (true) {
                            try {
                                return ((TableDataInsertAllResponse)insert.execute()).getInsertErrors();
                            }
                            catch (IOException e) {
                                GoogleJsonError.ErrorInfo errorInfo = this.getErrorInfo(e);
                                if (errorInfo == null) {
                                    throw e;
                                }
                                if (!ApiErrorExtractor.INSTANCE.rateLimited(e) && !errorInfo.getReason().equals(BigQueryServicesImpl.QUOTA_EXCEEDED)) {
                                    throw e;
                                }
                                LOG.info(String.format("BigQuery insertAll error, retrying: %s", ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
                                try {
                                    long nextBackOffMillis = backoff1.nextBackOffMillis();
                                    if (nextBackOffMillis == -1L) {
                                        throw e;
                                    }
                                    sleeper.sleep(nextBackOffMillis);
                                    long totalBackoffMillisSoFar = totalBackoffMillis += nextBackOffMillis;
                                    maxThrottlingMsec.getAndUpdate(current -> Math.max(current, totalBackoffMillisSoFar));
                                }
                                catch (InterruptedException interrupted) {
                                    throw new IOException("Interrupted while waiting before retrying insertAll");
                                }
                            }
                        }
                    }));
                    strideIndices.add(strideIndex);
                    retTotalDataSize += dataSize;
                    dataSize = 0L;
                    strideIndex = i + 1;
                    rows = new ArrayList();
                }
                try {
                    for (i = 0; i < futures.size(); ++i) {
                        List errors = (List)((Future)futures.get(i)).get();
                        if (errors == null) continue;
                        for (TableDataInsertAllResponse.InsertErrors error : errors) {
                            if (error.getIndex() == null) {
                                throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
                            }
                            int errorIndex = error.getIndex().intValue() + (Integer)strideIndices.get(i);
                            if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
                                allErrors.add(error);
                                retryRows.add(rowsToPublish.get(errorIndex));
                                if (retryIds == null) continue;
                                retryIds.add(idsToPublish.get(errorIndex));
                                continue;
                            }
                            errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
                        }
                    }
                    this.throttlingMsecs.inc(maxThrottlingMsec.get());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while inserting " + rowsToPublish);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e.getCause());
                }
                if (allErrors.isEmpty() || (nextBackoffMillis = backoff.nextBackOffMillis()) == -1L) break;
                try {
                    sleeper.sleep(nextBackoffMillis);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
                }
                rowsToPublish = retryRows;
                idsToPublish = retryIds;
                allErrors.clear();
                LOG.info("Retrying {} failed inserts to BigQuery", (Object)rowsToPublish.size());
            }
            if (!allErrors.isEmpty()) {
                throw new IOException("Insert failed: " + allErrors);
            }
            return retTotalDataSize;
        }

        @Override
        public <T> long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, @Nullable List<String> insertIdList, InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<T>> failedInserts, ErrorContainer<T> errorContainer, boolean skipInvalidRows, boolean ignoreUnknownValues, boolean ignoreInsertIds) throws IOException, InterruptedException {
            return this.insertAll(ref, rowList, insertIdList, BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)INSERT_BACKOFF_FACTORY.backoff()), RATE_LIMIT_BACKOFF_FACTORY, Sleeper.DEFAULT, retryPolicy, failedInserts, errorContainer, skipInvalidRows, ignoreUnknownValues, ignoreInsertIds);
        }

        protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
            if (!(e instanceof GoogleJsonResponseException)) {
                return null;
            }
            GoogleJsonError jsonError = ((GoogleJsonResponseException)((Object)e)).getDetails();
            GoogleJsonError.ErrorInfo errorInfo = (GoogleJsonError.ErrorInfo)Iterables.getFirst((Iterable)jsonError.getErrors(), null);
            return errorInfo;
        }

        @Override
        public Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription) throws IOException, InterruptedException {
            Table table = new Table();
            table.setDescription(tableDescription);
            return (Table)BigQueryServicesImpl.executeWithRetries(this.client.tables().patch(tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId(), table), String.format("Unable to patch table description: %s, aborting after %d retries.", tableReference, 9), Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff(), ALWAYS_RETRY);
        }
    }

    @VisibleForTesting
    static class JobServiceImpl
    implements BigQueryServices.JobService {
        private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
        private final Bigquery client;
        private final BigQueryIOMetadata bqIOMetadata;

        @VisibleForTesting
        JobServiceImpl(Bigquery client) {
            this.client = client;
            this.bqIOMetadata = BigQueryIOMetadata.create();
        }

        private JobServiceImpl(BigQueryOptions options) {
            this.client = BigQueryServicesImpl.newBigQueryClient(options, null).build();
            this.bqIOMetadata = BigQueryIOMetadata.create();
        }

        @Override
        public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
            HashMap<String, String> labelMap = new HashMap<String, String>();
            Job job = new Job().setJobReference(jobRef).setConfiguration(new JobConfiguration().setLoad(loadConfig).setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
            JobServiceImpl.startJob(job, this.errorExtractor, this.client);
        }

        @Override
        public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException {
            HashMap<String, String> labelMap = new HashMap<String, String>();
            Job job = new Job().setJobReference(jobRef).setConfiguration(new JobConfiguration().setExtract(extractConfig).setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
            JobServiceImpl.startJob(job, this.errorExtractor, this.client);
        }

        @Override
        public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) throws IOException, InterruptedException {
            HashMap<String, String> labelMap = new HashMap<String, String>();
            Job job = new Job().setJobReference(jobRef).setConfiguration(new JobConfiguration().setQuery(queryConfig).setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
            JobServiceImpl.startJob(job, this.errorExtractor, this.client);
        }

        @Override
        public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) throws IOException, InterruptedException {
            HashMap<String, String> labelMap = new HashMap<String, String>();
            Job job = new Job().setJobReference(jobRef).setConfiguration(new JobConfiguration().setCopy(copyConfig).setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
            JobServiceImpl.startJob(job, this.errorExtractor, this.client);
        }

        private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client) throws IOException, InterruptedException {
            JobServiceImpl.startJob(job, errorExtractor, client, Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff());
        }

        @VisibleForTesting
        static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException {
            JobReference jobRef = job.getJobReference();
            while (true) {
                try {
                    client.jobs().insert(jobRef.getProjectId(), job).execute();
                    LOG.info("Started BigQuery job: {}.\n{}", (Object)jobRef, (Object)JobServiceImpl.formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
                    return;
                }
                catch (IOException e) {
                    if (errorExtractor.itemAlreadyExists(e)) {
                        LOG.info("BigQuery job " + jobRef + " already exists, will not retry inserting it:", (Throwable)e);
                        return;
                    }
                    LOG.info("Failed to insert job " + jobRef + ", will retry:", (Throwable)e);
                    IOException lastException = e;
                    if (BigQueryServicesImpl.nextBackOff(sleeper, backoff)) continue;
                    throw new IOException(String.format("Unable to insert job: %s, aborting after %d .", jobRef.getJobId(), 9), lastException);
                }
                break;
            }
        }

        @Override
        public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
            BackOff backoff = BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)FluentBackoff.DEFAULT.withMaxRetries(maxAttempts).withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF).withMaxBackoff(Duration.standardMinutes((long)1L)).backoff());
            return this.pollJob(jobRef, Sleeper.DEFAULT, backoff);
        }

        @VisibleForTesting
        Job pollJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) throws InterruptedException {
            do {
                try {
                    Job job = (Job)this.client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).setLocation(jobRef.getLocation()).execute();
                    if (job == null) {
                        LOG.info("Still waiting for BigQuery job {} to start", (Object)jobRef);
                        continue;
                    }
                    JobStatus status = job.getStatus();
                    if (status == null) {
                        LOG.info("Still waiting for BigQuery job {} to enter pending state", (Object)jobRef);
                        continue;
                    }
                    if ("DONE".equals(status.getState())) {
                        LOG.info("BigQuery job {} completed in state DONE", (Object)jobRef);
                        return job;
                    }
                    LOG.info("Still waiting for BigQuery job {}, currently in status {}\n{}", new Object[]{jobRef.getJobId(), status, JobServiceImpl.formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId())});
                }
                catch (IOException e) {
                    LOG.info("Ignore the error and retry polling job status.", (Throwable)e);
                }
            } while (BigQueryServicesImpl.nextBackOff(sleeper, backoff));
            LOG.warn("Unable to poll job status: {}, aborting after reached max .", (Object)jobRef.getJobId());
            return null;
        }

        private static String formatBqStatusCommand(String projectId, String jobId) {
            return String.format("bq show -j --format=prettyjson --project_id=%s %s", projectId, jobId);
        }

        @Override
        public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, String location) throws InterruptedException, IOException {
            JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId);
            Job job = new Job().setJobReference(jobRef).setConfiguration(new JobConfiguration().setQuery(queryConfig).setDryRun(Boolean.valueOf(true)));
            return ((Job)BigQueryServicesImpl.executeWithRetries(this.client.jobs().insert(projectId, job), String.format("Unable to dry run query: %s, aborting after %d retries.", queryConfig, 9), Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff(), ALWAYS_RETRY)).getStatistics();
        }

        @Override
        public Job getJob(JobReference jobRef) throws IOException, InterruptedException {
            return this.getJob(jobRef, Sleeper.DEFAULT, BigQueryServicesImpl.createDefaultBackoff());
        }

        @VisibleForTesting
        public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException {
            Throwable lastException;
            String jobId = jobRef.getJobId();
            do {
                try {
                    return (Job)this.client.jobs().get(jobRef.getProjectId(), jobId).setLocation(jobRef.getLocation()).execute();
                }
                catch (GoogleJsonResponseException e) {
                    if (this.errorExtractor.itemNotFound((IOException)((Object)e))) {
                        LOG.info("No BigQuery job with job id {} found in location {}.", (Object)jobId, (Object)jobRef.getLocation());
                        return null;
                    }
                    LOG.info("Ignoring the error encountered while trying to query the BigQuery job {}", (Object)jobId, (Object)e);
                    lastException = e;
                }
                catch (IOException e) {
                    LOG.info("Ignoring the error encountered while trying to query the BigQuery job {}", (Object)jobId, (Object)e);
                    lastException = e;
                }
            } while (BigQueryServicesImpl.nextBackOff(sleeper, backoff));
            throw new IOException(String.format("Unable to find BigQuery job: %s, aborting after %d retries.", jobRef, 9), lastException);
        }
    }
}

