/*
 * Decompiled with CFR 0.152.
 */
package com.hurence.opc.ua;

import com.hurence.opc.OpcData;
import com.hurence.opc.OpcSession;
import com.hurence.opc.OperationStatus;
import com.hurence.opc.SubscriptionConfiguration;
import com.hurence.opc.exception.OpcException;
import com.hurence.opc.ua.OpcUaQualityExtractor;
import com.hurence.opc.ua.OpcUaSessionProfile;
import com.hurence.opc.ua.OpcUaTemplate;
import com.hurence.opc.ua.UaVariantMarshaller;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OpcSession} implementation that reads, writes and streams tags through an
 * Eclipse Milo {@link OpcUaClient}.
 *
 * <p>The session only keeps {@link WeakReference}s to the client and to the
 * {@link OpcUaTemplate} that created it; once the client has been collected or
 * {@link #cleanup()} has run, operations fail with an {@link OpcException}.
 * A single {@link UaSubscription} is created lazily on the first call to
 * {@link #stream(SubscriptionConfiguration, String...)} and shared by later calls.
 */
public class OpcUaSession implements OpcSession {
    private static final Logger logger = LoggerFactory.getLogger(OpcUaSession.class);
    /** Source of client handles; shared by every session so handles never collide. */
    private static final AtomicInteger clientHandleCounter = new AtomicInteger();
    private final Duration publicationInterval;
    private final WeakReference<OpcUaClient> client;
    private final WeakReference<OpcUaTemplate> creatingOperations;
    // volatile: stream consumers poll this field from their own thread to detect that
    // cleanup() has dropped the subscription.
    private volatile UaSubscription subscription;

    private OpcUaSession(OpcUaTemplate creatingOperations, OpcUaClient client, Duration publicationInterval) {
        this.client = new WeakReference<OpcUaClient>(client);
        this.creatingOperations = new WeakReference<OpcUaTemplate>(creatingOperations);
        this.publicationInterval = publicationInterval;
    }

    /**
     * Builds a session bound to the given client.
     *
     * @param creatingOperations the template that owns the session and will release it on {@link #close()}.
     * @param client             the connected Milo client.
     * @param sessionProfile     supplies the default publication interval.
     * @return the new session.
     * @throws OpcException if the session cannot be built (e.g. {@code sessionProfile} is null).
     */
    static OpcUaSession create(OpcUaTemplate creatingOperations, OpcUaClient client, OpcUaSessionProfile sessionProfile) {
        try {
            return new OpcUaSession(creatingOperations, client, sessionProfile.getDefaultPublicationInterval());
        } catch (Exception e) {
            throw new OpcException("Unable to create an OPC-UA session", e);
        }
    }

    /**
     * Returns the shared subscription, creating it on first use.
     *
     * @return the subscription, or {@code null} when none exists and the client is no longer reachable.
     * @throws OpcException if the subscription cannot be created.
     */
    private synchronized UaSubscription subscription() {
        UaSubscription current = this.subscription;
        if (current != null) {
            return current;
        }
        // Take a strong reference once so the client cannot vanish between check and use.
        OpcUaClient c = this.client.get();
        if (c == null) {
            return null;
        }
        try {
            double intervalMillis = (double) Math.round((double) this.publicationInterval.toNanos() / 1000000.0);
            current = c.getSubscriptionManager().createSubscription(intervalMillis).get();
            this.subscription = current;
            return current;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new OpcException("Unable to create subscription", e);
        } catch (Exception e) {
            throw new OpcException("Unable to create subscription", e);
        }
    }

    /**
     * Deletes the subscription (if any) and drops the reference to the client.
     * Failures are logged, never propagated; the session is unusable afterwards.
     */
    public void cleanup() {
        logger.info("Destroying UA session");
        OpcUaClient c = this.client.get();
        UaSubscription current = this.subscription;
        try {
            if (c != null && current != null) {
                c.getSubscriptionManager().deleteSubscription(current.getSubscriptionId()).get();
                logger.info("Released subscription {}", current.getSubscriptionId());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Unable to properly clear subscription " + current.getSubscriptionId(), e);
        } catch (Exception e) {
            // Only reachable from inside the guarded branch above, so current is non-null here.
            logger.warn("Unable to properly clear subscription " + current.getSubscriptionId(), e);
        } finally {
            this.subscription = null;
            this.client.clear();
        }
    }

    /**
     * @return the client, guaranteed non-null.
     * @throws OpcException if the client reference has been cleared.
     */
    private OpcUaClient fetchValidClient() {
        // Single get(): a second call could observe a referent cleared in the meantime.
        OpcUaClient c = this.client.get();
        if (c == null) {
            throw new OpcException("Unable to read items. OPC-UA Client has been garbage collected. Please use a fresher instance");
        }
        return c;
    }

    /**
     * Parses a tag into a {@link NodeId}.
     *
     * @throws OpcException naming the offending tag when it is not a valid node id.
     */
    private static NodeId parseNodeId(String tag) {
        return NodeId.parseSafe(tag).orElseThrow(() -> new OpcException("Invalid node id: " + tag));
    }

    /**
     * Converts a Milo {@link DataValue} into an {@link OpcData}.
     * The timestamp is the source time when present, otherwise the server time,
     * otherwise the current instant.
     */
    private OpcData<?> opcData(String tag, DataValue dataValue) {
        DateTime dt = null;
        double picos = 0.0;
        if (dataValue.getSourceTime() != null) {
            dt = dataValue.getSourceTime();
            if (dataValue.getSourcePicoseconds() != null) {
                picos = dataValue.getSourcePicoseconds().doubleValue();
            }
        } else if (dataValue.getServerTime() != null) {
            dt = dataValue.getServerTime();
            if (dataValue.getServerPicoseconds() != null) {
                picos = dataValue.getServerPicoseconds().doubleValue();
            }
        }
        // NOTE(review): OPC UA appears to define the picoseconds field in units of 10 ps;
        // confirm whether dividing by 1000 (rather than 100) is intended.
        Instant instant = dt != null
                ? dt.getJavaDate().toInstant().plusNanos(Math.round(picos / 1000.0))
                : Instant.now();
        return new OpcData<Object>(tag, instant,
                OpcUaQualityExtractor.quality(dataValue.getStatusCode()),
                UaVariantMarshaller.toJavaType(dataValue.getValue()),
                OpcUaQualityExtractor.operationStatus(dataValue.getStatusCode()));
    }

    /**
     * Synchronously reads the current value of each tag.
     *
     * @param tags node ids in parseable string form.
     * @return one entry per tag that could be mapped; tags whose value fails to map are logged and skipped.
     * @throws OpcException if the client is gone, a tag is invalid or the read fails.
     */
    @Override
    public List<OpcData> read(String ... tags) {
        OpcUaClient c = this.fetchValidClient();
        try {
            List<NodeId> nodeIds = Arrays.stream(tags).map(OpcUaSession::parseNodeId).collect(Collectors.toList());
            return c.readValues(0.0, TimestampsToReturn.Both, nodeIds).thenApply(dataValues -> {
                if (dataValues.size() != tags.length) {
                    throw new OpcException("Input tags does not match received tags. Aborting");
                }
                List<OpcData> ret = new ArrayList<>();
                for (int i = 0; i < dataValues.size(); ++i) {
                    try {
                        ret.add(this.opcData(tags[i], dataValues.get(i)));
                    } catch (Exception e) {
                        logger.warn("Unable to properly map tag " + tags[i] + ". Skipping!", e);
                    }
                }
                return ret;
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new OpcException("Unable to successfully read tags", e);
        } catch (Exception e) {
            throw new OpcException("Unable to successfully read tags", e);
        }
    }

    /**
     * Synchronously writes the value carried by each item to the node named by its tag.
     *
     * @param data items to write.
     * @return one status per item, in input order.
     * @throws OpcException if the client is gone or the write fails.
     */
    @Override
    public List<OperationStatus> write(OpcData ... data) {
        try {
            OpcUaClient c = this.fetchValidClient();
            List<NodeId> nodeIds = Arrays.stream(data).map(OpcData::getTag).map(NodeId::parse).collect(Collectors.toList());
            List<DataValue> values = Arrays.stream(data).map(OpcData::getValue).map(Variant::new).map(DataValue::valueOnly).collect(Collectors.toList());
            return c.writeValues(nodeIds, values)
                    .thenApply(statusCodes -> statusCodes.stream().map(OpcUaQualityExtractor::operationStatus).collect(Collectors.toList()))
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new OpcException("Unable to successfully write tags", e);
        } catch (Exception e) {
            throw new OpcException("Unable to successfully write tags", e);
        }
    }

    /**
     * Builds the monitored-item request for one tag.
     * The queue size is the number of samples that fit in one publication interval;
     * a non-positive sampling interval falls back to a queue of one.
     */
    private MonitoredItemCreateRequest monitoredItemRequest(String tag, int clientHandle, Duration samplingInterval) {
        ReadValueId readValueId = new ReadValueId(NodeId.parse(tag), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
        long samplingNanos = samplingInterval.toNanos();
        long queueSize = samplingNanos > 0
                ? Math.round(Math.ceil((double) this.publicationInterval.toNanos() / (double) samplingNanos))
                : 1L;
        MonitoringParameters parameters = new MonitoringParameters(
                UInteger.valueOf(clientHandle),
                Double.valueOf((double) samplingNanos / 1000000.0),
                null,
                UInteger.valueOf(queueSize),
                Boolean.TRUE);
        return new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
    }

    /**
     * Subscribes to the given tags and returns an infinite stream of their updates.
     *
     * <p>Each element is obtained by blocking on a hand-off queue fed by the monitored-item
     * callbacks. Pulling from the stream throws {@link OpcException} once the session has been
     * cleaned up or the consuming thread is interrupted. Closing the stream removes the
     * monitored items. Duplicate tags are subscribed once.
     *
     * @param subscriptionConfiguration supplies the sampling interval for each tag.
     * @param tags                      node ids in parseable string form.
     * @return a stream of tag updates; callers should close it when done.
     * @throws OpcException if the monitored items cannot be created.
     */
    @Override
    public Stream<OpcData> stream(SubscriptionConfiguration subscriptionConfiguration, String ... tags) {
        SynchronousQueue<OpcData> transferQueue = new SynchronousQueue<>();
        List<UaMonitoredItem> results = new ArrayList<>();
        // De-duplicate first: toMap rejects duplicate keys and one monitored item per tag is enough.
        List<String> distinctTags = Arrays.stream(tags).distinct().collect(Collectors.toList());
        Map<String, Integer> handles = distinctTags.stream()
                .collect(Collectors.toMap(Function.identity(), tag -> clientHandleCounter.incrementAndGet()));
        Map<Integer, String> reverseHandles = handles.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
        try {
            BiConsumer<UaMonitoredItem, DataValue> callback = (item, dataValue) -> {
                String tag = reverseHandles.get(item.getClientHandle().intValue());
                try {
                    transferQueue.put(this.opcData(tag, dataValue));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    logger.warn("Unable to properly map value for item " + tag, ie);
                } catch (Exception e) {
                    logger.warn("Unable to properly map value for item " + tag, e);
                }
            };
            UaSubscription sub = this.subscription();
            if (sub == null) {
                throw new OpcException("Unable to create subscription. OPC-UA Client has been garbage collected");
            }
            List<MonitoredItemCreateRequest> requests = distinctTags.stream()
                    .map(tag -> this.monitoredItemRequest(tag, handles.get(tag), subscriptionConfiguration.samplingIntervalForTag(tag)))
                    .collect(Collectors.toList());
            results.addAll(sub.createMonitoredItems(TimestampsToReturn.Both, requests, (item, index) -> {
                logger.info("Subscription for item {} with revised polling time {}",
                        reverseHandles.get(item.getClientHandle().intValue()), item.getRevisedSamplingInterval());
                item.setValueConsumer(callback);
            }).get());
            results.stream().filter(item -> !item.getStatusCode().isGood()).findFirst().ifPresent(item -> {
                throw new OpcException(String.format("Failure trying to monitoring item %s : %s",
                        reverseHandles.get(item.getClientHandle().intValue()),
                        Arrays.toString((Object[]) StatusCodes.lookup(item.getStatusCode().getValue()).orElse(null))));
            });
            long pollNanos = this.publicationInterval.toNanos();
            return Stream.<OpcData>generate(() -> {
                try {
                    OpcData next;
                    // Wake up every publication interval to notice a session torn down by cleanup().
                    while ((next = transferQueue.poll(pollNanos, TimeUnit.NANOSECONDS)) == null) {
                        if (this.subscription == null) {
                            throw new OpcException("EOF reading tags. Disconnected");
                        }
                    }
                    return next;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new OpcException("Stream interrupted", ie);
                }
            }).onClose(() -> this.removeSubscriptions(results));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // removeSubscriptions logs its own failures, so the original cause is always the one reported.
            this.removeSubscriptions(results);
            throw new OpcException("Unable to stream requested tags", e);
        }
    }

    /**
     * Deletes the given monitored items from the subscription, logging (not throwing) any failure.
     * Does nothing when there is nothing to delete or the subscription is already gone.
     */
    private void removeSubscriptions(List<UaMonitoredItem> results) {
        UaSubscription current = this.subscription;
        if (results.isEmpty()) {
            return;
        }
        if (current == null) {
            logger.debug("Subscription already released; skipping removal of {} monitored items", results.size());
            return;
        }
        try {
            List<StatusCode> removeResult = current.deleteMonitoredItems(results).get();
            for (int i = 0; i < removeResult.size(); ++i) {
                StatusCode status = removeResult.get(i);
                if (status.isGood()) {
                    continue;
                }
                logger.warn("Unable to properly unsubscribe for item {}: {}",
                        results.get(i).getReadValueId().getNodeId().toParseableString(),
                        Arrays.toString((Object[]) StatusCodes.lookup(status.getValue()).orElse(null)));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Unable to properly removed monitored items", e);
        } catch (Exception e) {
            logger.error("Unable to properly removed monitored items", e);
        }
    }

    /**
     * Hands the session back to the template that created it, at most once.
     *
     * @throws Exception whatever the template raises while releasing the session.
     */
    @Override
    public void close() throws Exception {
        OpcUaTemplate template = this.creatingOperations.get();
        if (template != null) {
            try {
                template.releaseSession(this);
            } finally {
                this.creatingOperations.clear();
            }
        }
    }
}

