/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedSourceSystem {
    private static final IncomingMessageEnvelope CHECK_LAST_EXCEPTION_ENVELOPE = new IncomingMessageEnvelope(null, null, null, null);

    private static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> List<UnboundedSource<T, CheckpointMarkT>> split(UnboundedSource<T, CheckpointMarkT> source, SamzaPipelineOptions pipelineOptions) throws Exception {
        List splits;
        int numSplits = pipelineOptions.getMaxSourceParallelism();
        if (numSplits > 1 && !(splits = source.split(numSplits, (PipelineOptions)pipelineOptions)).isEmpty()) {
            return splits;
        }
        return Collections.singletonList(source);
    }

    public static class Factory<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements SystemFactory {
        public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
            String streamPrefix = "systems." + systemName;
            Config scopedConfig = config.subset(streamPrefix + ".", true);
            return new Consumer<T, CheckpointMarkT>(Factory.getUnboundedSource(scopedConfig), Factory.getPipelineOptions(config), new SamzaMetricsContainer((MetricsRegistryMap)registry), (String)scopedConfig.get((Object)"stepName"));
        }

        public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
            throw new UnsupportedOperationException("Cannot create a producer for an input system");
        }

        public SystemAdmin getAdmin(String systemName, Config config) {
            Config scopedConfig = config.subset("systems." + systemName + ".", true);
            return new Admin<T, CheckpointMarkT>(Factory.getUnboundedSource(scopedConfig), Factory.getPipelineOptions(config));
        }

        private static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> UnboundedSource<T, CheckpointMarkT> getUnboundedSource(Config config) {
            UnboundedSource source = (UnboundedSource)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"source")), UnboundedSource.class);
            return source;
        }

        private static <T> Coder<WindowedValue<T>> getCoder(Config config) {
            return (Coder)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"coder")), Coder.class);
        }

        private static SamzaPipelineOptions getPipelineOptions(Config config) {
            return (SamzaPipelineOptions)((SerializablePipelineOptions)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"beamPipelineOptions")), SerializablePipelineOptions.class)).get().as(SamzaPipelineOptions.class);
        }
    }

    public static class Consumer<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements SystemConsumer {
        private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        private final Coder<CheckpointMarkT> checkpointMarkCoder;
        private final List<UnboundedSource<T, CheckpointMarkT>> splits;
        private final SamzaPipelineOptions pipelineOptions;
        private final Map<UnboundedSource.UnboundedReader, SystemStreamPartition> readerToSsp = new HashMap<UnboundedSource.UnboundedReader, SystemStreamPartition>();
        private final SamzaMetricsContainer metricsContainer;
        private final String stepName;
        private ReaderTask<T, CheckpointMarkT> readerTask;

        Consumer(UnboundedSource<T, CheckpointMarkT> source, SamzaPipelineOptions pipelineOptions, SamzaMetricsContainer metricsContainer, String stepName) {
            try {
                this.splits = UnboundedSourceSystem.split(source, pipelineOptions);
            }
            catch (Exception e) {
                throw new SamzaException("Fail to split source", (Throwable)e);
            }
            this.checkpointMarkCoder = source.getCheckpointMarkCoder();
            this.pipelineOptions = pipelineOptions;
            this.metricsContainer = metricsContainer;
            this.stepName = stepName;
        }

        public void start() {
            if (this.readerToSsp.isEmpty()) {
                throw new IllegalArgumentException("Attempted to call start without assigned system stream partitions");
            }
            FnWithMetricsWrapper metricsWrapper = this.pipelineOptions.getEnableMetrics() != false ? new FnWithMetricsWrapper(this.metricsContainer, this.stepName) : null;
            this.readerTask = new ReaderTask(this.readerToSsp, this.checkpointMarkCoder, this.pipelineOptions.getSystemBufferSize(), this.pipelineOptions.getWatermarkInterval(), metricsWrapper);
            Thread thread = new Thread(this.readerTask, "unbounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
            thread.start();
        }

        public void stop() {
            this.readerTask.stop();
        }

        public void register(SystemStreamPartition ssp, String offset) {
            UnboundedSource.CheckpointMark checkpoint = null;
            if (StringUtils.isNoneEmpty((CharSequence[])new CharSequence[]{offset})) {
                byte[] offsetBytes = Base64.getDecoder().decode(offset);
                ByteArrayInputStream bais = new ByteArrayInputStream(offsetBytes);
                try {
                    checkpoint = (UnboundedSource.CheckpointMark)this.checkpointMarkCoder.decode((InputStream)bais);
                }
                catch (Exception e) {
                    throw new SamzaException("Error in decode offset", (Throwable)e);
                }
            }
            int partitionId = ssp.getPartition().getPartitionId();
            try {
                UnboundedSource.UnboundedReader reader = this.splits.get(partitionId).createReader((PipelineOptions)this.pipelineOptions, checkpoint);
                this.readerToSsp.put(reader, ssp);
            }
            catch (Exception e) {
                throw new SamzaException("Error while creating source reader for ssp: " + ssp, (Throwable)e);
            }
        }

        public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
            assert (!this.readerToSsp.isEmpty());
            HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
            for (SystemStreamPartition ssp : systemStreamPartitions) {
                envelopes.put(ssp, this.readerTask.getNextMessages(ssp, timeout));
            }
            return envelopes;
        }

        private static class ReaderTask<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
        implements Runnable {
            private final Map<UnboundedSource.UnboundedReader, SystemStreamPartition> readerToSsp;
            private final List<UnboundedSource.UnboundedReader> readers;
            private final Coder<CheckpointMarkT> checkpointMarkCoder;
            private final Map<SystemStreamPartition, Instant> currentWatermarks = new HashMap<SystemStreamPartition, Instant>();
            private final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queues;
            private final long watermarkInterval;
            private final Semaphore available;
            private final FnWithMetricsWrapper metricsWrapper;
            private volatile boolean running;
            private volatile Exception lastException;
            private long lastWatermarkTime = 0L;

            private ReaderTask(Map<UnboundedSource.UnboundedReader, SystemStreamPartition> readerToSsp, Coder<CheckpointMarkT> checkpointMarkCoder, int capacity, long watermarkInterval, FnWithMetricsWrapper metricsWrapper) {
                this.readerToSsp = readerToSsp;
                this.checkpointMarkCoder = checkpointMarkCoder;
                this.readers = ImmutableList.copyOf(readerToSsp.keySet());
                this.watermarkInterval = watermarkInterval;
                this.available = new Semaphore(capacity);
                this.metricsWrapper = metricsWrapper;
                HashMap qs = new HashMap();
                readerToSsp.values().forEach(ssp -> qs.put(ssp, new LinkedBlockingQueue()));
                this.queues = ImmutableMap.copyOf(qs);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                this.running = true;
                try {
                    for (UnboundedSource.UnboundedReader reader2 : this.readers) {
                        boolean hasData = this.invoke(() -> ((UnboundedSource.UnboundedReader)reader2).start());
                        if (!hasData) continue;
                        this.available.acquire();
                        this.enqueueMessage(reader2);
                    }
                    while (this.running) {
                        boolean elementAvailable = false;
                        for (UnboundedSource.UnboundedReader reader3 : this.readers) {
                            boolean hasData = this.invoke(() -> ((UnboundedSource.UnboundedReader)reader3).advance());
                            if (!hasData) continue;
                            while (!this.available.tryAcquire(1, Math.max(this.lastWatermarkTime + this.watermarkInterval - System.currentTimeMillis(), 1L), TimeUnit.MILLISECONDS)) {
                                this.updateWatermark();
                            }
                            this.enqueueMessage(reader3);
                            elementAvailable = true;
                        }
                        this.updateWatermark();
                        if (elementAvailable) continue;
                        Thread.sleep(50L);
                    }
                }
                catch (Exception e) {
                    this.lastException = e;
                    this.running = false;
                }
                finally {
                    this.readers.forEach(reader -> {
                        try {
                            reader.close();
                        }
                        catch (IOException e) {
                            LOG.error("Reader task failed to close reader", (Throwable)e);
                        }
                    });
                }
                if (this.lastException != null) {
                    this.queues.values().forEach(queue -> {
                        queue.clear();
                        queue.add(CHECK_LAST_EXCEPTION_ENVELOPE);
                    });
                }
            }

            private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> fn) throws Exception {
                if (this.metricsWrapper != null) {
                    return this.metricsWrapper.wrap(fn);
                }
                return fn.get();
            }

            private void updateWatermark() throws InterruptedException {
                long time = System.currentTimeMillis();
                if (time - this.lastWatermarkTime > this.watermarkInterval) {
                    for (UnboundedSource.UnboundedReader reader : this.readers) {
                        Instant nextWatermark;
                        SystemStreamPartition ssp = this.readerToSsp.get(reader);
                        Instant currentWatermark = this.currentWatermarks.containsKey(ssp) ? this.currentWatermarks.get(ssp) : BoundedWindow.TIMESTAMP_MIN_VALUE;
                        if (!currentWatermark.isBefore((ReadableInstant)(nextWatermark = reader.getWatermark()))) continue;
                        this.currentWatermarks.put(ssp, nextWatermark);
                        this.enqueueWatermark(reader);
                    }
                    this.lastWatermarkTime = time;
                }
            }

            private void enqueueWatermark(UnboundedSource.UnboundedReader reader) throws InterruptedException {
                SystemStreamPartition ssp = this.readerToSsp.get(reader);
                IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope((SystemStreamPartition)ssp, (long)reader.getWatermark().getMillis());
                this.queues.get(ssp).put(envelope);
            }

            private void enqueueMessage(UnboundedSource.UnboundedReader reader) throws InterruptedException {
                Object value = reader.getCurrent();
                Instant time = reader.getCurrentTimestamp();
                SystemStreamPartition ssp = this.readerToSsp.get(reader);
                WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)value, (Instant)time);
                OpMessage opMessage = OpMessage.ofElement(windowedValue);
                IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(ssp, this.getOffset(reader), null, opMessage);
                this.queues.get(ssp).put(envelope);
            }

            void stop() {
                this.running = false;
            }

            List<IncomingMessageEnvelope> getNextMessages(SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                ArrayList<IncomingMessageEnvelope> envelopes = new ArrayList<IncomingMessageEnvelope>();
                BlockingQueue queue = this.queues.get(ssp);
                IncomingMessageEnvelope envelope = (IncomingMessageEnvelope)queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (envelope != null) {
                    envelopes.add(envelope);
                    queue.drainTo(envelopes);
                }
                int numElements = (int)envelopes.stream().filter(ev -> ev.getMessage() instanceof OpMessage).count();
                this.available.release(numElements);
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                return envelopes;
            }

            private String getOffset(UnboundedSource.UnboundedReader reader) {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    UnboundedSource.CheckpointMark checkpointMark = this.invoke(() -> ((UnboundedSource.UnboundedReader)reader).getCheckpointMark());
                    this.checkpointMarkCoder.encode((Object)checkpointMark, (OutputStream)baos);
                    return Base64.getEncoder().encodeToString(baos.toByteArray());
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static class Admin<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements SystemAdmin {
        private final UnboundedSource<T, CheckpointMarkT> source;
        private final SamzaPipelineOptions pipelineOptions;

        public Admin(UnboundedSource<T, CheckpointMarkT> source, SamzaPipelineOptions pipelineOptions) {
            this.source = source;
            this.pipelineOptions = pipelineOptions;
        }

        public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
            return offsets;
        }

        public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
            return streamNames.stream().collect(Collectors.toMap(Function.identity(), streamName -> {
                try {
                    List splits = UnboundedSourceSystem.split(this.source, this.pipelineOptions);
                    HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaData = new HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>();
                    for (int i = 0; i < splits.size(); ++i) {
                        partitionMetaData.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
                    }
                    return new SystemStreamMetadata(streamName, partitionMetaData);
                }
                catch (Exception e) {
                    throw new SamzaException("Fail to read stream metadata", (Throwable)e);
                }
            }));
        }

        public Integer offsetComparator(String offset1, String offset2) {
            return null;
        }
    }
}

