/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

public class DoFnRunnerWithKeyedInternals<InputT, OutputT>
implements DoFnRunner<InputT, OutputT> {
    private final DoFnRunner<InputT, OutputT> underlying;
    private final KeyedInternals keyedInternals;

    DoFnRunnerWithKeyedInternals(DoFnRunner<InputT, OutputT> doFnRunner, KeyedInternals keyedInternals) {
        this.underlying = doFnRunner;
        this.keyedInternals = keyedInternals;
    }

    public void startBundle() {
        this.underlying.startBundle();
    }

    public void processElement(WindowedValue<InputT> elem) {
        this.setKeyedInternals(elem.getValue());
        try {
            this.underlying.processElement(elem);
        }
        finally {
            this.clearKeyedInternals();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        this.setKeyedInternals(KV.of(key, null));
        try {
            this.underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
        }
        finally {
            this.clearKeyedInternals();
        }
    }

    public void finishBundle() {
        this.underlying.finishBundle();
    }

    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        this.underlying.onWindowExpiration(window, timestamp, key);
    }

    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    private void setKeyedInternals(Object value) {
        if (value instanceof KeyedWorkItem) {
            this.keyedInternals.setKey(((KeyedWorkItem)value).key());
        } else if (value instanceof KeyedTimerData) {
            Object key = ((KeyedTimerData)value).getKey();
            if (key != null) {
                this.keyedInternals.setKey(key);
            }
        } else if (value instanceof KV) {
            this.keyedInternals.setKey(((KV)value).getKey());
        } else {
            throw new UnsupportedOperationException(String.format("%s is not supported in %s", value.getClass(), DoFnRunnerWithKeyedInternals.class));
        }
    }

    private void clearKeyedInternals() {
        this.keyedInternals.clearKey();
    }
}

