/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.dataservices.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.SessionState;
import com.marklogic.client.dataservices.InputEndpoint;
import com.marklogic.client.dataservices.impl.IOEndpointImpl;
import com.marklogic.client.dataservices.impl.InputCallerImpl;
import com.marklogic.client.io.marker.JSONWriteHandle;
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InputEndpointImpl
extends IOEndpointImpl
implements InputEndpoint {
    private static Logger logger = LoggerFactory.getLogger(InputEndpointImpl.class);
    private InputCallerImpl caller;
    private int batchSize;

    public InputEndpointImpl(DatabaseClient client, JSONWriteHandle apiDecl) {
        this(client, new InputCallerImpl(apiDecl));
    }

    private InputEndpointImpl(DatabaseClient client, InputCallerImpl caller) {
        super(client, caller);
        this.caller = caller;
        this.batchSize = this.initBatchSize(caller);
    }

    private InputCallerImpl getCaller() {
        return this.caller;
    }

    private int getBatchSize() {
        return this.batchSize;
    }

    @Override
    public void call(InputStream[] input) {
        this.call(null, null, null, input);
    }

    @Override
    public InputStream call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input) {
        this.checkAllowedArgs(endpointState, session, workUnit);
        return this.getCaller().arrayCall(this.getClient(), endpointState, session, workUnit, input);
    }

    @Override
    public InputEndpoint.BulkInputCaller bulkCaller() {
        return new BulkInputCallerImpl(this, this.getBatchSize());
    }

    static final class BulkInputCallerImpl
    extends IOEndpointImpl.BulkIOEndpointCallerImpl
    implements InputEndpoint.BulkInputCaller {
        private InputEndpointImpl endpoint;
        private int batchSize;
        private LinkedBlockingQueue<InputStream> queue;

        private BulkInputCallerImpl(InputEndpointImpl endpoint, int batchSize) {
            super(endpoint);
            this.endpoint = endpoint;
            this.batchSize = batchSize;
            this.queue = new LinkedBlockingQueue();
        }

        private InputEndpointImpl getEndpoint() {
            return this.endpoint;
        }

        private int getBatchSize() {
            return this.batchSize;
        }

        private LinkedBlockingQueue<InputStream> getQueue() {
            return this.queue;
        }

        @Override
        public void accept(InputStream input) {
            boolean hasBatch = this.queueInput(input, this.getQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        @Override
        public void acceptAll(InputStream[] input) {
            boolean hasBatch = this.queueAllInput(input, this.getQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        @Override
        public void awaitCompletion() {
            if (this.getQueue() == null) {
                return;
            }
            while (!this.getQueue().isEmpty()) {
                this.processInput();
            }
        }

        private void processInput() {
            logger.trace("input endpoint running endpoint={} count={} state={}", new Object[]{this.getEndpointPath(), this.getCallCount(), this.getEndpointState()});
            InputStream output = null;
            try {
                output = this.getEndpoint().getCaller().arrayCall(this.getClient(), this.getEndpointState(), this.getSession(), this.getWorkUnit(), this.getInputBatch(this.getQueue(), this.getBatchSize()));
            }
            catch (Throwable throwable) {
                throw new RuntimeException("error while calling " + this.getEndpoint().getEndpointPath(), throwable);
            }
            this.incrementCallCount();
            if (this.allowsEndpointState()) {
                this.setEndpointState(output);
            }
        }
    }
}

