/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
import com.couchbase.client.core.dcp.BucketStreamState;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import java.util.List;
import rx.Observable;
import rx.functions.Func1;

@InterfaceStability.Experimental
@InterfaceAudience.Public
public class BucketStreamAggregator {
    public static String DEFAULT_CONNECTION_NAME = "jvmCore";
    private final ClusterFacade core;
    private final String bucket;

    public BucketStreamAggregator(ClusterFacade core, String bucket) {
        this.core = core;
        this.bucket = bucket;
    }

    public Observable<DCPRequest> feed() {
        BucketStreamAggregatorState state = new BucketStreamAggregatorState(DEFAULT_CONNECTION_NAME);
        short s = ((Integer)this.partitionSize().toBlocking().first()).intValue();
        for (short partition = 0; partition < s; partition = (short)(partition + 1)) {
            state.put(new BucketStreamState(partition, 0L, 0L, -1L, 0L, -1L));
        }
        return this.feed(state);
    }

    public Observable<DCPRequest> feed(final BucketStreamAggregatorState aggregatorState) {
        final String connectionName = aggregatorState.name();
        return this.core.send(new OpenConnectionRequest(connectionName, this.bucket)).flatMap((Func1)new Func1<OpenConnectionResponse, Observable<DCPRequest>>(){

            public Observable<DCPRequest> call(final OpenConnectionResponse response) {
                return Observable.from((Iterable)aggregatorState).flatMap((Func1)new Func1<BucketStreamState, Observable<StreamRequestResponse>>(){

                    public Observable<StreamRequestResponse> call(final BucketStreamState feed) {
                        Observable res = BucketStreamAggregator.this.core.send(new StreamRequestRequest(connectionName, feed.partition(), feed.vbucketUUID(), feed.startSequenceNumber(), feed.endSequenceNumber(), feed.snapshotStartSequenceNumber(), feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        return res.flatMap((Func1)new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>(){

                            public Observable<StreamRequestResponse> call(StreamRequestResponse response) {
                                long rollbackSequenceNumber;
                                switch (response.status()) {
                                    case RANGE_ERROR: {
                                        rollbackSequenceNumber = 0L;
                                        break;
                                    }
                                    case ROLLBACK: {
                                        rollbackSequenceNumber = response.rollbackToSequenceNumber();
                                        break;
                                    }
                                    default: {
                                        return Observable.just((Object)response);
                                    }
                                }
                                return BucketStreamAggregator.this.core.send(new StreamRequestRequest(connectionName, feed.partition(), feed.vbucketUUID(), rollbackSequenceNumber, feed.endSequenceNumber(), feed.snapshotStartSequenceNumber(), feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                            }
                        });
                    }
                }).toList().flatMap((Func1)new Func1<List<StreamRequestResponse>, Observable<DCPRequest>>(){

                    public Observable<DCPRequest> call(List<StreamRequestResponse> streamRequestResponses) {
                        return response.connection().subject();
                    }
                });
            }
        });
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map((Func1)new Func1<GetClusterConfigResponse, Integer>(){

            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig config = (CouchbaseBucketConfig)response.config().bucketConfig(BucketStreamAggregator.this.bucket);
                return config.numberOfPartitions();
            }
        });
    }
}

