/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class SamzaTestStreamSystemFactory
implements SystemFactory {
    private static final @UnknownKeyFor @NonNull @Initialized String DUMMY_OFFSET = "0";

    public @UnknownKeyFor @NonNull @Initialized SystemConsumer getConsumer(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized MetricsRegistry registry) {
        String streamPrefix = String.format("systems.%s.", systemName);
        Config scopedConfig = config.subset(streamPrefix, true);
        return new SamzaTestStreamSystemConsumer(SamzaTestStreamSystemFactory.getTestStream(scopedConfig));
    }

    public @UnknownKeyFor @NonNull @Initialized SystemProducer getProducer(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized MetricsRegistry registry) {
        throw new UnsupportedOperationException("SamzaTestStreamSystem doesn't support producing");
    }

    public @UnknownKeyFor @NonNull @Initialized SystemAdmin getAdmin(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config) {
        return new SamzaTestStreamSystemAdmin();
    }

    private static <T> @UnknownKeyFor @NonNull @Initialized TestStream<T> getTestStream(@UnknownKeyFor @NonNull @Initialized Config config) {
        SerializableFunction testStreamDecoder = (SerializableFunction)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"testStreamDecoder")), SerializableFunction.class);
        return (TestStream)testStreamDecoder.apply((Object)((String)config.get((Object)"encodedTestStream")));
    }

    public static class SamzaTestStreamSystemConsumer<@UnknownKeyFor T>
    implements SystemConsumer {
        @UnknownKeyFor @NonNull @Initialized TestStream<T> testStream;

        public SamzaTestStreamSystemConsumer(@UnknownKeyFor @NonNull @Initialized TestStream<T> testStream) {
            this.testStream = testStream;
        }

        public void start() {
        }

        public void stop() {
        }

        public void register(@UnknownKeyFor @NonNull @Initialized SystemStreamPartition systemStreamPartition, @UnknownKeyFor @NonNull @Initialized String offset) {
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized IncomingMessageEnvelope>> poll(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition> systemStreamPartitions, @UnknownKeyFor @NonNull @Initialized long timeout) {
            SystemStreamPartition ssp = systemStreamPartitions.iterator().next();
            ArrayList<IncomingMessageEnvelope> messages = new ArrayList<IncomingMessageEnvelope>();
            for (TestStream.Event event : this.testStream.getEvents()) {
                if (event.getType().equals((Object)TestStream.EventType.ELEMENT)) {
                    for (TimestampedValue element : ((TestStream.ElementEvent)event).getElements()) {
                        WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)element.getValue(), (Instant)element.getTimestamp());
                        OpMessage opMessage = OpMessage.ofElement(windowedValue);
                        IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(ssp, SamzaTestStreamSystemFactory.DUMMY_OFFSET, null, opMessage);
                        messages.add(envelope);
                    }
                    continue;
                }
                if (event.getType().equals((Object)TestStream.EventType.WATERMARK)) {
                    long watermarkMillis = ((TestStream.WatermarkEvent)event).getWatermark().getMillis();
                    IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope((SystemStreamPartition)ssp, (long)watermarkMillis);
                    messages.add(envelope);
                    if (watermarkMillis != BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) continue;
                    IncomingMessageEnvelope endOfStreamMessage = IncomingMessageEnvelope.buildEndOfStreamEnvelope((SystemStreamPartition)ssp);
                    messages.add(endOfStreamMessage);
                    break;
                }
                if (event.getType().equals((Object)TestStream.EventType.PROCESSING_TIME)) {
                    throw new UnsupportedOperationException("Advancing Processing time is not supported by the Samza Runner.");
                }
                throw new SamzaException("Unknown event type " + event.getType());
            }
            return ImmutableMap.of((Object)ssp, messages);
        }
    }

    public static class SamzaTestStreamSystemAdmin
    implements SystemAdmin {
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized String> getOffsetsAfter(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized String> offsets) {
            return offsets.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> SamzaTestStreamSystemFactory.DUMMY_OFFSET));
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized SystemStreamMetadata> getSystemStreamMetadata(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> streamNames) {
            return streamNames.stream().collect(Collectors.toMap(Function.identity(), stream -> {
                Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = Collections.singletonMap(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata(SamzaTestStreamSystemFactory.DUMMY_OFFSET, SamzaTestStreamSystemFactory.DUMMY_OFFSET, SamzaTestStreamSystemFactory.DUMMY_OFFSET));
                return new SystemStreamMetadata(stream, partitionMetadata);
            }));
        }

        public @UnknownKeyFor @NonNull @Initialized Integer offsetComparator(@UnknownKeyFor @NonNull @Initialized String offset1, @UnknownKeyFor @NonNull @Initialized String offset2) {
            return 0;
        }
    }
}

