/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util.test;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;

public class StressTestingSource
implements Source<String, byte[]> {
    public static final String CONFIG_NAMESPACE = "stressTest";
    public static final String NUM_WORK_UNITS_KEY = "stressTest.numWorkUnits";
    public static final int DEFAULT_NUM_WORK_UNITS = 1;
    public static final String RUN_DURATION_KEY = "stressTest.runDurationSecs";
    public static final int DEFAULT_RUN_DURATION = 0;
    public static final String COMPUTE_TIME_MICRO_KEY = "stressTest.computeTimeMicro";
    public static final int DEFAULT_COMPUTE_TIME_MICRO = 0;
    public static final String SLEEP_TIME_MICRO_KEY = "stressTest.sleepTimeMicro";
    public static final int DEFAULT_SLEEP_TIME = 0;
    public static final String NUM_RECORDS_KEY = "stressTest.numRecords";
    public static final int DEFAULT_NUM_RECORDS = 1;
    public static final String MEM_ALLOC_BYTES_KEY = "stressTest.memAllocBytes";
    public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
    private static final long INVALID_TIME = -1L;

    public List<WorkUnit> getWorkunits(SourceState state) {
        int numWorkUnits = state.getPropAsInt(NUM_WORK_UNITS_KEY, 1);
        Extract extract = new Extract(Extract.TableType.APPEND_ONLY, StressTestingSource.class.getPackage().getName(), StressTestingSource.class.getSimpleName());
        ArrayList<WorkUnit> wus = new ArrayList<WorkUnit>(numWorkUnits);
        for (int i = 1; i <= numWorkUnits; ++i) {
            WorkUnit wu = new WorkUnit(extract);
            wus.add(wu);
        }
        return wus;
    }

    public Extractor<String, byte[]> getExtractor(WorkUnitState state) {
        return new ExtractorImpl(state);
    }

    public void shutdown(SourceState state) {
    }

    public static class ExtractorImpl
    implements Extractor<String, byte[]> {
        private int recordsEmitted = 0;
        private final long startTime;
        private final long endTime;
        private final int computeTimeNano;
        private final int sleepTimeMicro;
        private final int numRecords;
        private final int memAllocBytes;
        private final Random random = new Random();

        public ExtractorImpl(WorkUnitState state) {
            this.startTime = System.currentTimeMillis();
            int runDuration = state.getPropAsInt(StressTestingSource.RUN_DURATION_KEY, 0);
            this.endTime = runDuration > 0 ? this.startTime + (long)(runDuration * 1000) : -1L;
            this.computeTimeNano = state.getPropAsInt(StressTestingSource.COMPUTE_TIME_MICRO_KEY, 0) * 1000;
            this.sleepTimeMicro = state.getPropAsInt(StressTestingSource.SLEEP_TIME_MICRO_KEY, 0);
            this.numRecords = this.endTime == -1L ? state.getPropAsInt(StressTestingSource.NUM_RECORDS_KEY, 1) : 0;
            this.memAllocBytes = state.getPropAsInt(StressTestingSource.MEM_ALLOC_BYTES_KEY, 8);
        }

        public void close() throws IOException {
        }

        public String getSchema() throws IOException {
            return "string";
        }

        public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException {
            if (this.endTime != -1L && System.currentTimeMillis() > this.endTime || this.numRecords > 0 && this.recordsEmitted >= this.numRecords) {
                return null;
            }
            if (this.computeTimeNano > 0) {
                long startComputeNanoTime = System.nanoTime();
                byte[] bytes = new byte[100];
                while (System.nanoTime() - startComputeNanoTime < (long)this.computeTimeNano) {
                    this.random.nextBytes(bytes);
                }
            }
            if (this.sleepTimeMicro > 0) {
                try {
                    TimeUnit.MICROSECONDS.sleep(this.sleepTimeMicro);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            ++this.recordsEmitted;
            return this.newMessage(this.memAllocBytes);
        }

        public long getExpectedRecordCount() {
            return this.numRecords;
        }

        public long getHighWatermark() {
            return 0L;
        }

        private byte[] newMessage(int numBytes) {
            byte[] stringBytes = String.valueOf(this.recordsEmitted).getBytes(Charsets.UTF_8);
            return Arrays.copyOf(stringBytes, numBytes);
        }
    }
}

