/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.tools;

import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.ITransportFactory;
import org.apache.cassandra.thrift.TFramedTransportFactory;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.tools.BulkLoadConnectionFactory;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;

public class BulkLoader {
    // Tool name inserted into the usage line printed by LoaderOptions.printUsage.
    private static final String TOOL_NAME = "sstableloader";
    // Long names of the command-line options registered in LoaderOptions.getCmdLineOptions
    // and looked up again in LoaderOptions.parseArgs.
    private static final String VERBOSE_OPTION = "verbose";
    private static final String HELP_OPTION = "help";
    private static final String NOPROGRESS_OPTION = "no-progress";
    private static final String IGNORE_NODES_OPTION = "ignore";
    private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
    private static final String RPC_PORT_OPTION = "port";
    // The next two also serve as the credential map keys in ExternalClient.createThriftClient.
    private static final String USER_OPTION = "username";
    private static final String PASSWD_OPTION = "password";
    private static final String THROTTLE_MBITS = "throttle";
    private static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
    private static final String TRANSPORT_FACTORY = "transport-factory";
    // Client SSL settings; parseArgs copies their values into LoaderOptions.encOptions.
    private static final String SSL_TRUSTSTORE = "truststore";
    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
    private static final String SSL_KEYSTORE = "keystore";
    private static final String SSL_KEYSTORE_PW = "keystore-password";
    private static final String SSL_PROTOCOL = "ssl-protocol";
    private static final String SSL_ALGORITHM = "ssl-alg";
    private static final String SSL_STORE_TYPE = "store-type";
    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
    private static final String CONNECTIONS_PER_HOST = "connections-per-host";
    // Path to a cassandra.yaml file; when given, parseArgs loads its Config instead of using defaults.
    private static final String CONFIG_PATH = "conf-path";

