/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.source;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.source.Web3jSourceParameter;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;

public class Web3jSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(Web3jSourceReader.class);
    private final Web3jSourceParameter parameter;
    private final SingleSplitReaderContext context;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private Web3j web3;

    Web3jSourceReader(Web3jSourceParameter parameter, SingleSplitReaderContext context) {
        this.parameter = parameter;
        this.context = context;
    }

    public void open() throws Exception {
        this.web3 = Web3j.build(new HttpService(this.parameter.getUrl()));
        log.info("connect Web3j server, url:[{}] ", (Object)this.parameter.getUrl());
    }

    public void close() throws IOException {
        if (this.web3 != null) {
            this.web3.shutdown();
        }
    }

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        this.web3.ethBlockNumber().flowable().subscribe(blockNumber -> {
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put("timestamp", Instant.now().toString());
            data.put("blockNumber", blockNumber.getBlockNumber());
            String json = OBJECT_MAPPER.writeValueAsString(data);
            output.collect((Object)new SeaTunnelRow(new Object[]{json}));
            if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
                this.context.signalNoMoreElement();
            }
        });
    }
}

