/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.util.Base64Serializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoundedSourceSystem {
    /**
     * Builds the Samza configuration entries that register a bounded Beam source as a Samza
     * system named {@code id}, together with a bounded stream of the same name.
     *
     * @param id name used for both the system ({@code systems.<id>}) and the stream
     *     ({@code streams.<id>})
     * @param source source to expose; stored Base64-serialized under {@code systems.<id>.source}
     * @param coder element coder; stored Base64-serialized under {@code systems.<id>.coder}
     * @param stepName step name stored verbatim under {@code systems.<id>.stepName}
     * @return a newly created, mutable map containing the six generated entries
     */
    public static <T> Map<String, String> createConfigFor(String id, BoundedSource<T> source, Coder<WindowedValue<T>> coder, String stepName) {
        final String systemKey = "systems." + id;
        final String streamKey = "streams." + id;

        final Map<String, String> entries = new HashMap<>();

        // System-level settings, read back by Factory when Samza instantiates the system.
        entries.put(systemKey + ".samza.factory", Factory.class.getName());
        entries.put(systemKey + ".source", Base64Serializer.serializeUnchecked(source));
        entries.put(systemKey + ".coder", Base64Serializer.serializeUnchecked(coder));
        entries.put(systemKey + ".stepName", stepName);

        // Stream-level settings: bind the stream to the system above and mark it bounded.
        entries.put(streamKey + ".samza.system", id);
        entries.put(streamKey + ".samza.bounded", "true");
        return entries;
    }

    /**
     * Asks {@code source} to split itself according to the configured maximum source parallelism.
     *
     * <p>When the parallelism is greater than one, the desired bundle size passed to
     * {@link BoundedSource#split} is the estimated source size divided by the parallelism,
     * rounded up. The original source is returned as the only element when the parallelism is
     * one or less, or when the source reports no splits.
     *
     * @param source source to split
     * @param pipelineOptions options providing {@code getMaxSourceParallelism()}; also handed to
     *     the source's size estimation and split calls
     * @return the splits reported by the source, or a singleton list holding {@code source}
     * @throws Exception whatever the size estimation or the split call throws
     */
    @SuppressWarnings("unchecked")
    private static <T> List<BoundedSource<T>> split(BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) throws Exception {
        final int parallelism = pipelineOptions.getMaxSourceParallelism();
        if (parallelism <= 1) {
            return Collections.singletonList(source);
        }

        final long totalBytes = source.getEstimatedSizeBytes((PipelineOptions) pipelineOptions);
        // Ceiling division so that `parallelism` bundles are enough to cover the whole estimate.
        final long bytesPerSplit = (totalBytes + (long) parallelism - 1L) / (long) parallelism;

        final List<BoundedSource<T>> parts =
            (List<BoundedSource<T>>) source.split(bytesPerSplit, (PipelineOptions) pipelineOptions);
        if (parts.isEmpty()) {
            // An empty split list falls back to reading the unsplit source.
            return Collections.singletonList(source);
        }
        return parts;
    }

    public static class Factory<T>
    implements SystemFactory {
        public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
            String streamPrefix = "systems." + systemName;
            Config scopedConfig = config.subset(streamPrefix + ".", true);
            return new Consumer<T>(Factory.getBoundedSource(scopedConfig), Factory.getPipelineOptions(config), new SamzaMetricsContainer((MetricsRegistryMap)registry), (String)scopedConfig.get((Object)"stepName"));
        }

        public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
            throw new UnsupportedOperationException("Cannot create a producer for an input system");
        }

        public SystemAdmin getAdmin(String systemName, Config config) {
            Config scopedConfig = config.subset("systems." + systemName + ".", true);
            return new Admin<T>(Factory.getBoundedSource(scopedConfig), Factory.getPipelineOptions(config));
        }

        private static <T> BoundedSource<T> getBoundedSource(Config config) {
            BoundedSource source = Base64Serializer.deserializeUnchecked((String)config.get((Object)"source"), BoundedSource.class);
            return source;
        }

        private static <T> Coder<WindowedValue<T>> getCoder(Config config) {
            return Base64Serializer.deserializeUnchecked((String)config.get((Object)"coder"), Coder.class);
        }

        private static SamzaPipelineOptions getPipelineOptions(Config config) {
            return (SamzaPipelineOptions)Base64Serializer.deserializeUnchecked((String)config.get((Object)"beamPipelineOptions"), SerializablePipelineOptions.class).get().as(SamzaPipelineOptions.class);
        }
    }

    public static class Consumer<T>
    implements SystemConsumer {
        private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        private final List<BoundedSource<T>> splits;
        private final SamzaPipelineOptions pipelineOptions;
        private final Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> readerToSsp = new HashMap<BoundedSource.BoundedReader<T>, SystemStreamPartition>();
        private final SamzaMetricsContainer metricsContainer;
        private final String stepName;
        private ReaderTask<T> readerTask;

        /**
         * Creates a consumer for {@code source}, splitting the source eagerly.
         *
         * @param source source to read
         * @param pipelineOptions options used for splitting, reader creation and buffer sizing
         * @param metricsContainer metrics container handed to the reader's metrics wrapper
         * @param stepName step name handed to the reader's metrics wrapper
         * @throws SamzaException if splitting the source fails
         */
        Consumer(BoundedSource<T> source, SamzaPipelineOptions pipelineOptions, SamzaMetricsContainer metricsContainer, String stepName) {
            this.pipelineOptions = pipelineOptions;
            this.metricsContainer = metricsContainer;
            this.stepName = stepName;
            try {
                this.splits = BoundedSourceSystem.split(source, pipelineOptions);
            } catch (Exception splitFailure) {
                throw new SamzaException("Fail to split source", splitFailure);
            }
        }

        /**
         * Creates the reader task for all registered partitions and launches it on a new thread
         * named {@code bounded-source-system-consumer-<n>}.
         *
         * @throws IllegalArgumentException if no system stream partition has been registered
         */
        @Override
        public void start() {
            if (this.readerToSsp.isEmpty()) {
                throw new IllegalArgumentException("Attempted to call start without assigned system stream partitions");
            }

            // The buffer size becomes the number of permits that bound buffered elements.
            final int bufferSize = this.pipelineOptions.getSystemBufferSize();
            final FnWithMetricsWrapper wrapper = new FnWithMetricsWrapper(this.metricsContainer, this.stepName);
            this.readerTask = new ReaderTask<>(this.readerToSsp, bufferSize, wrapper);

            final String threadName = "bounded-source-system-consumer-" + NEXT_ID.getAndIncrement();
            new Thread(this.readerTask, threadName).start();
        }

        /** Asks the reader task to stop; does nothing if {@code start()} has not created one. */
        @Override
        public void stop() {
            final ReaderTask<T> task = this.readerTask;
            if (task == null) {
                return;
            }
            task.stop();
        }

        /**
         * Creates a reader for the split whose index equals the partition id of {@code ssp} and
         * remembers which partition that reader feeds.
         *
         * @param ssp partition to read; its partition id indexes into the list of splits
         * @param offset not used by this implementation
         * @throws SamzaException if looking up the split or creating the reader fails
         */
        @Override
        public void register(SystemStreamPartition ssp, String offset) {
            final int splitIndex = ssp.getPartition().getPartitionId();
            try {
                final BoundedSource<T> split = this.splits.get(splitIndex);
                final BoundedSource.BoundedReader<T> reader = split.createReader((PipelineOptions) this.pipelineOptions);
                this.readerToSsp.put(reader, ssp);
            } catch (Exception cause) {
                throw new SamzaException("Error while creating source reader for ssp: " + ssp, cause);
            }
        }

        /**
         * Fetches the buffered envelopes for each requested partition.
         *
         * <p>The partitions are queried one after another and {@code timeout} is passed to every
         * query, so the call may wait up to {@code timeout} milliseconds per partition.
         * {@code start()} must have been called first, because it creates the reader task used
         * here.
         *
         * @param systemStreamPartitions partitions to fetch messages for
         * @param timeout maximum wait per partition, in milliseconds
         * @return a new map with one (possibly empty) envelope list per requested partition
         * @throws InterruptedException if interrupted while waiting for messages
         */
        @Override
        public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
            assert (!this.readerToSsp.isEmpty());

            final ReaderTask<T> task = this.readerTask;
            final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new HashMap<>();
            for (SystemStreamPartition partition : systemStreamPartitions) {
                final List<IncomingMessageEnvelope> batch = task.getNextMessages(partition, timeout);
                result.put(partition, batch);
            }
            return result;
        }

        private static class ReaderTask<T>
        implements Runnable {
            private final Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> readerToSsp;
            private final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queues;
            private final Semaphore available;
            private final FnWithMetricsWrapper metricsWrapper;
            private long offset;
            private volatile Thread readerThread;
            private volatile boolean stopInvoked = false;
            private volatile Exception lastException;

            /**
             * Creates a task that reads from the given readers.
             *
             * @param readerToSsp readers to drive, each mapped to the partition it feeds; the map
             *     is kept by reference
             * @param capacity number of permits of the semaphore that limits buffered elements
             * @param metricsWrapper wrapper through which reader {@code start}/{@code advance}
             *     calls are made
             */
            private ReaderTask(Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> readerToSsp, int capacity, FnWithMetricsWrapper metricsWrapper) {
                this.readerToSsp = readerToSsp;
                this.available = new Semaphore(capacity);
                this.metricsWrapper = metricsWrapper;

                // One unbounded queue per partition; the set of queues never changes afterwards.
                final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> perPartition = new HashMap<>();
                for (SystemStreamPartition partition : readerToSsp.values()) {
                    perPartition.put(partition, new LinkedBlockingQueue<>());
                }
                this.queues = ImmutableMap.copyOf(perPartition);
            }

            /**
             * Drives every reader on the calling thread until all readers are exhausted, a stop
             * is requested, or an error occurs.
             *
             * <ol>
             *   <li>Records the current thread so that {@code stop()} can interrupt it.
             *   <li>Starts each reader. A reader that has a first element gets it enqueued; an
             *       empty reader gets the max watermark and end-of-stream enqueued and is closed.
             *   <li>Repeatedly advances the remaining readers in turn, handling each the same way,
             *       until none is left or {@code stopInvoked} is set.
             * </ol>
             *
             * <p>An {@link InterruptedException} ends the loop silently. Any other exception is
             * recorded through {@code setError}. Readers still open on exit are closed, and an
             * {@link IOException} from such a close is only logged.
             */
            @Override
            public void run() {
                this.readerThread = Thread.currentThread();

                // Readers that are still open. A reader is removed *before* it is closed so that
                // a failing close() does not lead to a second close() in the finally block.
                final Set<BoundedSource.BoundedReader<T>> availableReaders = new HashSet<>(this.readerToSsp.keySet());
                try {
                    for (BoundedSource.BoundedReader<T> reader : this.readerToSsp.keySet()) {
                        final boolean hasData = this.metricsWrapper.wrap(() -> reader.start());
                        if (hasData) {
                            this.enqueueMessage(reader);
                        } else {
                            this.enqueueMaxWatermarkAndEndOfStream(reader);
                            availableReaders.remove(reader);
                            reader.close();
                        }
                    }

                    while (!this.stopInvoked && !availableReaders.isEmpty()) {
                        final Iterator<BoundedSource.BoundedReader<T>> iter = availableReaders.iterator();
                        while (iter.hasNext()) {
                            final BoundedSource.BoundedReader<T> reader = iter.next();
                            final boolean hasData = this.metricsWrapper.wrap(() -> reader.advance());
                            if (hasData) {
                                this.enqueueMessage(reader);
                            } else {
                                this.enqueueMaxWatermarkAndEndOfStream(reader);
                                iter.remove();
                                reader.close();
                            }
                        }
                    }
                } catch (InterruptedException ignored) {
                    // stop() interrupts this thread to wake it from a blocking call, so an
                    // interrupt is the expected way to terminate and is not reported as an error.
                } catch (Exception e) {
                    this.setError(e);
                } finally {
                    for (BoundedSource.BoundedReader<T> reader : availableReaders) {
                        try {
                            reader.close();
                        } catch (IOException e) {
                            LOG.error("Reader task failed to close reader for ssp {}", this.readerToSsp.get(reader), e);
                        }
                    }
                }
            }

            /**
             * Wraps the reader's current element into an envelope and appends it to the queue of
             * the reader's partition.
             *
             * <p>The element is placed in the global window with the reader's current timestamp,
             * and the envelope's offset is the running element counter of this task. One permit
             * is taken from {@code available} before the envelope is queued, so this call blocks
             * while no permit is free.
             *
             * @param reader reader positioned on the element to publish
             * @throws InterruptedException if interrupted while waiting for a permit or queueing
             */
            private void enqueueMessage(BoundedSource.BoundedReader<T> reader) throws InterruptedException {
                final T element = reader.getCurrent();
                final Instant timestamp = reader.getCurrentTimestamp();
                final WindowedValue<T> windowed = WindowedValue.timestampedValueInGlobalWindow(element, timestamp);

                final SystemStreamPartition partition = this.readerToSsp.get(reader);
                final String offsetLabel = Long.toString(this.offset);
                this.offset++;
                final IncomingMessageEnvelope envelope =
                    new IncomingMessageEnvelope(partition, offsetLabel, null, OpMessage.ofElement(windowed));

                this.available.acquire();
                this.queues.get(partition).put(envelope);
            }

            /**
             * Publishes the two terminal envelopes for the reader's partition: first a watermark
             * at {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, then the end-of-stream marker.
             *
             * <p>Both are queued through {@code enqueueUninterruptibly} and do not take a permit
             * from {@code available}.
             *
             * @param reader reader whose partition has no more data
             */
            private void enqueueMaxWatermarkAndEndOfStream(BoundedSource.BoundedReader<T> reader) {
                final SystemStreamPartition partition = this.readerToSsp.get(reader);
                final long maxWatermarkMillis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();

                this.enqueueUninterruptibly(IncomingMessageEnvelope.buildWatermarkEnvelope(partition, maxWatermarkMillis));
                this.enqueueUninterruptibly(IncomingMessageEnvelope.buildEndOfStreamEnvelope(partition));
            }

            /**
             * Requests termination: sets the stop flag and, if {@code run()} has already recorded
             * its thread, interrupts that thread.
             */
            private void stop() {
                this.stopInvoked = true;
                // Read the volatile field once so the null check and the call see the same value.
                final Thread worker = this.readerThread;
                if (worker == null) {
                    return;
                }
                worker.interrupt();
            }

            /**
             * Returns the envelopes currently buffered for {@code ssp}, waiting up to
             * {@code timeoutMillis} for the first one.
             *
             * <p>Once a first envelope arrives, everything else already queued is drained with it.
             * One permit is released to {@code available} for every envelope returned. A failure
             * recorded by the reader thread is rethrown both before waiting and after draining.
             *
             * @param ssp partition to fetch from
             * @param timeoutMillis maximum time to wait for the first envelope, in milliseconds
             * @return the fetched envelopes; empty if none arrived within the timeout
             * @throws InterruptedException if interrupted while waiting
             * @throws RuntimeException wrapping the reader thread's recorded exception, if any
             */
            private List<IncomingMessageEnvelope> getNextMessages(SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
                this.rethrowReaderFailure();

                final List<IncomingMessageEnvelope> batch = new ArrayList<>();
                final BlockingQueue<IncomingMessageEnvelope> pending = this.queues.get(ssp);
                final IncomingMessageEnvelope head = pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (head != null) {
                    batch.add(head);
                    pending.drainTo(batch);
                }
                this.available.release(batch.size());

                // The envelope that woke us up may be the marker queued by setError.
                this.rethrowReaderFailure();
                return batch;
            }

            /** Throws the recorded reader failure wrapped in a RuntimeException, if there is one. */
            private void rethrowReaderFailure() {
                final Exception failure = this.lastException;
                if (failure != null) {
                    throw new RuntimeException(failure);
                }
            }

            /**
             * Records {@code exception} as the task's failure and queues an envelope with no
             * offset, key or message on every partition, so that a consumer blocked in
             * {@code getNextMessages} wakes up and sees the failure.
             *
             * @param exception failure to report to pollers
             */
            private void setError(Exception exception) {
                this.lastException = exception;
                for (SystemStreamPartition partition : this.readerToSsp.values()) {
                    final IncomingMessageEnvelope wakeUp = new IncomingMessageEnvelope(partition, null, null, null);
                    this.enqueueUninterruptibly(wakeUp);
                }
            }

            /**
             * Appends {@code envelope} to the queue of its partition, retrying until the put
             * succeeds even if the thread is interrupted in the meantime.
             *
             * <p>If an interrupt was caught while retrying, the thread's interrupt status is set
             * again before returning. This keeps a {@code stop()} interrupt visible to the next
             * blocking call in {@code run()} (for example the permit acquisition in
             * {@code enqueueMessage}), so the reader thread cannot miss the request and block.
             *
             * @param envelope envelope to queue; its partition selects the target queue
             */
            private void enqueueUninterruptibly(IncomingMessageEnvelope envelope) {
                final BlockingQueue<IncomingMessageEnvelope> queue = this.queues.get(envelope.getSystemStreamPartition());
                boolean interrupted = false;
                try {
                    while (true) {
                        try {
                            queue.put(envelope);
                            return;
                        } catch (InterruptedException retry) {
                            // Remember the interrupt and try again; it is restored below.
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    public static class Admin<T>
    implements SystemAdmin {
        private final BoundedSource<T> source;
        private final SamzaPipelineOptions pipelineOptions;

        public Admin(BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) {
            this.source = source;
            this.pipelineOptions = pipelineOptions;
        }

        public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
            return offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, null));
        }

        public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
            return streamNames.stream().collect(Collectors.toMap(Function.identity(), streamName -> {
                try {
                    List splits = BoundedSourceSystem.split(this.source, this.pipelineOptions);
                    HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaData = new HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>();
                    for (int i = 0; i < splits.size(); ++i) {
                        partitionMetaData.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
                    }
                    return new SystemStreamMetadata(streamName, partitionMetaData);
                }
                catch (Exception e) {
                    throw new SamzaException("Fail to read stream metadata", (Throwable)e);
                }
            }));
        }

        public Integer offsetComparator(String offset1, String offset2) {
            if (offset1 == null) {
                return offset2 == null ? 0 : -1;
            }
            if (offset2 == null) {
                return 1;
            }
            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
        }
    }
}

