/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.cursor;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.bolt.api.BoltConnection;
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
import org.neo4j.driver.internal.bolt.api.GqlStatusError;
import org.neo4j.driver.internal.bolt.api.ResponseHandler;
import org.neo4j.driver.internal.bolt.api.summary.DiscardSummary;
import org.neo4j.driver.internal.bolt.api.summary.PullSummary;
import org.neo4j.driver.internal.bolt.api.summary.RunSummary;
import org.neo4j.driver.internal.cursor.AbstractRecordStateResponseHandler;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.summary.ResultSummary;

public class RxResultCursorImpl
extends AbstractRecordStateResponseHandler
implements RxResultCursor,
ResponseHandler {
    private static final MetadataExtractor METADATA_EXTRACTOR = new MetadataExtractor("t_last");
    private static final ClientException IGNORED_ERROR = new ClientException(GqlStatusError.UNKNOWN.getStatus(), GqlStatusError.UNKNOWN.getStatusDescription("A message has been ignored during result streaming."), "N/A", "A message has been ignored during result streaming.", GqlStatusError.DIAGNOSTIC_RECORD, null);
    private static final Runnable NOOP_RUNNABLE = () -> {};
    private static final BiConsumer<Record, Throwable> NOOP_CONSUMER = (record, throwable) -> {};
    private static final RunSummary EMPTY_RUN_SUMMARY = new RunSummary(){

        @Override
        public long queryId() {
            return -1L;
        }

        @Override
        public List<String> keys() {
            return List.of();
        }

        @Override
        public long resultAvailableAfter() {
            return -1L;
        }
    };
    private final Logger log;
    private final BoltConnection boltConnection;
    private final Query query;
    private final RunSummary runSummary;
    private final Throwable runError;
    private final Consumer<DatabaseBookmark> bookmarkConsumer;
    private final Consumer<Throwable> throwableConsumer;
    private final Supplier<Throwable> interruptSupplier;
    private final boolean closeOnSummary;
    private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture();
    private final CompletableFuture<Void> consumedFuture = new CompletableFuture();
    private final boolean legacyNotifications;
    private State state;
    private boolean discardPending;
    private boolean runErrorExposed;
    private boolean summaryExposed;
    private BiConsumer<Record, Throwable> recordConsumer;
    private long outstandingDemand;
    private boolean recordConsumerFinished;
    private boolean recordConsumerHadRequests;
    private PullSummary pullSummary;
    private DiscardSummary discardSummary;
    private Throwable error;
    private boolean interrupted;

    public RxResultCursorImpl(BoltConnection boltConnection, Query query, RunSummary runSummary, Throwable runError, Consumer<DatabaseBookmark> bookmarkConsumer, Consumer<Throwable> throwableConsumer, boolean closeOnSummary, Supplier<Throwable> interruptSupplier, Logging logging) {
        this.boltConnection = boltConnection;
        this.legacyNotifications = new BoltProtocolVersion(5, 5).compareTo(boltConnection.protocolVersion()) > 0;
        this.query = query;
        if (runError == null) {
            this.runSummary = runSummary;
            this.state = State.READY;
        } else {
            this.runSummary = EMPTY_RUN_SUMMARY;
            this.state = State.FAILED;
            this.summaryFuture.completeExceptionally(runError);
        }
        this.runError = runError;
        this.bookmarkConsumer = bookmarkConsumer;
        this.closeOnSummary = closeOnSummary;
        this.throwableConsumer = throwableConsumer;
        this.interruptSupplier = interruptSupplier;
        this.log = logging.getLog(this.getClass());
        String runErrorName = runError == null ? "null" : runError.getClass().getCanonicalName();
        this.log.trace("[%d] New instance (runError=%s)", this.hashCode(), runErrorName);
    }

    @Override
    public synchronized Throwable getRunError() {
        String name = this.runError == null ? "null" : this.runError.getClass().getCanonicalName();
        this.log.trace("[%d] Run error explicitly retrieved (value=%s)", this.hashCode(), name);
        this.runErrorExposed = true;
        return this.runError;
    }

    @Override
    public List<String> keys() {
        return this.runSummary.keys();
    }

    @Override
    public CompletionStage<Void> consumed() {
        return this.consumedFuture;
    }

    @Override
    public synchronized boolean isDone() {
        return switch (this.state) {
            default -> throw new IncompatibleClassChangeError();
            case State.DISCARDING, State.STREAMING, State.READY -> false;
            case State.FAILED -> {
                if (this.runError == null || this.runErrorExposed) {
                    yield true;
                }
                yield false;
            }
            case State.SUCCEEDED -> true;
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void installRecordConsumer(BiConsumer<Record, Throwable> recordConsumer) {
        Objects.requireNonNull(recordConsumer);
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            if (this.recordConsumer == null) {
                this.recordConsumer = (record, throwable) -> {
                    String recordHash = record == null ? "null" : Integer.valueOf(record.hashCode());
                    String throwableName = throwable == null ? "null" : throwable.getClass().getCanonicalName();
                    try {
                        recordConsumer.accept((Record)record, (Throwable)throwable);
                        this.log.trace("[%d] Record consumer notified with (record=%s, throwable=%s)", this.hashCode(), recordHash, throwableName);
                    }
                    catch (Throwable unexpectedThrowable) {
                        this.log.error(String.format("[%d] Record consumer threw an error when notified with (record=%s, throwable=%s), this will be ignored", this.hashCode(), recordHash, throwableName), unexpectedThrowable);
                    }
                };
                this.log.trace("[%d] Record consumer installed", this.hashCode());
                if (this.runError != null && !this.runErrorExposed) {
                    runnable = this.setupRecordConsumerErrorNotificationRunnable(this.runError, true);
                }
            } else {
                this.log.warn("[%d] Only one record consumer is supported, this request will be ignored", this.hashCode());
            }
        }
        runnable.run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void request(long n) {
        if (n > 0L) {
            Runnable runnable = NOOP_RUNNABLE;
            RxResultCursorImpl rxResultCursorImpl = this;
            synchronized (rxResultCursorImpl) {
                if (this.recordConsumerFinished) {
                    this.log.trace("[%d] Tried requesting more records after record consumer is finished, this request will be ignored", this.hashCode());
                    return;
                }
                this.recordConsumerHadRequests = true;
                this.updateRecordState(AbstractRecordStateResponseHandler.RecordState.NO_RECORD);
                this.log.trace("[%d] %d records requested in %s state", new Object[]{this.hashCode(), n, this.state});
                switch (this.state) {
                    case READY: {
                        runnable = this.executeIfNotInterrupted(() -> {
                            long request = this.appendDemand(n);
                            this.state = State.STREAMING;
                            return () -> this.boltConnection.pull(this.runSummary.queryId(), request).thenCompose(conn -> conn.flush(this)).whenComplete((ignored, throwable) -> {
                                if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                                    this.handleError((Throwable)throwable, false);
                                    this.onComplete();
                                }
                            });
                        });
                        break;
                    }
                    case STREAMING: {
                        this.appendDemand(n);
                        break;
                    }
                    case FAILED: {
                        runnable = this.runError != null ? this.setupRecordConsumerErrorNotificationRunnable(this.runError, true) : (this.error != null ? this.setupRecordConsumerErrorNotificationRunnable(this.error, false) : NOOP_RUNNABLE);
                        break;
                    }
                }
            }
            runnable.run();
        } else {
            this.log.warn("[%d] %d records requested, negative amounts will be ignored", this.hashCode(), n);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] Cancellation requested in %s state", new Object[]{this.hashCode(), this.state});
            switch (this.state) {
                case READY: {
                    runnable = this.executeIfNotInterrupted(this::setupDiscardRunnable);
                    break;
                }
                case STREAMING: {
                    this.discardPending = true;
                    break;
                }
            }
        }
        runnable.run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletionStage<ResultSummary> summaryAsync() {
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] Summary requested in %s state", new Object[]{this.hashCode(), this.state});
            if (this.summaryExposed) {
                return this.summaryFuture;
            }
            this.summaryExposed = true;
            switch (this.state) {
                case DISCARDING: 
                case FAILED: 
                case SUCCEEDED: {
                    break;
                }
                case READY: {
                    runnable = this.executeIfNotInterrupted(this::setupDiscardRunnable);
                    break;
                }
                case STREAMING: {
                    this.discardPending = true;
                }
            }
        }
        runnable.run();
        return this.summaryFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletionStage<Void> rollback() {
        this.log.trace("[%d] Rolling back unpublished result", this.hashCode());
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.state = State.SUCCEEDED;
        }
        this.completeSummaryFuture(null, null);
        final CompletableFuture resetFuture = new CompletableFuture();
        this.boltConnection.reset().thenCompose(conn -> conn.flush(new ResponseHandler(){
            Throwable throwable = null;

            @Override
            public void onError(Throwable throwable) {
                this.throwable = Futures.completionExceptionCause(throwable);
            }

            @Override
            public void onComplete() {
                if (this.throwable != null) {
                    resetFuture.completeExceptionally(this.throwable);
                } else {
                    resetFuture.complete(null);
                }
            }
        })).whenComplete((ignored, throwable) -> {
            if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                resetFuture.completeExceptionally((Throwable)throwable);
            }
        });
        return ((CompletableFuture)resetFuture.thenCompose(ignored -> this.boltConnection.close())).exceptionally(throwable -> null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onComplete() {
        Runnable runnable;
        this.log.trace("[%d] onComplete", this.hashCode());
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            Throwable throwable = this.interruptSupplier.get();
            if (throwable != null) {
                this.handleError(throwable, true);
            } else {
                throwable = this.error;
            }
            runnable = throwable != null ? this.setupCompletionRunnableWithError(throwable) : (this.pullSummary != null ? this.setupCompletionRunnableWithPullSummary() : (this.discardSummary != null ? this.setupCompletionRunnableWithSummaryMetadata(this.discardSummary.metadata()) : () -> this.log.trace("[%d] onComplete resulted in no action", this.hashCode())));
        }
        runnable.run();
    }

    @Override
    public synchronized void onError(Throwable throwable) {
        if (this.log.isTraceEnabled()) {
            this.log.error(String.format("[%d] onError", this.hashCode()), throwable);
        }
        this.handleError(throwable, false);
    }

    @Override
    public synchronized void onIgnored() {
        this.log.trace("[%d] onIgnored", this.hashCode());
        Throwable throwable = this.interruptSupplier.get();
        if (throwable == null) {
            throwable = IGNORED_ERROR;
        }
        this.onError(throwable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onRecord(Value[] fields) {
        this.log.trace("[%d] onRecord", this.hashCode());
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.updateRecordState(AbstractRecordStateResponseHandler.RecordState.HAD_RECORD);
            this.decrementDemand();
        }
        InternalRecord record = new InternalRecord(this.runSummary.keys(), fields);
        this.recordConsumer.accept(record, null);
    }

    @Override
    public synchronized void onPullSummary(PullSummary summary) {
        this.log.trace("[%d] onPullSummary", this.hashCode());
        this.pullSummary = summary;
    }

    @Override
    public synchronized void onDiscardSummary(DiscardSummary summary) {
        this.log.trace("[%d] onDiscardSummary", this.hashCode());
        this.discardSummary = summary;
    }

    @Override
    public synchronized CompletionStage<Throwable> discardAllFailureAsync() {
        this.log.trace("[%d] Discard all requested", this.hashCode());
        boolean summaryExposed = this.summaryExposed;
        boolean runErrorExposed = this.runErrorExposed;
        return this.summaryAsync().thenApply(ignored -> null).exceptionally(throwable -> runErrorExposed || summaryExposed ? null : throwable);
    }

    @Override
    public synchronized CompletionStage<Throwable> pullAllFailureAsync() {
        this.log.trace("[%d] Pull all failure requested", this.hashCode());
        if (this.recordConsumer != null && !this.isDone()) {
            return CompletableFuture.completedFuture(new TransactionNestingException("You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result."));
        }
        return this.discardAllFailureAsync();
    }

    private synchronized long appendDemand(long n) {
        if (n == Long.MAX_VALUE) {
            this.outstandingDemand = -1L;
        } else {
            try {
                this.outstandingDemand = Math.addExact(this.outstandingDemand, n);
            }
            catch (ArithmeticException ex) {
                this.outstandingDemand = -1L;
            }
        }
        this.log.trace("[%d] Appended demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
        return this.outstandingDemand;
    }

    private synchronized long getDemand() {
        this.log.trace("[%d] Get demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
        return this.outstandingDemand;
    }

    private synchronized void decrementDemand() {
        if (this.outstandingDemand > 0L) {
            --this.outstandingDemand;
        }
        this.log.trace("[%d] Decremented demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
    }

    private synchronized Runnable setupDiscardRunnable() {
        this.state = State.DISCARDING;
        return () -> this.boltConnection.discard(this.runSummary.queryId(), -1L).thenCompose(conn -> conn.flush(this)).whenComplete((ignored, throwable) -> {
            if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                this.handleError((Throwable)throwable, false);
                this.onComplete();
            }
        });
    }

    private synchronized Runnable executeIfNotInterrupted(Supplier<Runnable> runnableSupplier) {
        Runnable runnable = NOOP_RUNNABLE;
        Throwable throwable = this.interruptSupplier.get();
        if (throwable == null) {
            runnable = runnableSupplier.get();
        } else {
            this.log.trace("[%d] Interrupt signal detected upon handling request", this.hashCode());
            this.handleError(throwable, true);
            runnable = this::onComplete;
        }
        return runnable;
    }

    private synchronized Runnable setupRecordConsumerErrorNotificationRunnable(Throwable throwable, boolean runError) {
        Runnable runnable;
        if (this.recordConsumer != null) {
            if (!this.recordConsumerFinished) {
                if (runError) {
                    this.runErrorExposed = true;
                }
                this.recordConsumerFinished = true;
                BiConsumer<Record, Throwable> recordConsumerRef = this.recordConsumer;
                this.recordConsumer = NOOP_CONSUMER;
                runnable = () -> recordConsumerRef.accept(null, throwable);
            } else {
                runnable = () -> this.log.trace("[%d] Record consumer will not be notified as it has been finished", this.hashCode());
            }
        } else {
            runnable = () -> this.log.trace("[%d] Record consumer will not be notified as it has not been installed", this.hashCode());
        }
        return runnable;
    }

    private synchronized Runnable setupCompletionRunnableWithPullSummary() {
        this.log.trace("[%d] Setting up completion with pull summary", this.hashCode());
        Runnable runnable = NOOP_RUNNABLE;
        if (this.pullSummary.hasMore()) {
            this.pullSummary = null;
            if (this.discardPending) {
                this.discardPending = false;
                this.state = State.DISCARDING;
                runnable = () -> this.boltConnection.discard(this.runSummary.queryId(), -1L).thenCompose(conn -> conn.flush(this)).whenComplete((ignored, flushThrowable) -> {
                    Throwable error = Futures.completionExceptionCause(flushThrowable);
                    if (error != null) {
                        this.handleError(error, false);
                        this.onComplete();
                    }
                });
            } else {
                long demand = this.getDemand();
                if (demand != 0L) {
                    this.state = State.STREAMING;
                    runnable = () -> this.boltConnection.pull(this.runSummary.queryId(), demand > 0L ? demand : -1L).thenCompose(conn -> conn.flush(this)).whenComplete((ignored, flushThrowable) -> {
                        Throwable error = Futures.completionExceptionCause(flushThrowable);
                        if (error != null) {
                            this.handleError(error, false);
                            this.onComplete();
                        }
                    });
                } else {
                    this.state = State.READY;
                }
            }
        } else {
            runnable = this.setupCompletionRunnableWithSummaryMetadata(this.pullSummary.metadata());
        }
        return runnable;
    }

    private synchronized Runnable setupCompletionRunnableWithSummaryMetadata(Map<String, Value> metadata) {
        this.log.trace("[%d] Setting up completion with summary metadata", this.hashCode());
        Runnable runnable = NOOP_RUNNABLE;
        ResultSummary resultSummary = null;
        try {
            resultSummary = this.resultSummary(metadata);
            this.state = State.SUCCEEDED;
        }
        catch (Throwable summaryThrowable) {
            this.handleError(summaryThrowable, false);
        }
        if (resultSummary != null) {
            Optional<DatabaseBookmark> bookmarkOpt = RxResultCursorImpl.databaseBookmark(metadata);
            boolean recordConsumerFinished = this.recordConsumerFinished;
            this.recordConsumerFinished = true;
            BiConsumer<Record, Throwable> recordConsumerRef = this.recordConsumer;
            this.recordConsumer = NOOP_CONSUMER;
            boolean recordConsumerHadRequests = this.recordConsumerHadRequests;
            ResultSummary resultSummaryRef = resultSummary;
            runnable = () -> {
                bookmarkOpt.ifPresent(this.bookmarkConsumer);
                CompletionStage<Object> closeStage = this.closeOnSummary ? this.boltConnection.close() : CompletableFuture.completedStage(null);
                closeStage.whenComplete((ignored, closeThrowable) -> {
                    Throwable error = Futures.completionExceptionCause(closeThrowable);
                    if (error != null && this.log.isTraceEnabled()) {
                        this.log.error(String.format("[%d] Failed to close connection while publishing summary", this.hashCode()), error);
                    }
                    if (recordConsumerFinished) {
                        this.log.trace("[%d] Won't publish summary because recordConsumer is finished", this.hashCode());
                    } else if (recordConsumerRef != null) {
                        if (recordConsumerHadRequests) {
                            recordConsumerRef.accept(null, null);
                        } else {
                            this.log.trace("[%d] Record consumer will not be notified as it had no requests", this.hashCode());
                        }
                    } else {
                        this.log.trace("[%d] Record consumer will not be notified as it has not been installed", this.hashCode());
                    }
                    this.completeSummaryFuture(resultSummaryRef, null);
                });
            };
        } else {
            runnable = this::onComplete;
        }
        return runnable;
    }

    private ResultSummary resultSummary(Map<String, Value> metadata) {
        return METADATA_EXTRACTOR.extractSummary(this.query, this.boltConnection, this.runSummary.resultAvailableAfter(), metadata, this.legacyNotifications, this.generateGqlStatusObject(this.runSummary.keys()));
    }

    private static Optional<DatabaseBookmark> databaseBookmark(Map<String, Value> metadata) {
        String bookmarkStr;
        DatabaseBookmark databaseBookmark = null;
        Value bookmarkValue = metadata.get("bookmark");
        if (bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType(InternalTypeSystem.TYPE_SYSTEM.STRING()) && !(bookmarkStr = bookmarkValue.asString()).isEmpty()) {
            databaseBookmark = new DatabaseBookmark(null, Bookmark.from(bookmarkStr));
        }
        return Optional.ofNullable(databaseBookmark);
    }

    private synchronized Runnable setupCompletionRunnableWithError(Throwable throwable) {
        this.log.trace("[%d] Setting up completion with error %s", this.hashCode(), throwable.getClass().getCanonicalName());
        boolean recordConsumerPresent = this.recordConsumer != null;
        boolean recordConsumerFinished = this.recordConsumerFinished;
        Runnable recordConsumerErrorNotificationRunnable = this.setupRecordConsumerErrorNotificationRunnable(throwable, false);
        boolean interrupted = this.interrupted;
        return () -> {
            ResultSummary summary;
            block4: {
                summary = null;
                try {
                    summary = this.resultSummary(Collections.emptyMap());
                }
                catch (Throwable summaryThrowable) {
                    if (interrupted) break block4;
                    throwable.addSuppressed(summaryThrowable);
                }
            }
            if (summary != null && recordConsumerPresent && !recordConsumerFinished) {
                ResultSummary summaryRef = summary;
                this.closeBoltConnection(throwable, interrupted, () -> {
                    recordConsumerErrorNotificationRunnable.run();
                    this.completeSummaryFuture(summaryRef, null);
                });
            } else {
                this.closeBoltConnection(throwable, interrupted, () -> this.completeSummaryFuture(null, throwable));
            }
        };
    }

    private void closeBoltConnection(Throwable throwable, boolean interrupted, Runnable runnable) {
        CompletionStage<Object> closeStage = this.closeOnSummary ? this.boltConnection.close() : CompletableFuture.completedStage(null);
        closeStage.whenComplete((ignored, closeThrowable) -> {
            Throwable error = Futures.completionExceptionCause(closeThrowable);
            if (!interrupted) {
                if (error != null) {
                    throwable.addSuppressed(error);
                }
                this.throwableConsumer.accept(throwable);
            }
            runnable.run();
        });
    }

    private synchronized void handleError(Throwable throwable, boolean interrupted) {
        this.state = State.FAILED;
        throwable = Futures.completionExceptionCause(throwable);
        if (this.error == null) {
            this.error = throwable;
            this.interrupted = interrupted;
        } else if (!this.interrupted) {
            if (throwable == IGNORED_ERROR) {
                return;
            }
            if (interrupted) {
                this.error = throwable;
                this.interrupted = true;
            } else if (this.error instanceof Neo4jException && !(throwable instanceof Neo4jException)) {
                if (this.error != IGNORED_ERROR) {
                    throwable.addSuppressed(this.error);
                }
                this.error = throwable;
            } else {
                this.error.addSuppressed(throwable);
            }
        }
    }

    private void completeSummaryFuture(ResultSummary summary, Throwable throwable) {
        if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
            this.consumedFuture.completeExceptionally(throwable);
            this.summaryFuture.completeExceptionally(throwable);
        } else {
            this.consumedFuture.complete(null);
            this.summaryFuture.complete(summary);
        }
    }

    private static enum State {
        READY,
        STREAMING,
        DISCARDING,
        FAILED,
        SUCCEEDED;

    }
}

