/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.hystrix.examples.basic;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixObservableCollapser;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class ObservableCollapserGetWordForNumber
extends HystrixObservableCollapser<Integer, ObservableCommandNumbersToWords.NumberWord, String, Integer> {
    private final Integer number;
    private static final AtomicInteger counter = new AtomicInteger();

    public static void resetCmdCounter() {
        counter.set(0);
    }

    public static int getCmdCount() {
        return counter.get();
    }

    public ObservableCollapserGetWordForNumber(Integer number) {
        this.number = number;
    }

    public Integer getRequestArgument() {
        return this.number;
    }

    protected HystrixObservableCommand<ObservableCommandNumbersToWords.NumberWord> createCommand(Collection<HystrixCollapser.CollapsedRequest<String, Integer>> requests) {
        int count = counter.incrementAndGet();
        System.out.println("Creating batch for " + requests.size() + " requests. Total invocations so far: " + count);
        ArrayList<Integer> numbers = new ArrayList<Integer>();
        for (HystrixCollapser.CollapsedRequest<String, Integer> request : requests) {
            numbers.add((Integer)request.getArgument());
        }
        return new ObservableCommandNumbersToWords(numbers);
    }

    protected Func1<ObservableCommandNumbersToWords.NumberWord, Integer> getBatchReturnTypeKeySelector() {
        return new Func1<ObservableCommandNumbersToWords.NumberWord, Integer>(){

            public Integer call(ObservableCommandNumbersToWords.NumberWord nw) {
                return nw.getNumber();
            }
        };
    }

    protected Func1<Integer, Integer> getRequestArgumentKeySelector() {
        return new Func1<Integer, Integer>(){

            public Integer call(Integer no) {
                return no;
            }
        };
    }

    protected Func1<ObservableCommandNumbersToWords.NumberWord, String> getBatchReturnTypeToResponseTypeMapper() {
        return new Func1<ObservableCommandNumbersToWords.NumberWord, String>(){

            public String call(ObservableCommandNumbersToWords.NumberWord nw) {
                return nw.getWord();
            }
        };
    }

    protected void onMissingResponse(HystrixCollapser.CollapsedRequest<String, Integer> request) {
        request.setException(new Exception("No word"));
    }

    public static class ObservableCollapserGetWordForNumberTest {
        private HystrixRequestContext ctx;

        @Before
        public void before() {
            this.ctx = HystrixRequestContext.initializeContext();
            ObservableCollapserGetWordForNumber.resetCmdCounter();
        }

        @After
        public void after() {
            System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
            this.ctx.shutdown();
        }

        @Test
        public void shouldCollapseRequestsSync() {
            TestSubscriber subscriber;
            int noOfRequests = 10;
            HashMap<Integer, TestSubscriber> subscribersByNumber = new HashMap<Integer, TestSubscriber>(10);
            for (int number = 0; number < 10; ++number) {
                subscriber = new TestSubscriber();
                new ObservableCollapserGetWordForNumber(number).toObservable().subscribe((Subscriber)subscriber);
                subscribersByNumber.put(number, subscriber);
                if (number != 5) continue;
                this.sleep(1000L);
            }
            Assert.assertThat((Object)subscribersByNumber.size(), (Matcher)CoreMatchers.is((Object)10));
            for (Map.Entry subscriberByNumber : subscribersByNumber.entrySet()) {
                subscriber = (TestSubscriber)subscriberByNumber.getValue();
                subscriber.awaitTerminalEvent(10L, TimeUnit.SECONDS);
                Assert.assertThat((String)subscriber.getOnErrorEvents().toString(), (Object)subscriber.getOnErrorEvents().size(), (Matcher)CoreMatchers.is((Object)0));
                Assert.assertThat((Object)subscriber.getOnNextEvents().size(), (Matcher)CoreMatchers.is((Object)1));
                String word = (String)subscriber.getOnNextEvents().get(0);
                System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
                Assert.assertThat((Object)word, (Matcher)CoreMatchers.equalTo((Object)this.numberToWord((Integer)subscriberByNumber.getKey())));
            }
            Assert.assertTrue((ObservableCollapserGetWordForNumber.getCmdCount() > 1 ? 1 : 0) != 0);
            Assert.assertTrue((ObservableCollapserGetWordForNumber.getCmdCount() < 10 ? 1 : 0) != 0);
        }

        @Test
        public void shouldCollapseRequestsAsync() {
            TestSubscriber subscriber;
            HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation());
            int noOfRequests = 10;
            HashMap<Integer, TestSubscriber> subscribersByNumber = new HashMap<Integer, TestSubscriber>(10);
            for (int number = 0; number < 10; ++number) {
                subscriber = new TestSubscriber();
                final int finalNumber = number;
                Observable.defer((Func0)new Func0<Observable<String>>(){

                    public Observable<String> call() {
                        return new ObservableCollapserGetWordForNumber(finalNumber).toObservable();
                    }
                }).subscribeOn((Scheduler)contextAwareScheduler).subscribe((Subscriber)subscriber);
                subscribersByNumber.put(number, subscriber);
                if (number != 5) continue;
                this.sleep(1000L);
            }
            Assert.assertThat((Object)subscribersByNumber.size(), (Matcher)CoreMatchers.is((Object)10));
            for (Map.Entry subscriberByNumber : subscribersByNumber.entrySet()) {
                subscriber = (TestSubscriber)subscriberByNumber.getValue();
                subscriber.awaitTerminalEvent(10L, TimeUnit.SECONDS);
                Assert.assertThat((String)subscriber.getOnErrorEvents().toString(), (Object)subscriber.getOnErrorEvents().size(), (Matcher)CoreMatchers.is((Object)0));
                Assert.assertThat((Object)subscriber.getOnNextEvents().size(), (Matcher)CoreMatchers.is((Object)1));
                String word = (String)subscriber.getOnNextEvents().get(0);
                System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
                Assert.assertThat((Object)word, (Matcher)CoreMatchers.equalTo((Object)this.numberToWord((Integer)subscriberByNumber.getKey())));
            }
            Assert.assertTrue((ObservableCollapserGetWordForNumber.getCmdCount() > 1 ? 1 : 0) != 0);
            Assert.assertTrue((ObservableCollapserGetWordForNumber.getCmdCount() < 10 ? 1 : 0) != 0);
        }

        @Test
        public void shouldCollapseSameRequests() {
            HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation());
            TestSubscriber<String> subscriber1 = this.getWordForNumber(contextAwareScheduler, 0);
            TestSubscriber<String> subscriber2 = this.getWordForNumber(contextAwareScheduler, 0);
            this.subscriberReceived(subscriber1, 0);
            this.subscriberReceived(subscriber2, 0);
        }

        private TestSubscriber<String> getWordForNumber(HystrixContextScheduler contextAwareScheduler, final int number) {
            TestSubscriber subscriber = new TestSubscriber();
            Observable.defer((Func0)new Func0<Observable<String>>(){

                public Observable<String> call() {
                    return new ObservableCollapserGetWordForNumber(number).toObservable();
                }
            }).subscribeOn((Scheduler)contextAwareScheduler).subscribe((Subscriber)subscriber);
            return subscriber;
        }

        private void subscriberReceived(TestSubscriber<String> subscriber, int number) {
            subscriber.awaitTerminalEvent(10L, TimeUnit.SECONDS);
            Assert.assertThat((String)subscriber.getOnErrorEvents().toString(), (Object)subscriber.getOnErrorEvents().size(), (Matcher)CoreMatchers.is((Object)0));
            Assert.assertThat((Object)subscriber.getOnNextEvents().size(), (Matcher)CoreMatchers.is((Object)1));
            Assert.assertThat(subscriber.getOnNextEvents().get(0), (Matcher)CoreMatchers.equalTo((Object)this.numberToWord(number)));
        }

        private String numberToWord(int number) {
            return ObservableCommandNumbersToWords.dict.get(number);
        }

        private void sleep(long ms) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

