/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.http.client;

import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.client.ClientMetricsEvent;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.client.HttpClientMetricsEvent;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.CompositeSubscription;

class RequestProcessingOperator<I, O>
implements Observable.Operator<HttpClientResponse<O>, ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> {
    private final HttpClientRequest<I> request;
    private final MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject;
    private final long responseSubscriptionTimeoutMs;

    RequestProcessingOperator(HttpClientRequest<I> request, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject, long responseSubscriptionTimeoutMs) {
        this.request = request;
        this.eventsSubject = eventsSubject;
        this.responseSubscriptionTimeoutMs = responseSubscriptionTimeoutMs;
    }

    public Subscriber<? super ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> call(final Subscriber<? super HttpClientResponse<O>> child) {
        final long startTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_SUBMITTED);
        final CompositeSubscription cs = new CompositeSubscription();
        child.add((Subscription)cs);
        Subscriber toReturn = new Subscriber<ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>>(){

            public void onCompleted() {
            }

            public void onError(Throwable e) {
                child.onError(e);
            }

            public void onNext(final ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>> connection) {
                cs.add(connection.getInput().doOnNext(new Action1<HttpClientResponse<O>>(){

                    public void call(HttpClientResponse<O> response) {
                        response.updateNoContentSubscriptionTimeoutIfNotScheduled(RequestProcessingOperator.this.responseSubscriptionTimeoutMs, TimeUnit.MILLISECONDS);
                    }
                }).doOnError((Action1)new Action1<Throwable>(){

                    public void call(Throwable throwable) {
                        RequestProcessingOperator.this.eventsSubject.onEvent(HttpClientMetricsEvent.RESPONSE_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
                    }
                }).subscribe(child));
                RequestProcessingOperator.this.request.doOnWriteFailed(new Action1<Throwable>(){

                    public void call(Throwable throwable) {
                        RequestProcessingOperator.this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_WRITE_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
                        child.onError(throwable);
                    }
                });
                RequestProcessingOperator.this.request.doOnWriteComplete(new Action0(){

                    public void call() {
                        connection.flush().subscribe((Observer)new Observer<Void>(){

                            public void onCompleted() {
                                RequestProcessingOperator.this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_WRITE_COMPLETE, Clock.onEndMillis(startTimeMillis));
                            }

                            public void onError(Throwable e) {
                                RequestProcessingOperator.this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_WRITE_FAILED, Clock.onEndMillis(startTimeMillis), e);
                                child.onError(e);
                            }

                            public void onNext(Void aVoid) {
                            }
                        });
                    }
                });
                connection.write(RequestProcessingOperator.this.request);
            }
        };
        return toReturn;
    }
}