    /**
     * Command-line entry point of the bulk loader.
     *
     * <p>Parses the arguments, builds an {@link SSTableLoader} backed by an
     * {@link ExternalClient}, applies the configured stream throttles and starts
     * streaming. The JVM exits with status 0 once the stream future completes, or
     * with status 1 if starting or completing the stream throws.
     *
     * @param args raw command-line arguments, handed to {@code LoaderOptions.parseArgs}
     */
    public static void main(String[] args) {
        Config.setClientMode(true);
        LoaderOptions options = LoaderOptions.parseArgs(args);

        OutputHandler.SystemOutput output = new OutputHandler.SystemOutput(options.verbose, options.debug);
        ExternalClient client = new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory, options.storagePort, options.sslStoragePort, options.serverEncOptions);
        SSTableLoader loader = new SSTableLoader(options.directory, client, output, options.connectionsPerHost);

        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
        DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);

        StreamResultFuture streaming = null;
        ProgressIndicator progress = new ProgressIndicator();
        try {
            // Without progress output no event handler is registered at all.
            if (options.noProgress) {
                streaming = loader.stream(options.ignores, new StreamEventHandler[0]);
            } else {
                streaming = loader.stream(options.ignores, progress);
            }
        }
        catch (Exception startFailure) {
            JVMStabilityInspector.inspectThrowable(startFailure);
            System.err.println(startFailure.getMessage());
            if (startFailure.getCause() != null) {
                System.err.println(startFailure.getCause());
            }
            startFailure.printStackTrace(System.err);
            System.exit(1);
        }

        try {
            streaming.get();
            if (!options.noProgress) {
                progress.printSummary(options.connectionsPerHost);
            }
            // One-second pause before terminating the JVM.
            Thread.sleep(1000L);
            System.exit(0);
        }
        catch (Exception streamFailure) {
            System.err.println("Streaming to the following hosts failed:");
            System.err.println(loader.getFailedHosts());
            streamFailure.printStackTrace(System.err);
            System.exit(1);
        }
    }

    /**
     * {@link Options} with two convenience overloads for registering an option
     * from its short name, long name and description.
     */
    public static class CmdLineOptions
    extends Options {
        /**
         * Registers an option that takes an argument.
         *
         * @param opt         short option name
         * @param longOpt     long option name
         * @param argName     argument name set on the option via {@code setArgName}
         * @param description help text for the option
         * @return the result of {@code addOption(Option)}
         */
        public Options addOption(String opt, String longOpt, String argName, String description) {
            Option withArgument = new Option(opt, longOpt, true, description);
            withArgument.setArgName(argName);
            return this.addOption(withArgument);
        }

        /**
         * Registers a flag, i.e. an option that takes no argument.
         *
         * @param opt         short option name
         * @param longOpt     long option name
         * @param description help text for the option
         * @return the result of {@code addOption(Option)}
         */
        public Options addOption(String opt, String longOpt, String description) {
            Option flag = new Option(opt, longOpt, false, description);
            return this.addOption(flag);
        }
    }

    static class LoaderOptions {
        /** Directory containing the sstables to load (the single positional argument). */
        public final File directory;
        /** Passed to the output handler in main; not assigned anywhere in the visible code. */
        public boolean debug;
        /** Set when the "verbose" option is present. */
        public boolean verbose;
        /** Set when the "no-progress" option is present; main then registers no progress handler. */
        public boolean noProgress;
        /** RPC port used for the Thrift connection; overridden by the "port" option. */
        public int rpcPort = 9160;
        /** Credentials for the Thrift login; a login is only attempted when both are non-null. */
        public String user;
        public String passwd;
        /** Stream throttles, taken from the Config and then overridden by the command line. */
        public int throttle = 0;
        public int interDcThrottle = 0;
        /** Storage ports copied from the Config in parseArgs. */
        public int storagePort;
        public int sslStoragePort;
        /** Factory for the Thrift transport; replaced when the "transport-factory" option is given. */
        public ITransportFactory transportFactory = new TFramedTransportFactory();
        /** Client encryption settings; parseArgs replaces this with the Config's client_encryption_options. */
        public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
        public int connectionsPerHost = 1;
        /** Server encryption settings; parseArgs replaces this with the Config's server_encryption_options. */
        public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
        /** Hosts contacted initially (the "nodes" option). */
        public final Set<InetAddress> hosts = new HashSet<InetAddress>();
        /** Hosts to skip when streaming (the "ignore" option). */
        public final Set<InetAddress> ignores = new HashSet<InetAddress>();

        /**
         * @param directory the sstable directory to load from
         */
        LoaderOptions(File directory) {
            this.directory = directory;
        }

        /**
         * Parses the sstableloader command line into a {@link LoaderOptions}.
         *
         * <p>Exactly one positional argument (an existing directory) and at least one
         * initial host ({@code --nodes}) are required. Ports, throttles and encryption
         * settings are first taken from a {@link Config} - either loaded from the yaml
         * file given with {@code --conf-path} or a fresh instance with both stream
         * throttles set to 0 - and are then overridden by explicit command-line options.
         *
         * <p>Every usage error (including a non-numeric value for a numeric option)
         * prints the message followed by the usage text and exits the JVM with status 1;
         * {@code --help} prints the usage and exits with status 0.
         *
         * @param cmdArgs raw command-line arguments
         * @return the populated options; the {@code null} return sits after an
         *         {@code errorMsg} call, which exits the JVM first
         */
        public static LoaderOptions parseArgs(String[] cmdArgs) {
            GnuParser parser = new GnuParser();
            CmdLineOptions options = LoaderOptions.getCmdLineOptions();
            try {
                CommandLine cmd = parser.parse((Options)options, cmdArgs, false);
                if (cmd.hasOption(BulkLoader.HELP_OPTION)) {
                    LoaderOptions.printUsage(options);
                    System.exit(0);
                }

                String[] args = cmd.getArgs();
                if (args.length == 0) {
                    System.err.println("Missing sstable directory argument");
                    LoaderOptions.printUsage(options);
                    System.exit(1);
                }
                if (args.length > 1) {
                    System.err.println("Too many arguments");
                    LoaderOptions.printUsage(options);
                    System.exit(1);
                }

                String dirname = args[0];
                File dir = new File(dirname);
                if (!dir.exists()) {
                    LoaderOptions.errorMsg("Unknown directory: " + dirname, options);
                }
                if (!dir.isDirectory()) {
                    LoaderOptions.errorMsg(dirname + " is not a directory", options);
                }

                LoaderOptions opts = new LoaderOptions(dir);
                opts.verbose = cmd.hasOption(BulkLoader.VERBOSE_OPTION);
                opts.noProgress = cmd.hasOption(BulkLoader.NOPROGRESS_OPTION);
                if (cmd.hasOption(BulkLoader.RPC_PORT_OPTION)) {
                    opts.rpcPort = LoaderOptions.parseIntOption(cmd, BulkLoader.RPC_PORT_OPTION);
                }
                if (cmd.hasOption(BulkLoader.USER_OPTION)) {
                    opts.user = cmd.getOptionValue(BulkLoader.USER_OPTION);
                }
                if (cmd.hasOption(BulkLoader.PASSWD_OPTION)) {
                    opts.passwd = cmd.getOptionValue(BulkLoader.PASSWD_OPTION);
                }

                if (cmd.hasOption(BulkLoader.INITIAL_HOST_ADDRESS_OPTION)) {
                    try {
                        LoaderOptions.addResolvedHosts(cmd.getOptionValue(BulkLoader.INITIAL_HOST_ADDRESS_OPTION), opts.hosts);
                    }
                    catch (UnknownHostException e) {
                        LoaderOptions.errorMsg("Unknown host: " + e.getMessage(), options);
                    }
                }
                // Covers both a missing option and a list that only held blank entries (e.g. ",").
                if (opts.hosts.isEmpty()) {
                    LoaderOptions.errorMsg("Initial hosts must be specified (-d)", options);
                }

                if (cmd.hasOption(BulkLoader.IGNORE_NODES_OPTION)) {
                    try {
                        LoaderOptions.addResolvedHosts(cmd.getOptionValue(BulkLoader.IGNORE_NODES_OPTION), opts.ignores);
                    }
                    catch (UnknownHostException e) {
                        LoaderOptions.errorMsg("Unknown host: " + e.getMessage(), options);
                    }
                }
                if (cmd.hasOption(BulkLoader.CONNECTIONS_PER_HOST)) {
                    opts.connectionsPerHost = LoaderOptions.parseIntOption(cmd, BulkLoader.CONNECTIONS_PER_HOST);
                }

                Config config;
                if (cmd.hasOption(BulkLoader.CONFIG_PATH)) {
                    File configFile = new File(cmd.getOptionValue(BulkLoader.CONFIG_PATH));
                    if (!configFile.exists()) {
                        LoaderOptions.errorMsg("Config file not found", options);
                    }
                    config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
                } else {
                    config = new Config();
                    config.stream_throughput_outbound_megabits_per_sec = 0;
                    config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
                }
                opts.storagePort = config.storage_port;
                opts.sslStoragePort = config.ssl_storage_port;
                opts.throttle = config.stream_throughput_outbound_megabits_per_sec;
                opts.interDcThrottle = config.inter_dc_stream_throughput_outbound_megabits_per_sec;
                opts.encOptions = config.client_encryption_options;
                opts.serverEncOptions = config.server_encryption_options;

                // Command-line values are applied last so they override the Config.
                if (cmd.hasOption(BulkLoader.THROTTLE_MBITS)) {
                    opts.throttle = LoaderOptions.parseIntOption(cmd, BulkLoader.THROTTLE_MBITS);
                }
                if (cmd.hasOption(BulkLoader.INTER_DC_THROTTLE_MBITS)) {
                    opts.interDcThrottle = LoaderOptions.parseIntOption(cmd, BulkLoader.INTER_DC_THROTTLE_MBITS);
                }
                if (cmd.hasOption(BulkLoader.SSL_TRUSTSTORE)) {
                    opts.encOptions.truststore = cmd.getOptionValue(BulkLoader.SSL_TRUSTSTORE);
                }
                if (cmd.hasOption(BulkLoader.SSL_TRUSTSTORE_PW)) {
                    opts.encOptions.truststore_password = cmd.getOptionValue(BulkLoader.SSL_TRUSTSTORE_PW);
                }
                if (cmd.hasOption(BulkLoader.SSL_KEYSTORE)) {
                    opts.encOptions.keystore = cmd.getOptionValue(BulkLoader.SSL_KEYSTORE);
                    // Supplying a keystore switches client authentication on.
                    opts.encOptions.require_client_auth = true;
                }
                if (cmd.hasOption(BulkLoader.SSL_KEYSTORE_PW)) {
                    opts.encOptions.keystore_password = cmd.getOptionValue(BulkLoader.SSL_KEYSTORE_PW);
                }
                if (cmd.hasOption(BulkLoader.SSL_PROTOCOL)) {
                    opts.encOptions.protocol = cmd.getOptionValue(BulkLoader.SSL_PROTOCOL);
                }
                if (cmd.hasOption(BulkLoader.SSL_ALGORITHM)) {
                    opts.encOptions.algorithm = cmd.getOptionValue(BulkLoader.SSL_ALGORITHM);
                }
                if (cmd.hasOption(BulkLoader.SSL_STORE_TYPE)) {
                    opts.encOptions.store_type = cmd.getOptionValue(BulkLoader.SSL_STORE_TYPE);
                }
                if (cmd.hasOption(BulkLoader.SSL_CIPHER_SUITES)) {
                    opts.encOptions.cipher_suites = cmd.getOptionValue(BulkLoader.SSL_CIPHER_SUITES).split(",");
                }
                // Configured after the SSL options so the factory sees their final values.
                if (cmd.hasOption(BulkLoader.TRANSPORT_FACTORY)) {
                    ITransportFactory transportFactory = LoaderOptions.getTransportFactory(cmd.getOptionValue(BulkLoader.TRANSPORT_FACTORY));
                    LoaderOptions.configureTransportFactory(transportFactory, opts);
                    opts.transportFactory = transportFactory;
                }
                return opts;
            }
            catch (MalformedURLException | ConfigurationException | ParseException e) {
                LoaderOptions.errorMsg(e.getMessage(), options);
                return null;
            }
        }

        /**
         * Reads the value of an integer-valued option.
         *
         * @param cmd        the parsed command line; the option must be present
         * @param optionName long name of the option to read
         * @return the parsed integer
         * @throws ParseException if the value is not a valid integer, so that the
         *         caller reports it like any other command-line error
         */
        private static int parseIntOption(CommandLine cmd, String optionName) throws ParseException {
            String raw = cmd.getOptionValue(optionName);
            try {
                return Integer.parseInt(raw);
            }
            catch (NumberFormatException e) {
                throw new ParseException(String.format("Invalid value '%s' for option --%s: an integer is required", raw, optionName));
            }
        }

        /**
         * Resolves every non-blank entry of a comma-separated host list and adds it to
         * {@code target}. Blank entries are skipped because
         * {@code InetAddress.getByName("")} resolves to the loopback address, which
         * would silently add localhost for input such as {@code "a,,b"}.
         *
         * @param commaSeparated comma-separated host names or addresses
         * @param target         set receiving the resolved addresses
         * @throws UnknownHostException if an entry cannot be resolved
         */
        private static void addResolvedHosts(String commaSeparated, Set<InetAddress> target) throws UnknownHostException {
            for (String node : commaSeparated.split(",")) {
                String name = node.trim();
                if (name.isEmpty()) {
                    continue;
                }
                target.add(InetAddress.getByName(name));
            }
        }

        /**
         * Loads and instantiates the transport factory class with the given name.
         *
         * <p>The type check is performed outside the {@code try} blocks so that its
         * specific message reaches the caller directly instead of being re-wrapped
         * by the generic "Cannot create" handler.
         *
         * @param transportFactory fully-qualified class name of an {@link ITransportFactory}
         * @return a new instance created through the class's no-argument constructor
         * @throws IllegalArgumentException if the class cannot be loaded, is not an
         *         {@link ITransportFactory}, or cannot be instantiated
         */
        private static ITransportFactory getTransportFactory(String transportFactory) {
            Class<?> factory;
            try {
                factory = Class.forName(transportFactory);
            }
            catch (Exception e) {
                throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
            }
            if (!ITransportFactory.class.isAssignableFrom(factory)) {
                throw new IllegalArgumentException(String.format("transport factory '%s' not derived from ITransportFactory", transportFactory));
            }
            try {
                return (ITransportFactory)factory.newInstance();
            }
            catch (Exception e) {
                throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
            }
        }

        /**
         * Builds the option map for a transport factory and hands it over via
         * {@code setOptions}.
         *
         * <p>Encryption settings from {@code opts.encOptions} are copied for every
         * key the factory reports as supported; the keystore entries are only copied
         * when client authentication is required. Afterwards any supported key that
         * is also defined as a JVM system property is set from that property,
         * replacing a value copied earlier.
         *
         * @param transportFactory the factory to configure
         * @param opts             loader options supplying the encryption settings
         */
        private static void configureTransportFactory(ITransportFactory transportFactory, LoaderOptions opts) {
            HashMap<String, String> settings = new HashMap<String, String>();

            if (LoaderOptions.supports(transportFactory, "enc.truststore")) {
                settings.put("enc.truststore", opts.encOptions.truststore);
            }
            if (LoaderOptions.supports(transportFactory, "enc.truststore.password")) {
                settings.put("enc.truststore.password", opts.encOptions.truststore_password);
            }
            if (LoaderOptions.supports(transportFactory, "enc.protocol")) {
                settings.put("enc.protocol", opts.encOptions.protocol);
            }
            if (LoaderOptions.supports(transportFactory, "enc.cipher.suites")) {
                String suites = Joiner.on(',').join((Object[])opts.encOptions.cipher_suites);
                settings.put("enc.cipher.suites", suites);
            }
            if (LoaderOptions.supports(transportFactory, "enc.keystore") && opts.encOptions.require_client_auth) {
                settings.put("enc.keystore", opts.encOptions.keystore);
            }
            if (LoaderOptions.supports(transportFactory, "enc.keystore.password") && opts.encOptions.require_client_auth) {
                settings.put("enc.keystore.password", opts.encOptions.keystore_password);
            }

            // System properties are applied last and therefore win over the values above.
            for (String key : transportFactory.supportedOptions()) {
                if (System.getProperty(key) != null) {
                    settings.put(key, System.getProperty(key));
                }
            }
            transportFactory.setOptions(settings);
        }

        /** Tells whether the factory lists {@code key} among its supported options. */
        private static boolean supports(ITransportFactory transportFactory, String key) {
            return transportFactory.supportedOptions().contains(key);
        }

        /**
         * Reports a fatal usage error: prints {@code msg} to standard error, prints
         * the usage text, then exits the JVM with status 1.
         *
         * @param msg     the error message
         * @param options the option set passed to {@code printUsage}
         */
        private static void errorMsg(String msg, CmdLineOptions options) {
            System.err.println(msg);
            printUsage(options);
            System.exit(1);
        }

        /**
         * Builds the set of command-line options understood by the tool.
         *
         * @return a new {@link CmdLineOptions} holding every supported option
         */
        private static CmdLineOptions getCmdLineOptions() {
            CmdLineOptions cli = new CmdLineOptions();

            // Flags (no argument).
            cli.addOption("v", VERBOSE_OPTION, "verbose output");
            cli.addOption("h", HELP_OPTION, "display this help message");
            cli.addOption(null, NOPROGRESS_OPTION, "don't display progress");

            // Cluster connection, throttling and authentication.
            cli.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
            cli.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
            cli.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
            cli.addOption("t", THROTTLE_MBITS, THROTTLE_MBITS, "throttle speed in Mbits (default unlimited)");
            cli.addOption("idct", INTER_DC_THROTTLE_MBITS, INTER_DC_THROTTLE_MBITS, "inter-datacenter throttle speed in Mbits (default unlimited)");
            cli.addOption("u", USER_OPTION, USER_OPTION, "username for cassandra authentication");
            cli.addOption("pw", PASSWD_OPTION, PASSWD_OPTION, "password for cassandra authentication");
            cli.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
            cli.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");

            // Client SSL.
            cli.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore");
            cli.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore");
            cli.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore");
            cli.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore");
            cli.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)");
            cli.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)");
            cli.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store");
            cli.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");

            // Optional cassandra.yaml.
            cli.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
            return cli;
        }

        /**
         * Prints the tool's help text - usage line, explanatory header, option list
         * and footer - through a {@link HelpFormatter}.
         *
         * @param options the options to list in the help output
         */
        public static void printUsage(Options options) {
            String newline = System.lineSeparator();
            String syntax = TOOL_NAME + " [options] <dir_path>";
            String intro = newline + "Bulk load the sstables found in the directory <dir_path> to the configured cluster.The parent directories of <dir_path> are used as the target keyspace/table name. So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/.";
            String yamlNote = newline + "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. Only stream_throughput_outbound_megabits_per_sec, inter_dc_stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. You can override options read from cassandra.yaml with corresponding command line options.";
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(syntax, intro, options, yamlNote);
        }
    }

    /**
     * {@link SSTableLoader.Client} that learns the partitioner, the token ring and
     * the table schemas of the target keyspace by querying one of the configured
     * hosts over Thrift.
     */
    public static class ExternalClient
    extends SSTableLoader.Client {
        /** Table metadata collected by {@link #init(String)}, keyed by table name. */
        private final Map<String, CFMetaData> knownCfs = new HashMap<String, CFMetaData>();
        private final Set<InetAddress> hosts;
        private final int rpcPort;
        private final String user;
        private final String passwd;
        private final ITransportFactory transportFactory;
        private final int storagePort;
        private final int sslStoragePort;
        private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;

        /**
         * @param hosts                   hosts to try, in iteration order, when fetching ring and schema
         * @param port                    RPC port used for the Thrift connection
         * @param user                    login user name, or {@code null} for no login
         * @param passwd                  login password, or {@code null} for no login
         * @param transportFactory        factory opening the Thrift transport
         * @param storagePort             storage port handed to the stream connection factory
         * @param sslStoragePort          SSL storage port handed to the stream connection factory
         * @param serverEncryptionOptions encryption options handed to the stream connection factory
         */
        public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory, int storagePort, int sslStoragePort, EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) {
            this.hosts = hosts;
            this.rpcPort = port;
            this.user = user;
            this.passwd = passwd;
            this.transportFactory = transportFactory;
            this.storagePort = storagePort;
            this.sslStoragePort = sslStoragePort;
            this.serverEncOptions = serverEncryptionOptions;
        }

        /**
         * Fetches partitioner, ring ranges and table metadata for {@code keyspace}.
         *
         * <p>Hosts are tried one after another and the first one that answers every
         * query wins. The Thrift transport opened for a host is always closed again,
         * whether the host succeeded or failed. Note that ranges or metadata recorded
         * before a host failed part-way are not rolled back.
         *
         * @param keyspace keyspace whose ring and schema are loaded
         * @throws RuntimeException if the last host fails; failures of the hosts tried
         *         before it are attached as suppressed exceptions
         */
        @Override
        public void init(String keyspace) {
            List<Exception> earlierFailures = new ArrayList<Exception>();
            Iterator<InetAddress> hostiter = this.hosts.iterator();
            while (hostiter.hasNext()) {
                Cassandra.Client client = null;
                try {
                    InetAddress host = hostiter.next();
                    client = ExternalClient.createThriftClient(host.getHostAddress(), this.rpcPort, this.user, this.passwd, this.transportFactory);
                    this.setPartitioner(client.describe_partitioner());
                    Token.TokenFactory tkFactory = this.getPartitioner().getTokenFactory();
                    for (TokenRange tr : client.describe_ring(keyspace)) {
                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token), this.getPartitioner());
                        for (String ep : tr.endpoints) {
                            this.addRangeForEndpoint(range, InetAddress.getByName(ep));
                        }
                    }
                    String cfQuery = String.format("SELECT %s FROM %s.%s WHERE keyspace_name = '%s'", StringUtils.join(this.getCFColumnsWithoutCollections(), ","), "system", "schema_columnfamilies", keyspace);
                    CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE);
                    for (CqlRow row : cfRes.rows) {
                        // NOTE(review): relies on the table name being the second selected column - confirm against allColumnsInSelectOrder().
                        String columnFamily = UTF8Type.instance.getString(((Column)row.columns.get(1)).bufferForName());
                        String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", "system", "schema_columns", keyspace, columnFamily);
                        CqlResult columnsRes = client.execute_cql3_query(ByteBufferUtil.bytes(columnsQuery), Compression.NONE, ConsistencyLevel.ONE);
                        CFMetaData metadata = CFMetaData.fromThriftCqlRow(row, columnsRes);
                        this.knownCfs.put(metadata.cfName, metadata);
                    }
                    break;
                }
                catch (Exception e) {
                    if (hostiter.hasNext()) {
                        // Remember why this host failed, then fall through to the next one.
                        earlierFailures.add(e);
                        continue;
                    }
                    RuntimeException failure = new RuntimeException("Could not retrieve endpoint ranges: ", e);
                    for (Exception earlier : earlierFailures) {
                        failure.addSuppressed(earlier);
                    }
                    throw failure;
                }
                finally {
                    ExternalClient.closeQuietly(client);
                }
            }
        }

        /** Closes the transport behind {@code client}, ignoring close failures; no-op for {@code null}. */
        private static void closeQuietly(Cassandra.Client client) {
            if (client == null) {
                return;
            }
            try {
                client.getOutputProtocol().getTransport().close();
            }
            catch (RuntimeException ignored) {
                // Best effort: the data needed from this connection has already been read or abandoned.
            }
        }

        /**
         * Lists the names of the schema_columnfamilies columns whose type is not a
         * collection, in select order.
         *
         * @return the column names used to build the table-schema query
         */
        List<String> getCFColumnsWithoutCollections() {
            Iterator<ColumnDefinition> allColumns = CFMetaData.SchemaColumnFamiliesCf.allColumnsInSelectOrder();
            ArrayList<String> selectedColumns = new ArrayList<String>();
            while (allColumns.hasNext()) {
                ColumnDefinition def = allColumns.next();
                if (def.type.isCollection()) continue;
                selectedColumns.add(UTF8Type.instance.getString(def.name.bytes));
            }
            return selectedColumns;
        }

        /** Creates the connection factory used for streaming, from the configured ports and encryption options. */
        @Override
        public StreamConnectionFactory getConnectionFactory() {
            return new BulkLoadConnectionFactory(this.storagePort, this.sslStoragePort, this.serverEncOptions, false);
        }

        /**
         * Looks up table metadata collected by {@link #init(String)}.
         *
         * @param keyspace not used for the lookup; only the table name is consulted
         * @param cfName   table name
         * @return the metadata, or {@code null} if the table is unknown
         */
        @Override
        public CFMetaData getCFMetaData(String keyspace, String cfName) {
            return this.knownCfs.get(cfName);
        }

        /**
         * Opens a Thrift client to {@code host:port} and logs in when both
         * {@code user} and {@code passwd} are non-null.
         *
         * <p>If anything fails after the transport has been opened, the transport is
         * closed before the exception is rethrown, so a failed login does not leak
         * the connection.
         *
         * @return a connected (and, if credentials were given, authenticated) client
         * @throws Exception if the transport cannot be opened or the login fails
         */
        private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception {
            TTransport trans = transportFactory.openTransport(host, port);
            try {
                TBinaryProtocol protocol = new TBinaryProtocol(trans);
                Cassandra.Client client = new Cassandra.Client((TProtocol)protocol);
                if (user != null && passwd != null) {
                    HashMap<String, String> credentials = new HashMap<String, String>();
                    credentials.put(BulkLoader.USER_OPTION, user);
                    credentials.put(BulkLoader.PASSWD_OPTION, passwd);
                    AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
                    client.login(authenticationRequest);
                }
                return client;
            }
            catch (Exception e) {
                try {
                    trans.close();
                }
                catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }
    }

    /**
     * Stream event handler that redraws a single-line progress report on standard
     * output and can print summary statistics once streaming has finished.
     */
    static class ProgressIndicator
    implements StreamEventHandler {
        /** {@code System.nanoTime()} at construction; reference point for average rates. */
        private final long start;
        /** Total bytes sent as of the previous progress report. */
        private long lastProgress;
        /** {@code System.nanoTime()} of the previous progress report. */
        private long lastTime;
        /** Highest value seen so far for the running average rate, in MB/s. */
        private int peak = 0;
        /** Total number of files to send, summed over the sessions known at the first report. */
        private int totalFiles = 0;
        private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();

        public ProgressIndicator() {
            long now = System.nanoTime();
            this.start = now;
            this.lastTime = now;
        }

        public void onSuccess(StreamState finalState) {
        }

        public void onFailure(Throwable t) {
        }

        /**
         * Records newly prepared sessions and, on file progress or stream completion,
         * prints per-session and total progress together with the current and
         * average transfer rates.
         *
         * @param event the stream event; event types other than STREAM_PREPARED,
         *              FILE_PROGRESS and STREAM_COMPLETE are ignored
         */
        @Override
        public synchronized void handleStreamEvent(StreamEvent event) {
            if (event.eventType == StreamEvent.Type.STREAM_PREPARED) {
                SessionInfo session = ((StreamEvent.SessionPreparedEvent)event).session;
                this.sessionsByHost.put(session.peer, session);
            } else if (event.eventType == StreamEvent.Type.FILE_PROGRESS || event.eventType == StreamEvent.Type.STREAM_COMPLETE) {
                // Only FILE_PROGRESS events carry progress details; STREAM_COMPLETE just triggers a redraw.
                ProgressInfo progressInfo = null;
                if (event.eventType == StreamEvent.Type.FILE_PROGRESS) {
                    progressInfo = ((StreamEvent.ProgressEvent)event).progress;
                }
                long time = System.nanoTime();
                long deltaTime = time - this.lastTime;
                StringBuilder sb = new StringBuilder();
                // Leading carriage return so the report overwrites the previous one.
                sb.append("\rprogress: ");
                long totalProgress = 0L;
                long totalSize = 0L;
                // The file total is computed once, on the first report.
                boolean updateTotalFiles = this.totalFiles == 0;
                for (InetAddress peer : this.sessionsByHost.keySet()) {
                    sb.append("[").append(peer.toString()).append("]");
                    for (SessionInfo session : this.sessionsByHost.get(peer)) {
                        long size = session.getTotalSizeToSend();
                        long current = 0L;
                        int completed = 0;
                        if (progressInfo != null && session.peer.equals(progressInfo.peer) && session.sessionIndex == progressInfo.sessionIndex) {
                            session.updateProgress(progressInfo);
                        }
                        for (ProgressInfo progress : session.getSendingFiles()) {
                            if (progress.isCompleted()) {
                                ++completed;
                            }
                            current += progress.currentBytes;
                        }
                        totalProgress += current;
                        totalSize += size;
                        sb.append(session.sessionIndex).append(":");
                        sb.append(completed).append("/").append(session.getTotalFilesToSend());
                        // A session with nothing to send is reported as 100% instead of dividing by zero.
                        sb.append(" ").append(String.format("%-3d", size == 0L ? 100L : current * 100L / size)).append("% ");
                        if (!updateTotalFiles) continue;
                        this.totalFiles = (int)((long)this.totalFiles + session.getTotalFilesToSend());
                    }
                }
                this.lastTime = time;
                long deltaProgress = totalProgress - this.lastProgress;
                this.lastProgress = totalProgress;
                sb.append("total: ").append(totalSize == 0L ? 100L : totalProgress * 100L / totalSize).append("% ");
                sb.append(String.format("%-3d", this.mbPerSec(deltaProgress, deltaTime))).append("MB/s");
                int average = this.mbPerSec(totalProgress, time - this.start);
                if (average > this.peak) {
                    this.peak = average;
                }
                sb.append("(avg: ").append(average).append(" MB/s)");
                System.out.print(sb.toString());
            }
        }

        /**
         * Converts a byte count and a duration into a rate in MB/s (1 MB = 1048576 bytes).
         *
         * @param bytes      number of bytes transferred
         * @param timeInNano elapsed time in nanoseconds
         * @return the truncated rate, or 0 when no time has elapsed; without this
         *         guard a zero duration divides to infinity, which casts to
         *         {@code Integer.MAX_VALUE} and would be shown as the rate
         */
        private int mbPerSec(long bytes, long timeInNano) {
            if (timeInNano <= 0L) {
                return 0;
            }
            double bytesPerNano = (double)bytes / (double)timeInNano;
            return (int)(bytesPerNano * 1000.0 * 1000.0 * 1000.0 / 1048576.0);
        }

        /**
         * Prints the summary statistics block to standard output.
         *
         * @param connectionsPerHost configured number of connections per host, echoed in the summary
         */
        private void printSummary(int connectionsPerHost) {
            long end = System.nanoTime();
            long durationMS = (end - this.start) / 1000000L;
            int average = this.mbPerSec(this.lastProgress, end - this.start);
            StringBuilder sb = new StringBuilder();
            sb.append("\nSummary statistics: \n");
            sb.append(String.format("   %-30s: %-10d%n", "Connections per host: ", connectionsPerHost));
            sb.append(String.format("   %-30s: %-10d%n", "Total files transferred: ", this.totalFiles));
            sb.append(String.format("   %-30s: %-10d%n", "Total bytes transferred: ", this.lastProgress));
            sb.append(String.format("   %-30s: %-10d%n", "Total duration (ms): ", durationMS));
            sb.append(String.format("   %-30s: %-10d%n", "Average transfer rate (MB/s): ", average));
            sb.append(String.format("   %-30s: %-10d%n", "Peak transfer rate (MB/s): ", this.peak));
            System.out.println(sb.toString());
        }
    }
}

