/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import com.google.auto.value.AutoValue;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.io.hadoop.format.AutoValue_HadoopFormatIO_Read;
import org.apache.beam.sdk.io.hadoop.format.ExternalSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormats;
import org.apache.beam.sdk.io.hadoop.format.IterableCombinerFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AtomicDouble;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class HadoopFormatIO {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);
    public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";
    public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
    public static final String OUTPUT_VALUE_CLASS = "mapreduce.job.output.value.class";
    public static final String NUM_REDUCES = "mapreduce.job.reduces";
    public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";
    public static final String JOB_ID = "mapreduce.job.id";
    public static final String OUTPUT_DIR = "mapreduce.output.fileoutputformat.outputdir";

    public static <K, V> Read<K, V> read() {
        return new AutoValue_HadoopFormatIO_Read.Builder().build();
    }

    public static <KeyT, ValueT> Write.WriteBuilder<KeyT, ValueT> write() {
        return new Write.Builder();
    }

    private static class PrepareNonPartitionedTasksFn<KeyT, ValueT>
    extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
        private transient TaskID taskId;
        private final PCollectionView<Configuration> configView;
        private final ExternalSynchronization externalSynchronization;

        private PrepareNonPartitionedTasksFn(PCollectionView<Configuration> configView, ExternalSynchronization externalSynchronization) {
            this.configView = configView;
            this.externalSynchronization = externalSynchronization;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.taskId = null;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<KeyT, ValueT> element, DoFn.OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> output, DoFn.ProcessContext c) {
            if (this.taskId == null) {
                Configuration conf = (Configuration)c.sideInput(this.configView);
                this.taskId = this.externalSynchronization.acquireTaskIdLock(conf);
            }
            output.output((Object)KV.of((Object)this.taskId.getId(), element));
        }
    }

    private static class WriteFn<KeyT, ValueT>
    extends DoFn<KV<Integer, KV<KeyT, ValueT>>, Integer> {
        private final PCollectionView<Configuration> configView;
        private final ExternalSynchronization externalSynchronization;
        private transient Map<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> bundleTaskContextMap;

        WriteFn(PCollectionView<Configuration> configView, ExternalSynchronization externalSynchronization) {
            this.configView = configView;
            this.externalSynchronization = externalSynchronization;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.bundleTaskContextMap = new HashMap<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>>();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Integer, KV<KeyT, ValueT>> element, DoFn.ProcessContext c, BoundedWindow b) {
            Integer taskID = (Integer)element.getKey();
            KV win2TaskId = KV.of((Object)b, (Object)taskID);
            TaskContext taskContext = this.bundleTaskContextMap.computeIfAbsent((KV<BoundedWindow, Integer>)win2TaskId, w2t -> this.setupTask((Integer)w2t.getValue(), c));
            this.write((KV)element.getValue(), taskContext);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext c) {
            if (this.bundleTaskContextMap == null) {
                return;
            }
            for (Map.Entry<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> entry : this.bundleTaskContextMap.entrySet()) {
                TaskContext<KeyT, ValueT> taskContext = entry.getValue();
                try {
                    taskContext.getRecordWriter().close(taskContext.getTaskAttemptContext());
                    taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext());
                    LOG.info("Write task for {} was successfully committed!", taskContext);
                }
                catch (Exception e) {
                    this.processTaskException(taskContext, e);
                }
                BoundedWindow window = (BoundedWindow)entry.getKey().getKey();
                c.output((Object)taskContext.getTaskId(), Objects.requireNonNull(window).maxTimestamp(), window);
            }
        }

        private void processTaskException(TaskContext<KeyT, ValueT> taskContext, Exception e) {
            LOG.warn("Write task for {} failed. Will abort task.", taskContext);
            taskContext.abortTask();
            throw new IllegalArgumentException(e);
        }

        private void write(KV<KeyT, ValueT> kv, TaskContext<KeyT, ValueT> taskContext) {
            try {
                RecordWriter<KeyT, ValueT> recordWriter = taskContext.getRecordWriter();
                recordWriter.write(kv.getKey(), kv.getValue());
            }
            catch (Exception e) {
                this.processTaskException(taskContext, e);
            }
        }

        private TaskContext<KeyT, ValueT> setupTask(Integer taskId, DoFn.ProcessContext c) throws IllegalStateException {
            Configuration conf = (Configuration)c.sideInput(this.configView);
            TaskAttemptID taskAttemptID = this.externalSynchronization.acquireTaskAttemptIdLock(conf, taskId);
            TaskContext taskContext = new TaskContext(taskAttemptID, conf);
            try {
                taskContext.getOutputCommitter().setupTask(taskContext.getTaskAttemptContext());
            }
            catch (Exception e) {
                this.processTaskException(taskContext, e);
            }
            LOG.info("Task with id {} of job {} was successfully setup!", (Object)taskId, (Object)HadoopFormats.getJobId(conf).getJtIdentifier());
            return taskContext;
        }
    }

    private static class AssignTaskFn<KeyT, ValueT>
    extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
        private final PCollectionView<Configuration> configView;
        private transient Map<Integer, TaskID> partitionToTaskContext;
        private transient Partitioner<KeyT, ValueT> partitioner;
        private transient Integer reducersCount;
        private transient JobID jobId;

        AssignTaskFn(PCollectionView<Configuration> configView) {
            this.configView = configView;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.partitionToTaskContext = new HashMap<Integer, TaskID>();
            this.partitioner = null;
            this.jobId = null;
            this.reducersCount = null;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<KeyT, ValueT> element, DoFn.OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> receiver, DoFn.ProcessContext c) {
            Configuration config = (Configuration)c.sideInput(this.configView);
            TaskID taskID = this.createTaskIDForKV(element, config);
            int taskId = taskID.getId();
            receiver.output((Object)KV.of((Object)taskId, element));
        }

        private TaskID createTaskIDForKV(KV<KeyT, ValueT> kv, Configuration config) {
            int taskContextKey = this.getPartitioner(config).getPartition(kv.getKey(), kv.getValue(), this.getReducersCount(config));
            return this.partitionToTaskContext.computeIfAbsent(taskContextKey, key -> HadoopFormats.createTaskID(this.getJobId(config), key));
        }

        private JobID getJobId(Configuration config) {
            if (this.jobId == null) {
                this.jobId = HadoopFormats.getJobId(config);
            }
            return this.jobId;
        }

        private int getReducersCount(Configuration config) {
            if (this.reducersCount == null) {
                this.reducersCount = HadoopFormats.getReducersCount(config);
            }
            return this.reducersCount;
        }

        private Partitioner<KeyT, ValueT> getPartitioner(Configuration config) {
            if (this.partitioner == null) {
                this.partitioner = HadoopFormats.getPartitioner(config);
            }
            return this.partitioner;
        }
    }

    private static class CommitJobFn<T>
    extends DoFn<Iterable<T>, Void> {
        private final PCollectionView<Configuration> configView;
        private final ExternalSynchronization externalSynchronization;

        CommitJobFn(PCollectionView<Configuration> configView, ExternalSynchronization externalSynchronization) {
            this.configView = configView;
            this.externalSynchronization = externalSynchronization;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            Configuration config = (Configuration)c.sideInput(this.configView);
            this.cleanupJob(config);
        }

        private void cleanupJob(Configuration config) {
            this.externalSynchronization.releaseJobIdLock(config);
            JobID jobID = HadoopFormats.getJobId(config);
            TaskAttemptContext cleanupTaskContext = HadoopFormats.createCleanupTaskContext(config, jobID);
            OutputFormat outputFormat = HadoopFormats.createOutputFormatFromConfig(config);
            try {
                OutputCommitter outputCommitter = outputFormat.getOutputCommitter(cleanupTaskContext);
                outputCommitter.commitJob((JobContext)cleanupTaskContext);
            }
            catch (Exception e) {
                throw new RuntimeException("Unable to commit job.", e);
            }
        }
    }

    private static class SetupJobFn<KeyT, ValueT>
    extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> {
        private final ExternalSynchronization externalSynchronization;
        private final PCollectionView<Configuration> configView;
        private final TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
        private boolean isSetupJobAttempted;

        SetupJobFn(ExternalSynchronization externalSynchronization, PCollectionView<Configuration> configView, TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor) {
            this.externalSynchronization = externalSynchronization;
            this.configView = configView;
            this.inputTypeDescriptor = inputTypeDescriptor;
        }

        @DoFn.Setup
        public void setup() {
            this.isSetupJobAttempted = false;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<KeyT, ValueT> element, DoFn.OutputReceiver<KV<KeyT, ValueT>> receiver, BoundedWindow window, DoFn.ProcessContext c) {
            receiver.output(element);
            if (this.isSetupJobAttempted) {
                return;
            }
            Configuration conf = (Configuration)c.sideInput(this.configView);
            this.validateConfiguration(conf);
            this.validateInputData(conf);
            boolean isJobLockAcquired = this.externalSynchronization.tryAcquireJobLock(conf);
            this.isSetupJobAttempted = true;
            if (!isJobLockAcquired) {
                return;
            }
            try {
                JobID jobId = HadoopFormats.getJobId(conf);
                this.trySetupJob(jobId, conf, window);
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        private void validateConfiguration(Configuration conf) {
            Preconditions.checkArgument((conf != null ? 1 : 0) != 0, (Object)"Configuration can not be null");
            Preconditions.checkArgument((conf.get(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR) != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.job.outputformat.class\"");
            Preconditions.checkArgument((conf.get(HadoopFormatIO.OUTPUT_KEY_CLASS) != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.job.output.key.class\"");
            Preconditions.checkArgument((conf.get(HadoopFormatIO.OUTPUT_VALUE_CLASS) != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.job.output.value.class\"");
            Preconditions.checkArgument((conf.get(HadoopFormatIO.JOB_ID) != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.job.id\"");
        }

        private void validateInputData(Configuration conf) {
            TypeDescriptor outputFormatKeyClass = TypeDescriptor.of((Class)conf.getClass(HadoopFormatIO.OUTPUT_KEY_CLASS, null));
            TypeDescriptor outputFormatValueClass = TypeDescriptor.of((Class)conf.getClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, null));
            Preconditions.checkArgument((this.inputTypeDescriptor != null ? 1 : 0) != 0, (String)"Input %s must be set!", (Object)TypeDescriptor.class.getSimpleName());
            Preconditions.checkArgument((boolean)KV.class.equals((Object)this.inputTypeDescriptor.getRawType()), (String)"%s expects %s as input type.", (Object)Write.class.getSimpleName(), KV.class);
            Preconditions.checkArgument((boolean)this.inputTypeDescriptor.equals((Object)TypeDescriptors.kvs((TypeDescriptor)outputFormatKeyClass, (TypeDescriptor)outputFormatValueClass)), (String)"%s expects following %ss: KV(Key: %s, Value: %s) but following %ss are set: KV(Key: %s, Value: %s)", (Object[])new Object[]{Write.class.getSimpleName(), TypeDescriptor.class.getSimpleName(), outputFormatKeyClass.getRawType(), outputFormatValueClass.getRawType(), TypeDescriptor.class.getSimpleName(), this.inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[0]), this.inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[1])});
        }

        private void trySetupJob(JobID jobId, Configuration conf, BoundedWindow window) {
            try {
                TaskAttemptContext setupTaskContext = HadoopFormats.createSetupTaskContext(conf, jobId);
                OutputFormat jobOutputFormat = HadoopFormats.createOutputFormatFromConfig(conf);
                jobOutputFormat.checkOutputSpecs((JobContext)setupTaskContext);
                jobOutputFormat.getOutputCommitter(setupTaskContext).setupJob((JobContext)setupTaskContext);
                LOG.info("Job with id {} successfully configured from window with max timestamp {}.", (Object)jobId.getJtIdentifier(), (Object)window.maxTimestamp());
            }
            catch (FileAlreadyExistsException e) {
                LOG.info("Job was already set by other worker. Skipping rest of the setup.");
            }
            catch (Exception e) {
                throw new RuntimeException("Unable to setup job.", e);
            }
        }
    }

    private static class ConfigurationCoder
    extends AtomicCoder<Configuration> {
        private ConfigurationCoder() {
        }

        public void encode(Configuration value, OutputStream outStream) throws IOException {
            DataOutputStream dataOutputStream = new DataOutputStream(outStream);
            value.write((DataOutput)dataOutputStream);
            dataOutputStream.flush();
        }

        public Configuration decode(InputStream inStream) throws IOException {
            DataInputStream dataInputStream = new DataInputStream(inStream);
            Configuration config = new Configuration(false);
            config.readFields((DataInput)dataInputStream);
            return config;
        }
    }

    private static class TaskContext<KeyT, ValueT> {
        private final RecordWriter<KeyT, ValueT> recordWriter;
        private final OutputCommitter outputCommitter;
        private final TaskAttemptContext taskAttemptContext;

        TaskContext(TaskAttemptID taskAttempt, Configuration conf) {
            this.taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, taskAttempt);
            OutputFormat outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
            this.outputCommitter = TaskContext.initOutputCommitter(outputFormatObj, conf, this.taskAttemptContext);
            this.recordWriter = this.initRecordWriter(outputFormatObj, this.taskAttemptContext);
        }

        RecordWriter<KeyT, ValueT> getRecordWriter() {
            return this.recordWriter;
        }

        OutputCommitter getOutputCommitter() {
            return this.outputCommitter;
        }

        TaskAttemptContext getTaskAttemptContext() {
            return this.taskAttemptContext;
        }

        int getTaskId() {
            return this.taskAttemptContext.getTaskAttemptID().getTaskID().getId();
        }

        String getJobId() {
            return this.taskAttemptContext.getJobID().getJtIdentifier();
        }

        void abortTask() {
            try {
                this.outputCommitter.abortTask(this.taskAttemptContext);
            }
            catch (IOException e) {
                throw new IllegalStateException(String.format("Unable to abort task %s of job %s", this.getTaskId(), this.getJobId()));
            }
        }

        private RecordWriter<KeyT, ValueT> initRecordWriter(OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext) throws IllegalStateException {
            try {
                LOG.info("Creating new RecordWriter for task {} of Job with id {}.", (Object)taskAttemptContext.getTaskAttemptID().getTaskID().getId(), (Object)taskAttemptContext.getJobID().getJtIdentifier());
                return outputFormatObj.getRecordWriter(taskAttemptContext);
            }
            catch (IOException | InterruptedException e) {
                throw new IllegalStateException("Unable to create RecordWriter object: ", e);
            }
        }

        private static OutputCommitter initOutputCommitter(OutputFormat<?, ?> outputFormatObj, Configuration conf, TaskAttemptContext taskAttemptContext) throws IllegalStateException {
            OutputCommitter outputCommitter;
            try {
                outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext);
                if (outputCommitter != null) {
                    outputCommitter.setupJob((JobContext)new JobContextImpl(conf, taskAttemptContext.getJobID()));
                }
            }
            catch (Exception e) {
                throw new IllegalStateException("Unable to create OutputCommitter object: ", e);
            }
            return outputCommitter;
        }

        public String toString() {
            return "TaskContext{jobId=" + this.getJobId() + ", taskId=" + this.getTaskId() + ", attemptId=" + this.taskAttemptContext.getTaskAttemptID().getId() + '}';
        }
    }

    public static class Write<KeyT, ValueT>
    extends PTransform<PCollection<KV<KeyT, ValueT>>, PDone> {
        private final transient @Nullable Configuration configuration;
        private final @Nullable PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> configTransform;
        private final ExternalSynchronization externalSynchronization;
        private final boolean withPartitioning;

        Write(@Nullable Configuration configuration, @Nullable PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> configTransform, ExternalSynchronization externalSynchronization, boolean withPartitioning) {
            this.configuration = configuration;
            this.configTransform = configTransform;
            this.externalSynchronization = externalSynchronization;
            this.withPartitioning = withPartitioning;
        }

        public void validate(PipelineOptions pipelineOptions) {
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Configuration hadoopConfig = this.configuration;
            if (hadoopConfig != null) {
                builder.addIfNotNull(DisplayData.item((String)HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, (String)hadoopConfig.get(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR)).withLabel("OutputFormat Class"));
                builder.addIfNotNull(DisplayData.item((String)HadoopFormatIO.OUTPUT_KEY_CLASS, (String)hadoopConfig.get(HadoopFormatIO.OUTPUT_KEY_CLASS)).withLabel("OutputFormat Key Class"));
                builder.addIfNotNull(DisplayData.item((String)HadoopFormatIO.OUTPUT_VALUE_CLASS, (String)hadoopConfig.get(HadoopFormatIO.OUTPUT_VALUE_CLASS)).withLabel("OutputFormat Value Class"));
                builder.addIfNotNull(DisplayData.item((String)HadoopFormatIO.PARTITIONER_CLASS_ATTR, (String)hadoopConfig.get(HadoopFormatIO.PARTITIONER_CLASS_ATTR, HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName())).withLabel("Partitioner Class"));
            }
        }

        public PDone expand(PCollection<KV<KeyT, ValueT>> input) {
            if (input.isBounded().equals((Object)PCollection.IsBounded.UNBOUNDED) || !input.getWindowingStrategy().equals((Object)WindowingStrategy.globalDefault())) {
                Preconditions.checkArgument((this.configTransform != null ? 1 : 0) != 0, (String)"Writing of unbounded data can be processed only with configuration transformation provider. See %s.withConfigurationTransform()", Write.class);
            }
            this.verifyInputWindowing(input);
            TypeDescriptor<Configuration> configType = new TypeDescriptor<Configuration>(){};
            input.getPipeline().getCoderRegistry().registerCoderForType((TypeDescriptor)configType, (Coder)new ConfigurationCoder());
            PCollectionView<Configuration> configView = this.createConfigurationView(input);
            return this.processJob(input, configView);
        }

        private PDone processJob(PCollection<KV<KeyT, ValueT>> input, PCollectionView<Configuration> configView) {
            TypeDescriptor iterableIntType = TypeDescriptors.iterables((TypeDescriptor)TypeDescriptors.integers());
            PCollection validatedInput = (PCollection)input.apply((PTransform)ParDo.of(new SetupJobFn(this.externalSynchronization, configView, input.getTypeDescriptor())).withSideInputs(new PCollectionView[]{configView}));
            PCollection writeData = this.withPartitioning ? (PCollection)validatedInput.apply("GroupDataByPartition", new GroupDataByPartition(configView)) : (PCollection)validatedInput.apply("PrepareNonPartitionedTasks", (PTransform)ParDo.of(new PrepareNonPartitionedTasksFn(configView, this.externalSynchronization)).withSideInputs(new PCollectionView[]{configView}));
            PCollection collectedFinishedWrites = ((PCollection)((PCollection)writeData.apply("Write", (PTransform)ParDo.of(new WriteFn(configView, this.externalSynchronization)).withSideInputs(new PCollectionView[]{configView}))).setTypeDescriptor(TypeDescriptors.integers()).apply("CollectWriteTasks", (PTransform)Combine.globally(new IterableCombinerFn(TypeDescriptors.integers())).withoutDefaults())).setTypeDescriptor(iterableIntType);
            return PDone.in((Pipeline)((PCollection)collectedFinishedWrites.apply("CommitWriteJob", (PTransform)ParDo.of(new CommitJobFn(configView, this.externalSynchronization)).withSideInputs(new PCollectionView[]{configView}))).getPipeline());
        }

        private void verifyInputWindowing(PCollection<KV<KeyT, ValueT>> input) {
            if (input.isBounded().equals((Object)PCollection.IsBounded.UNBOUNDED)) {
                Preconditions.checkArgument((!input.getWindowingStrategy().equals((Object)WindowingStrategy.globalDefault()) ? 1 : 0) != 0, (String)"Cannot work with %s and GLOBAL %s", (Object)PCollection.IsBounded.UNBOUNDED, (Object)WindowingStrategy.class.getSimpleName());
                Preconditions.checkArgument((boolean)input.getWindowingStrategy().getTrigger().getClass().equals(DefaultTrigger.class), (String)"Cannot work with %s trigger. Write works correctly only with %s", (Object)input.getWindowingStrategy().getTrigger().getClass().getSimpleName(), (Object)DefaultTrigger.class.getSimpleName());
                Preconditions.checkArgument((boolean)input.getWindowingStrategy().getAllowedLateness().equals((Object)Duration.ZERO), (Object)"Write does not allow late data.");
            }
        }

        private PCollectionView<Configuration> createConfigurationView(PCollection<KV<KeyT, ValueT>> input) {
            PCollectionView config = this.configuration != null ? (PCollectionView)((PCollection)input.getPipeline().apply("CreateOutputConfig", (PTransform)Create.of((Object)this.configuration, (Object[])new Configuration[0]))).apply((PTransform)View.asSingleton().withDefaultValue((Object)this.configuration)) : (PCollectionView)input.apply("TransformDataIntoConfig", this.configTransform);
            return config;
        }

        static class Builder<KeyT, ValueT>
        implements WriteBuilder<KeyT, ValueT>,
        PartitionedWriterBuilder<KeyT, ValueT>,
        ExternalSynchronizationBuilder<KeyT, ValueT> {
            private Configuration configuration;
            private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> configTransform;
            private boolean isWithPartitioning;

            Builder() {
            }

            @Override
            public PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration config) {
                Preconditions.checkNotNull((Object)config, (Object)"Hadoop configuration cannot be null");
                this.configuration = config;
                return this;
            }

            @Override
            public ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform(PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> configTransform) {
                Preconditions.checkNotNull(configTransform, (Object)"Configuration transformation cannot be null");
                this.isWithPartitioning = true;
                this.configTransform = configTransform;
                return this;
            }

            @Override
            public ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning() {
                this.isWithPartitioning = true;
                return this;
            }

            @Override
            public ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning() {
                this.isWithPartitioning = false;
                return this;
            }

            @Override
            public Write<KeyT, ValueT> withExternalSynchronization(ExternalSynchronization externalSynchronization) {
                Preconditions.checkNotNull((Object)externalSynchronization, (Object)"External synchronization cannot be null");
                return new Write(this.configuration, this.configTransform, externalSynchronization, this.isWithPartitioning);
            }
        }

        public static interface WriteBuilder<KeyT, ValueT> {
            public PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration var1);

            public ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform(PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> var1);
        }

        public static interface ExternalSynchronizationBuilder<KeyT, ValueT> {
            public Write<KeyT, ValueT> withExternalSynchronization(ExternalSynchronization var1);
        }

        public static interface PartitionedWriterBuilder<KeyT, ValueT> {
            public ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning();

            public ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning();
        }
    }

    private static class FlattenGroupedTasks<KeyT, ValueT>
    extends DoFn<KV<Integer, Iterable<KV<KeyT, ValueT>>>, KV<Integer, KV<KeyT, ValueT>>> {
        private FlattenGroupedTasks() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Integer, Iterable<KV<KeyT, ValueT>>> input, DoFn.OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> outputReceiver) {
            Integer key = (Integer)input.getKey();
            for (KV element : Objects.requireNonNull((Iterable)input.getValue(), "Iterable can not be null.")) {
                outputReceiver.output((Object)KV.of((Object)key, (Object)element));
            }
        }
    }

    private static class GroupDataByPartition<KeyT, ValueT>
    extends PTransform<PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, ValueT>>>> {
        private final PCollectionView<Configuration> configView;

        private GroupDataByPartition(PCollectionView<Configuration> configView) {
            this.configView = configView;
        }

        public PCollection<KV<Integer, KV<KeyT, ValueT>>> expand(PCollection<KV<KeyT, ValueT>> input) {
            return (PCollection)((PCollection)((PCollection)input.apply("AssignTask", (PTransform)ParDo.of(new AssignTaskFn(this.configView)).withSideInputs(new PCollectionView[]{this.configView}))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)TypeDescriptors.integers(), (TypeDescriptor)input.getTypeDescriptor())).apply("GroupByTaskId", (PTransform)GroupByKey.create())).apply("FlattenGroupedTasks", (PTransform)ParDo.of(new FlattenGroupedTasks()));
        }
    }

    public static class SerializableSplit
    implements Serializable {
        InputSplit inputSplit;

        public SerializableSplit() {
        }

        public SerializableSplit(InputSplit split) {
            Preconditions.checkArgument((boolean)(split instanceof Writable), (Object)String.format("Split is not of type Writable: %s", split));
            this.inputSplit = split;
        }

        public InputSplit getSplit() {
            return this.inputSplit;
        }

        private void readObject(ObjectInputStream in) throws IOException {
            ObjectWritable ow = new ObjectWritable();
            ow.setConf(new Configuration(false));
            ow.readFields((DataInput)in);
            this.inputSplit = (InputSplit)ow.get();
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            new ObjectWritable((Object)this.inputSplit).write((DataOutput)out);
        }
    }

    public static class HadoopInputFormatBoundedSource<K, V>
    extends BoundedSource<KV<K, V>>
    implements Serializable {
        private final SerializableConfiguration conf;
        private final Coder<K> keyCoder;
        private final Coder<V> valueCoder;
        private final @Nullable SimpleFunction<?, K> keyTranslationFunction;
        private final @Nullable SimpleFunction<?, V> valueTranslationFunction;
        private final SerializableSplit inputSplit;
        private transient List<SerializableSplit> inputSplits;
        private long boundedSourceEstimatedSize = 0L;
        private transient InputFormat<?, ?> inputFormatObj;
        private transient TaskAttemptContext taskAttemptContext;
        private static final Set<Class<?>> immutableTypes = new HashSet<Class>(Arrays.asList(String.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Boolean.class, BigInteger.class, BigDecimal.class));

        HadoopInputFormatBoundedSource(SerializableConfiguration conf, Coder<K> keyCoder, Coder<V> valueCoder, @Nullable SimpleFunction<?, K> keyTranslationFunction, @Nullable SimpleFunction<?, V> valueTranslationFunction) {
            this(conf, keyCoder, valueCoder, keyTranslationFunction, valueTranslationFunction, null);
        }

        protected HadoopInputFormatBoundedSource(SerializableConfiguration conf, Coder<K> keyCoder, Coder<V> valueCoder, @Nullable SimpleFunction<?, K> keyTranslationFunction, @Nullable SimpleFunction<?, V> valueTranslationFunction, SerializableSplit inputSplit) {
            this.conf = conf;
            this.inputSplit = inputSplit;
            this.keyCoder = keyCoder;
            this.valueCoder = valueCoder;
            this.keyTranslationFunction = keyTranslationFunction;
            this.valueTranslationFunction = valueTranslationFunction;
        }

        public SerializableConfiguration getConfiguration() {
            return this.conf;
        }

        public void validate() {
            Preconditions.checkArgument((this.conf != null ? 1 : 0) != 0, (Object)"conf can not be null");
            Preconditions.checkArgument((this.keyCoder != null ? 1 : 0) != 0, (Object)"keyCoder can not be null");
            Preconditions.checkArgument((this.valueCoder != null ? 1 : 0) != 0, (Object)"valueCoder can not be null");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Configuration hadoopConfig = this.getConfiguration().get();
            if (hadoopConfig != null) {
                builder.addIfNotNull(DisplayData.item((String)"mapreduce.job.inputformat.class", (String)hadoopConfig.get("mapreduce.job.inputformat.class")).withLabel("InputFormat Class"));
                builder.addIfNotNull(DisplayData.item((String)"key.class", (String)hadoopConfig.get("key.class")).withLabel("Key Class"));
                builder.addIfNotNull(DisplayData.item((String)"value.class", (String)hadoopConfig.get("value.class")).withLabel("Value Class"));
            }
        }

        public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            if (this.inputSplit != null) {
                LOG.info("Not splitting source {} because source is already split.", (Object)this);
                return ImmutableList.of((Object)this);
            }
            this.computeSplitsIfNecessary();
            LOG.info("Generated {} splits. Size of first split is {} ", (Object)this.inputSplits.size(), (Object)this.inputSplits.get(0).getSplit().getLength());
            return this.inputSplits.stream().map(serializableInputSplit -> new HadoopInputFormatBoundedSource<K, V>(this.conf, this.keyCoder, this.valueCoder, this.keyTranslationFunction, this.valueTranslationFunction, (SerializableSplit)serializableInputSplit)).collect(Collectors.toList());
        }

        public long getEstimatedSizeBytes(PipelineOptions po) throws Exception {
            if (this.inputSplit == null) {
                this.computeSplitsIfNecessary();
                return this.boundedSourceEstimatedSize;
            }
            return this.inputSplit.getSplit().getLength();
        }

        @VisibleForTesting
        void computeSplitsIfNecessary() throws IOException, InterruptedException {
            if (this.inputSplits != null) {
                return;
            }
            this.createInputFormatInstance();
            List splits = this.inputFormatObj.getSplits((JobContext)Job.getInstance((Configuration)this.conf.get()));
            if (splits == null) {
                throw new IOException("Error in computing splits, getSplits() returns null.");
            }
            if (splits.isEmpty()) {
                throw new IOException("Error in computing splits, getSplits() returns a empty list");
            }
            this.boundedSourceEstimatedSize = 0L;
            this.inputSplits = new ArrayList<SerializableSplit>();
            for (InputSplit inputSplit : splits) {
                if (inputSplit == null) {
                    throw new IOException("Error in computing splits, split is null in InputSplits list populated by getSplits() : ");
                }
                this.boundedSourceEstimatedSize += inputSplit.getLength();
                this.inputSplits.add(new SerializableSplit(inputSplit));
            }
        }

        protected void createInputFormatInstance() throws IOException {
            if (this.inputFormatObj == null) {
                try {
                    this.taskAttemptContext = new TaskAttemptContextImpl(this.conf.get(), new TaskAttemptID());
                    this.inputFormatObj = (InputFormat)this.conf.get().getClassByName(this.conf.get().get("mapreduce.job.inputformat.class")).getConstructor(new Class[0]).newInstance(new Object[0]);
                    if (Configurable.class.isAssignableFrom(this.inputFormatObj.getClass())) {
                        ((Configurable)this.inputFormatObj).setConf(this.conf.get());
                    }
                }
                catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                    throw new IOException("Unable to create InputFormat object: ", e);
                }
            }
        }

        @VisibleForTesting
        InputFormat<?, ?> getInputFormat() {
            return this.inputFormatObj;
        }

        @VisibleForTesting
        void setInputFormatObj(InputFormat<?, ?> inputFormatObj) {
            this.inputFormatObj = inputFormatObj;
        }

        public Coder<KV<K, V>> getOutputCoder() {
            return KvCoder.of(this.keyCoder, this.valueCoder);
        }

        public BoundedSource.BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
            this.validate();
            if (this.inputSplit == null) {
                throw new IOException("Cannot create reader as source is not split yet.");
            }
            this.createInputFormatInstance();
            return new HadoopInputFormatReader(this, this.keyTranslationFunction, this.valueTranslationFunction, this.inputSplit, this.inputFormatObj, this.taskAttemptContext);
        }

        class HadoopInputFormatReader<T1, T2>
        extends BoundedSource.BoundedReader<KV<K, V>> {
            private final HadoopInputFormatBoundedSource<K, V> source;
            private final @Nullable SimpleFunction<T1, K> keyTranslationFunction;
            private final @Nullable SimpleFunction<T2, V> valueTranslationFunction;
            private final SerializableSplit split;
            private RecordReader<T1, T2> recordReader;
            private volatile boolean doneReading = false;
            private final AtomicLong recordsReturned = new AtomicLong();
            private final AtomicDouble progressValue = new AtomicDouble();
            private final transient InputFormat<T1, T2> inputFormatObj;
            private final transient TaskAttemptContext taskAttemptContext;

            private HadoopInputFormatReader(@Nullable HadoopInputFormatBoundedSource<K, V> source, @Nullable SimpleFunction keyTranslationFunction, SimpleFunction valueTranslationFunction, SerializableSplit split, InputFormat inputFormatObj, TaskAttemptContext taskAttemptContext) {
                this.source = source;
                this.keyTranslationFunction = keyTranslationFunction;
                this.valueTranslationFunction = valueTranslationFunction;
                this.split = split;
                this.inputFormatObj = inputFormatObj;
                this.taskAttemptContext = taskAttemptContext;
            }

            public HadoopInputFormatBoundedSource<K, V> getCurrentSource() {
                return this.source;
            }

            public boolean start() throws IOException {
                try {
                    this.recordsReturned.set(0L);
                    this.recordReader = this.inputFormatObj.createRecordReader(this.split.getSplit(), this.taskAttemptContext);
                    if (this.recordReader != null) {
                        this.recordReader.initialize(this.split.getSplit(), this.taskAttemptContext);
                        this.progressValue.set(this.getProgress().doubleValue());
                        if (this.recordReader.nextKeyValue()) {
                            this.recordsReturned.incrementAndGet();
                            this.doneReading = false;
                            return true;
                        }
                    } else {
                        throw new IOException(String.format("Null RecordReader object returned by %s", this.inputFormatObj.getClass()));
                    }
                    this.recordReader = null;
                }
                catch (InterruptedException e) {
                    throw new IOException("Could not read because the thread got interrupted while reading the records with an exception: ", e);
                }
                this.doneReading = true;
                return false;
            }

            public boolean advance() throws IOException {
                try {
                    this.progressValue.set(this.getProgress().doubleValue());
                    if (this.recordReader.nextKeyValue()) {
                        this.recordsReturned.incrementAndGet();
                        return true;
                    }
                    this.doneReading = true;
                }
                catch (InterruptedException e) {
                    throw new IOException("Unable to read data: ", e);
                }
                return false;
            }

            public KV<K, V> getCurrent() {
                Object value;
                Object key;
                try {
                    key = this.transformKeyOrValue(this.recordReader.getCurrentKey(), this.keyTranslationFunction, HadoopInputFormatBoundedSource.this.keyCoder);
                    value = this.transformKeyOrValue(this.recordReader.getCurrentValue(), this.valueTranslationFunction, HadoopInputFormatBoundedSource.this.valueCoder);
                }
                catch (IOException | InterruptedException e) {
                    LOG.error("Unable to read data: ", (Throwable)e);
                    throw new IllegalStateException("Unable to read data: {}", e);
                }
                return KV.of(key, value);
            }

            private <T, T3> T3 transformKeyOrValue(T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder) throws CoderException, ClassCastException {
                Object output = null != simpleFunction ? simpleFunction.apply(input) : input;
                return (T3)this.cloneIfPossiblyMutable(output, coder);
            }

            private <T> T cloneIfPossiblyMutable(T input, Coder<T> coder) throws CoderException, ClassCastException {
                if (!this.isKnownImmutable(input)) {
                    input = CoderUtils.clone(coder, input);
                }
                return input;
            }

            private boolean isKnownImmutable(Object o) {
                return immutableTypes.contains(o.getClass());
            }

            public void close() throws IOException {
                LOG.info("Closing reader after reading {} records.", (Object)this.recordsReturned);
                if (this.recordReader != null) {
                    this.recordReader.close();
                    this.recordReader = null;
                }
            }

            public Double getFractionConsumed() {
                if (this.doneReading) {
                    return 1.0;
                }
                if (this.recordReader == null || this.recordsReturned.get() == 0L) {
                    return 0.0;
                }
                if (this.progressValue.get() == 0.0) {
                    return null;
                }
                return this.progressValue.doubleValue();
            }

            private Double getProgress() throws IOException, InterruptedException {
                try {
                    float progress = this.recordReader.getProgress();
                    return (double)progress < 0.0 || progress > 1.0f ? 0.0 : (double)progress;
                }
                catch (IOException e) {
                    LOG.error("Error in computing the fractions consumed as RecordReader.getProgress() throws an exception : ", (Throwable)e);
                    throw new IOException("Error in computing the fractions consumed as RecordReader.getProgress() throws an exception : " + e.getMessage(), e);
                }
            }

            public final long getSplitPointsRemaining() {
                if (this.doneReading) {
                    return 0L;
                }
                return 1L;
            }
        }
    }

    @AutoValue
    public static abstract class Read<K, V>
    extends PTransform<PBegin, PCollection<KV<K, V>>> {
        public abstract @Nullable SerializableConfiguration getConfiguration();

        public abstract @Nullable SimpleFunction<?, K> getKeyTranslationFunction();

        public abstract @Nullable SimpleFunction<?, V> getValueTranslationFunction();

        public abstract @Nullable TypeDescriptor<K> getKeyTypeDescriptor();

        public abstract @Nullable TypeDescriptor<V> getValueTypeDescriptor();

        public abstract @Nullable TypeDescriptor<?> getinputFormatClass();

        public abstract @Nullable TypeDescriptor<?> getinputFormatKeyClass();

        public abstract @Nullable TypeDescriptor<?> getinputFormatValueClass();

        public abstract Builder<K, V> toBuilder();

        public Read<K, V> withConfiguration(Configuration configuration) {
            this.validateConfiguration(configuration);
            TypeDescriptor inputFormatClass = TypeDescriptor.of((Class)configuration.getClass("mapreduce.job.inputformat.class", null));
            TypeDescriptor inputFormatKeyClass = TypeDescriptor.of((Class)configuration.getClass("key.class", null));
            TypeDescriptor inputFormatValueClass = TypeDescriptor.of((Class)configuration.getClass("value.class", null));
            Builder<K, V> builder = this.toBuilder().setConfiguration(new SerializableConfiguration(configuration));
            builder.setInputFormatClass(inputFormatClass);
            builder.setInputFormatKeyClass(inputFormatKeyClass);
            builder.setInputFormatValueClass(inputFormatValueClass);
            if (this.getKeyTranslationFunction() == null) {
                builder.setKeyTypeDescriptor(inputFormatKeyClass);
            }
            if (this.getValueTranslationFunction() == null) {
                builder.setValueTypeDescriptor(inputFormatValueClass);
            }
            return builder.build();
        }

        public Read<K, V> withKeyTranslation(SimpleFunction<?, K> function) {
            Preconditions.checkArgument((function != null ? 1 : 0) != 0, (Object)"function can not be null");
            return this.toBuilder().setKeyTranslationFunction(function).setKeyTypeDescriptor(function.getOutputTypeDescriptor()).build();
        }

        public Read<K, V> withValueTranslation(SimpleFunction<?, V> function) {
            Preconditions.checkArgument((function != null ? 1 : 0) != 0, (Object)"function can not be null");
            return this.toBuilder().setValueTranslationFunction(function).setValueTypeDescriptor(function.getOutputTypeDescriptor()).build();
        }

        public PCollection<KV<K, V>> expand(PBegin input) {
            this.validateTransform();
            CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
            Coder keyCoder = this.getDefaultCoder(this.getKeyTypeDescriptor(), coderRegistry);
            Coder valueCoder = this.getDefaultCoder(this.getValueTypeDescriptor(), coderRegistry);
            HadoopInputFormatBoundedSource source = new HadoopInputFormatBoundedSource(this.getConfiguration(), keyCoder, valueCoder, this.getKeyTranslationFunction(), this.getValueTranslationFunction());
            return (PCollection)input.getPipeline().apply((PTransform)org.apache.beam.sdk.io.Read.from(source));
        }

        private void validateConfiguration(Configuration configuration) {
            Preconditions.checkArgument((configuration != null ? 1 : 0) != 0, (Object)"configuration can not be null");
            Preconditions.checkArgument((configuration.get("mapreduce.job.inputformat.class") != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.job.inputformat.class\"");
            Preconditions.checkArgument((configuration.get("key.class") != null ? 1 : 0) != 0, (Object)"configuration must contain \"key.class\"");
            Preconditions.checkArgument((configuration.get("value.class") != null ? 1 : 0) != 0, (Object)"configuration must contain \"value.class\"");
            if (configuration.get("mapreduce.job.inputformat.class").endsWith("DBInputFormat")) {
                Preconditions.checkArgument((configuration.get("mapreduce.jdbc.input.orderby") != null ? 1 : 0) != 0, (Object)"Configuration must contain \"mapreduce.jdbc.input.orderby\" when using DBInputFormat");
            }
        }

        public void validateTransform() {
            Preconditions.checkArgument((this.getConfiguration() != null ? 1 : 0) != 0, (Object)"withConfiguration() is required");
            this.validateTranslationFunction(this.getinputFormatKeyClass(), this.getKeyTranslationFunction(), "Key translation's input type is not same as hadoop InputFormat : %s key class : %s");
            this.validateTranslationFunction(this.getinputFormatValueClass(), this.getValueTranslationFunction(), "Value translation's input type is not same as hadoop InputFormat :  %s value class : %s");
        }

        private void validateTranslationFunction(TypeDescriptor<?> inputType, SimpleFunction<?, ?> simpleFunction, String errorMsg) {
            if (simpleFunction != null && !simpleFunction.getInputTypeDescriptor().equals(inputType)) {
                throw new IllegalArgumentException(String.format(errorMsg, this.getinputFormatClass().getRawType(), inputType.getRawType()));
            }
        }

        public <T> Coder<T> getDefaultCoder(TypeDescriptor<?> typeDesc, CoderRegistry coderRegistry) {
            Class classType = typeDesc.getRawType();
            try {
                return coderRegistry.getCoder(typeDesc);
            }
            catch (CannotProvideCoderException e) {
                if (Writable.class.isAssignableFrom(classType)) {
                    return WritableCoder.of((Class)classType);
                }
                throw new IllegalStateException(String.format("Cannot find coder for %s  : ", typeDesc) + e.getMessage(), e);
            }
        }

        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setConfiguration(SerializableConfiguration var1);

            abstract Builder<K, V> setKeyTranslationFunction(SimpleFunction<?, K> var1);

            abstract Builder<K, V> setValueTranslationFunction(SimpleFunction<?, V> var1);

            abstract Builder<K, V> setKeyTypeDescriptor(TypeDescriptor<K> var1);

            abstract Builder<K, V> setValueTypeDescriptor(TypeDescriptor<V> var1);

            abstract Builder<K, V> setInputFormatClass(TypeDescriptor<?> var1);

            abstract Builder<K, V> setInputFormatKeyClass(TypeDescriptor<?> var1);

            abstract Builder<K, V> setInputFormatValueClass(TypeDescriptor<?> var1);

            abstract Read<K, V> build();
        }
    }
}

