/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.mapreduce;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
import org.apache.kudu.mapreduce.KuduTableOutputCommitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KuduTableOutputFormat
extends OutputFormat<NullWritable, Operation>
implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(KuduTableOutputFormat.class);
    static final String OUTPUT_TABLE_KEY = "kudu.mapreduce.output.table";
    static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.addresses";
    static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
    static final String BUFFER_ROW_COUNT_KEY = "kudu.mapreduce.buffer.row.count";
    static final String MULTITON_KEY = "kudu.mapreduce.multitonkey";
    private static final ConcurrentHashMap<String, KuduTableOutputFormat> MULTITON = new ConcurrentHashMap();
    private Configuration conf = null;
    private KuduClient client;
    private KuduTable table;
    private KuduSession session;
    private long operationTimeoutMs;

    public void setConf(Configuration entries) {
        this.conf = new Configuration(entries);
        String masterAddress = this.conf.get(MASTER_ADDRESSES_KEY);
        String tableName = this.conf.get(OUTPUT_TABLE_KEY);
        this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY, 30000L);
        this.client = new KuduClient.KuduClientBuilder(masterAddress).defaultOperationTimeoutMs(this.operationTimeoutMs).build();
        KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(this.client);
        try {
            this.table = this.client.openTable(tableName);
        }
        catch (Exception ex) {
            throw new RuntimeException("Could not obtain the table from the master, is the master running and is this table created? tablename=" + tableName + " and master address= " + masterAddress, ex);
        }
        this.session = this.client.newSession();
        this.session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        this.session.setMutationBufferSpace(this.conf.getInt(BUFFER_ROW_COUNT_KEY, 1000));
        this.session.setIgnoreAllDuplicateRows(true);
        String multitonKey = String.valueOf(Thread.currentThread().getId());
        assert (MULTITON.get(multitonKey) == null);
        MULTITON.put(multitonKey, this);
        entries.set(MULTITON_KEY, multitonKey);
    }

    private void shutdownClient() throws IOException {
        try {
            this.client.shutdown();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    public static KuduTable getKuduTable(String multitonKey) {
        return MULTITON.get(multitonKey).getKuduTable();
    }

    private KuduTable getKuduTable() {
        return this.table;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public RecordWriter<NullWritable, Operation> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new TableRecordWriter(this.session);
    }

    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        this.shutdownClient();
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new KuduTableOutputCommitter();
    }

    protected class TableRecordWriter
    extends RecordWriter<NullWritable, Operation> {
        private final AtomicLong rowsWithErrors = new AtomicLong();
        private final KuduSession session;

        public TableRecordWriter(KuduSession session) {
            this.session = session;
        }

        public void write(NullWritable key, Operation operation) throws IOException, InterruptedException {
            try {
                this.session.apply(operation);
            }
            catch (Exception e) {
                throw new IOException("Encountered an error while writing", e);
            }
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            try {
                this.processRowErrors(this.session.close());
                KuduTableOutputFormat.this.shutdownClient();
            }
            catch (Exception e) {
                throw new IOException("Encountered an error while closing this task", e);
            }
            finally {
                if (taskAttemptContext != null) {
                    taskAttemptContext.getCounter((Enum)Counters.ROWS_WITH_ERRORS).setValue(this.rowsWithErrors.get());
                }
            }
        }

        private void processRowErrors(List<OperationResponse> responses) {
            List errors = OperationResponse.collectErrors(responses);
            if (!errors.isEmpty()) {
                int rowErrorsCount = errors.size();
                this.rowsWithErrors.addAndGet(rowErrorsCount);
                LOG.warn("Got per errors for {} rows, the first one being {}", (Object)rowErrorsCount, (Object)((RowError)errors.get(0)).getErrorStatus());
            }
        }
    }

    public static enum Counters {
        ROWS_WITH_ERRORS;

    }
}

