/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.data.spi.filesystem;

import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.Formats;
import org.kitesdk.data.MiniDFSTest;
import org.kitesdk.data.TestHelpers;
import org.kitesdk.data.spi.filesystem.AvroAppender;
import org.kitesdk.data.spi.filesystem.FileSystemDatasetReader;

public class TestAvroAppender
extends MiniDFSTest {
    private static final Schema schema = Schema.create((Schema.Type)Schema.Type.STRING);

    @Test
    public void testAvroSyncDFS() throws Exception {
        String s;
        int i;
        String auth = TestAvroAppender.getDFS().getUri().getAuthority();
        final FileSystem fs = TestAvroAppender.getDFS();
        final Path path = new Path("hdfs://" + auth + "/tmp/test.avro");
        AvroAppender appender = new AvroAppender(fs, path, schema, Formats.AVRO.getDefaultCompressionType());
        appender.open();
        for (i = 0; i < 10; ++i) {
            s = "string-" + i;
            appender.append((Object)s);
        }
        TestHelpers.assertThrows("Should not be able to read file, nothing written", DatasetIOException.class, new Callable<Void>(){

            @Override
            public Void call() throws IOException {
                TestAvroAppender.this.count(fs, path);
                return null;
            }
        });
        appender.flush();
        appender.sync();
        Assert.assertEquals((String)"Should find the first 10 records", (long)10L, (long)this.count(fs, path));
        for (i = 10; i < 20; ++i) {
            s = "string-" + i;
            appender.append((Object)s);
        }
        Assert.assertEquals((String)"Newly written records should still be buffered", (long)10L, (long)this.count(fs, path));
        appender.close();
        appender.cleanup();
        Assert.assertEquals((String)"All records should be found after close", (long)20L, (long)this.count(fs, path));
    }

    @Test
    @Ignore(value="LocalFileSystem is broken!?")
    public void testAvroSyncLocalFS() throws Exception {
        String s;
        int i;
        final FileSystem fs = TestAvroAppender.getFS();
        final Path path = new Path("file:/tmp/test.avro");
        AvroAppender appender = new AvroAppender(fs, path, schema, Formats.AVRO.getDefaultCompressionType());
        appender.open();
        for (i = 0; i < 10; ++i) {
            s = "string-" + i;
            appender.append((Object)s);
        }
        TestHelpers.assertThrows("Should not be able to read file, nothing written", DatasetIOException.class, new Callable<Void>(){

            @Override
            public Void call() throws IOException {
                TestAvroAppender.this.count(fs, path);
                return null;
            }
        });
        appender.flush();
        appender.sync();
        Assert.assertEquals((String)"Should find the first 10 records", (long)10L, (long)this.count(fs, path));
        for (i = 10; i < 20; ++i) {
            s = "string-" + i;
            appender.append((Object)s);
        }
        Assert.assertEquals((String)"Newly written records should still be buffered", (long)10L, (long)this.count(fs, path));
        appender.close();
        appender.cleanup();
        Assert.assertEquals((String)"All records should be found after close", (long)20L, (long)this.count(fs, path));
    }

    public int count(FileSystem fs, Path path) {
        FileSystemDatasetReader reader = new FileSystemDatasetReader(fs, path, schema, String.class);
        int count = 0;
        reader.initialize();
        for (String s : reader) {
            ++count;
            System.err.println(s);
        }
        reader.close();
        return count;
    }
}

