diff --git a/distributedlog-io/dlfs/pom.xml b/distributedlog-io/dlfs/pom.xml
new file mode 100644
index 000000000..775f07be1
--- /dev/null
+++ b/distributedlog-io/dlfs/pom.xml
@@ -0,0 +1,117 @@
+
+
+
+ 4.0.0
+
+ distributedlog
+ org.apache.distributedlog
+ 0.6.0-SNAPSHOT
+ ../..
+
+ org.apache.distributedlog
+ dlfs
+ Apache DistributedLog :: IO :: FileSystem
+ http://maven.apache.org
+
+ UTF-8
+ ${basedir}/lib
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+ org.apache.distributedlog
+ distributedlog-core
+ ${project.parent.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.7.2
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ com.google.guava
+ guava
+
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ org.apache.distributedlog
+ distributedlog-core
+ ${project.parent.version}
+ tests
+ test
+
+
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven-checkstyle-plugin.version}
+
+
+ com.puppycrawl.tools
+ checkstyle
+ ${puppycrawl.checkstyle.version}
+
+
+ org.apache.distributedlog
+ distributedlog-build-tools
+ ${project.version}
+
+
+
+ distributedlog/checkstyle.xml
+ distributedlog/suppressions.xml
+ true
+ true
+ false
+ true
+
+
+
+ test-compile
+
+ check
+
+
+
+
+
+
+
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
new file mode 100644
index 000000000..0670a4a31
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.fs;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A FileSystem Implementation powered by replicated logs.
+ */
+@Slf4j
+public class DLFileSystem extends FileSystem {
+
+ //
+ // Settings
+ //
+
+ public static final String DLFS_CONF_FILE = "dlog.configuration.file";
+
+
+ private URI rootUri;
+ private Namespace namespace;
+ private final DistributedLogConfiguration dlConf;
+ private Path workingDir;
+
+ public DLFileSystem() {
+ this.dlConf = new DistributedLogConfiguration();
+ setWorkingDirectory(new Path(System.getProperty("user.dir", "")));
+ }
+
+ @Override
+ public URI getUri() {
+ return rootUri;
+ }
+
+ //
+ // Initialization
+ //
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+ setConf(conf);
+
+ // initialize
+
+ this.rootUri = name;
+ // load the configuration
+ String dlConfLocation = conf.get(DLFS_CONF_FILE);
+ if (null != dlConfLocation) {
+ try {
+ this.dlConf.loadConf(new File(dlConfLocation).toURI().toURL());
+ log.info("Loaded the distributedlog configuration from {}", dlConfLocation);
+ } catch (ConfigurationException e) {
+ log.error("Failed to load the distributedlog configuration from " + dlConfLocation, e);
+ throw new IOException("Failed to load distributedlog configuration from " + dlConfLocation);
+ }
+ }
+ log.info("Initializing the filesystem at {}", name);
+ // initialize the namespace
+ this.namespace = NamespaceBuilder.newBuilder()
+ .clientId("dlfs-client-" + InetAddress.getLocalHost().getHostName())
+ .conf(dlConf)
+ .regionId(DistributedLogConstants.LOCAL_REGION_ID)
+ .uri(name)
+ .build();
+ log.info("Initialized the filesystem at {}", name);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // clean up the resource
+ namespace.close();
+ super.close();
+ }
+
+ //
+ // Util Functions
+ //
+
+ private Path makeAbsolute(Path f) {
+ if (f.isAbsolute()) {
+ return f;
+ } else {
+ return new Path(workingDir, f);
+ }
+ }
+
+ private String getStreamName(Path relativePath) {
+ return makeAbsolute(relativePath).toUri().getPath().substring(1);
+ }
+
+ //
+ // Home & Working Directory
+ //
+
+ @Override
+ public Path getHomeDirectory() {
+ return this.makeQualified(new Path(System.getProperty("user.home", "")));
+ }
+
+ protected Path getInitialWorkingDirectory() {
+ return this.makeQualified(new Path(System.getProperty("user.dir", "")));
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ workingDir = makeAbsolute(path);
+ checkPath(workingDir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize)
+ throws IOException {
+ try {
+ DistributedLogManager dlm = namespace.openLog(getStreamName(path));
+ LogReader reader;
+ try {
+ reader = dlm.openLogReader(DLSN.InitialDLSN);
+ } catch (LogNotFoundException lnfe) {
+ throw new FileNotFoundException(path.toString());
+ } catch (LogEmptyException lee) {
+ throw new FileNotFoundException(path.toString());
+ }
+ return new FSDataInputStream(
+ new BufferedFSInputStream(
+ new DLInputStream(dlm, reader, 0L),
+ bufferSize));
+ } catch (LogNotFoundException e) {
+ throw new FileNotFoundException(path.toString());
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path,
+ FsPermission fsPermission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progressable) throws IOException {
+ // for overwrite, delete the existing file first.
+ if (overwrite) {
+ delete(path, false);
+ }
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(dlConf);
+ confLocal.setEnsembleSize(replication);
+ confLocal.setWriteQuorumSize(replication);
+ confLocal.setAckQuorumSize(replication);
+ confLocal.setMaxLogSegmentBytes(blockSize);
+ return append(path, bufferSize, Optional.of(confLocal));
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path,
+ int bufferSize,
+ Progressable progressable) throws IOException {
+ return append(path, bufferSize, Optional.empty());
+ }
+
+ private FSDataOutputStream append(Path path,
+ int bufferSize,
+ Optional confLocal)
+ throws IOException {
+ try {
+ DistributedLogManager dlm = namespace.openLog(
+ getStreamName(path),
+ confLocal,
+ Optional.empty(),
+ Optional.empty());
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ return new FSDataOutputStream(
+ new BufferedOutputStream(
+ new DLOutputStream(dlm, writer), bufferSize
+ ),
+ statistics,
+ writer.getLastTxId() < 0L ? 0L : writer.getLastTxId());
+ } catch (LogNotFoundException le) {
+ throw new FileNotFoundException(path.toString());
+ }
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ try {
+ String logName = getStreamName(path);
+ if (recursive) {
+ Iterator logs = namespace.getLogs(logName);
+ while (logs.hasNext()) {
+ String child = logs.next();
+ Path childPath = new Path(path, child);
+ delete(childPath, recursive);
+ }
+ }
+ namespace.deleteLog(logName);
+ return true;
+ } catch (LogNotFoundException e) {
+ return true;
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+ String logName = getStreamName(path);
+ try {
+ Iterator logs = namespace.getLogs(logName);
+ List statusList = Lists.newArrayList();
+ while (logs.hasNext()) {
+ String child = logs.next();
+ Path childPath = new Path(path, child);
+ statusList.add(getFileStatus(childPath));
+ }
+ Collections.sort(statusList, Comparator.comparing(FileStatus::getPath));
+ return statusList.toArray(new FileStatus[statusList.size()]);
+ } catch (LogNotFoundException e) {
+ throw new FileNotFoundException(path.toString());
+ }
+ }
+
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ String streamName = getStreamName(path);
+
+ // Create a dummy stream to make the path exists.
+ namespace.createLog(streamName);
+ return true;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ String logName = getStreamName(path);
+ boolean exists = namespace.logExists(logName);
+ if (!exists) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ long endPos;
+ try {
+ DistributedLogManager dlm = namespace.openLog(logName);
+ endPos = dlm.getLastTxId();
+ } catch (LogNotFoundException e) {
+ throw new FileNotFoundException(path.toString());
+ } catch (LogEmptyException e) {
+ endPos = 0L;
+ }
+
+ // we need to store more metadata information on logs for supporting filesystem-like use cases
+ return new FileStatus(
+ endPos,
+ false,
+ 3,
+ dlConf.getMaxLogSegmentBytes(),
+ 0L,
+ makeAbsolute(path));
+ }
+
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ String srcLog = getStreamName(src);
+ String dstLog = getStreamName(dst);
+ namespace.renameLog(srcLog, dstLog);
+ return true;
+ }
+
+ //
+ // Not Supported
+ //
+
+ @Override
+ public boolean truncate(Path f, long newLength) throws IOException {
+ throw new UnsupportedOperationException("Truncate is not supported yet");
+ }
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java
new file mode 100644
index 000000000..c5a810fff
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.hadoop.fs.FSInputStream;
+
+/**
+ * The input stream for a distributedlog stream.
+ */
+@Slf4j
+class DLInputStream extends FSInputStream {
+
+ private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+ private static class RecordStream {
+
+ private final InputStream payloadStream;
+ private final LogRecordWithDLSN record;
+
+ RecordStream(LogRecordWithDLSN record) {
+ checkNotNull(record);
+
+ this.record = record;
+ this.payloadStream = record.getPayLoadInputStream();
+ }
+
+ }
+
+ private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+ LogRecordWithDLSN record = reader.readNext(false);
+ if (null != record) {
+ return new RecordStream(record);
+ }
+ return null;
+ }
+
+ private final DistributedLogManager dlm;
+ private LogReader reader;
+ private long pos;
+ private long lastPos;
+ private RecordStream currentRecord = null;
+
+ DLInputStream(DistributedLogManager dlm,
+ LogReader reader,
+ long startPos)
+ throws IOException {
+ this.dlm = dlm;
+ this.reader = reader;
+ this.pos = startPos;
+ this.lastPos = readEndPos();
+ seek(startPos);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ dlm.close();
+ }
+
+ private long readEndPos() throws IOException {
+ return dlm.getLastTxId();
+ }
+
+ //
+ // FSInputStream
+ //
+
+ @Override
+ public void seek(long pos) throws IOException {
+ if (this.pos == pos) {
+ return;
+ }
+
+ if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+ // close the previous reader
+ this.reader.close();
+ this.reader = dlm.openLogReader(pos);
+ this.currentRecord = null;
+ }
+
+ skipTo(pos);
+ }
+
+ private boolean skipTo(final long position) throws IOException {
+ while (true) {
+ if (null == currentRecord) {
+ currentRecord = nextRecordStream(reader);
+ }
+
+ if (null == currentRecord) { // the stream is empty now
+ return false;
+ }
+
+ long endPos = currentRecord.record.getTransactionId();
+ if (endPos < position) {
+ currentRecord = nextRecordStream(reader);
+ this.pos = endPos;
+ continue;
+ } else if (endPos == position){
+ // find the record, but we defer read next record when actual read happens
+ this.pos = position;
+ this.currentRecord = null;
+ return true;
+ } else {
+ this.currentRecord.payloadStream.skip(
+ this.currentRecord.payloadStream.available() - (endPos - position));
+ this.pos = position;
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ //
+ // Input Stream
+ //
+
+ @Override
+ public int read(byte[] b, final int off, final int len) throws IOException {
+ int remaining = len;
+ int numBytesRead = 0;
+ while (remaining > 0) {
+ if (null == currentRecord) {
+ currentRecord = nextRecordStream(reader);
+ }
+
+ if (null == currentRecord) {
+ if (numBytesRead == 0) {
+ return -1;
+ }
+ break;
+ }
+
+ int bytesLeft = currentRecord.payloadStream.available();
+ if (bytesLeft <= 0) {
+ currentRecord.payloadStream.close();
+ currentRecord = null;
+ continue;
+ }
+
+ int numBytesToRead = Math.min(bytesLeft, remaining);
+ int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+ if (numBytes < 0) {
+ continue;
+ }
+ numBytesRead += numBytes;
+ remaining -= numBytes;
+ }
+ return numBytesRead;
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ if (n <= 0L) {
+ return 0L;
+ }
+
+ long remaining = n;
+ while (true) {
+ if (null == currentRecord) {
+ currentRecord = nextRecordStream(reader);
+ }
+
+ if (null == currentRecord) { // end of stream
+ return n - remaining;
+ }
+
+ int bytesLeft = currentRecord.payloadStream.available();
+ long endPos = currentRecord.record.getTransactionId();
+ if (remaining > bytesLeft) {
+ // skip the whole record
+ remaining -= bytesLeft;
+ this.pos = endPos;
+ this.currentRecord = nextRecordStream(reader);
+ continue;
+ } else if (remaining == bytesLeft) {
+ this.pos = endPos;
+ this.currentRecord = null;
+ return n;
+ } else {
+ currentRecord.payloadStream.skip(remaining);
+ this.pos = endPos - currentRecord.payloadStream.available();
+ return n;
+ }
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (lastPos - pos == 0L) {
+ lastPos = readEndPos();
+ }
+ return (int) (lastPos - pos);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] data = new byte[1];
+ int numBytes = read(data);
+ if (numBytes <= 0) {
+ return -1;
+ }
+ return data[0];
+ }
+
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
new file mode 100644
index 000000000..3670bc52b
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+ private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+ private final DistributedLogManager dlm;
+ private final AsyncLogWriter writer;
+
+ // positions
+ private final long[] syncPos = new long[1];
+ private long writePos = 0L;
+
+ // state
+ private final AtomicReference exception = new AtomicReference<>(null);
+
+ DLOutputStream(DistributedLogManager dlm,
+ AsyncLogWriter writer) {
+ this.dlm = dlm;
+ this.writer = writer;
+ this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+ this.syncPos[0] = writePos;
+ }
+
+ public synchronized long position() {
+ return syncPos[0];
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ byte[] data = new byte[] { (byte) b };
+ write(data);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(Unpooled.wrappedBuffer(b));
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ write(Unpooled.wrappedBuffer(b, off, len));
+ }
+
+ private synchronized void write(ByteBuf buf) throws IOException {
+ Throwable cause = exception.get();
+ if (null != cause) {
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new UnexpectedException("Encountered unknown issue", cause);
+ }
+ }
+
+ writePos += buf.readableBytes();
+ LogRecord record = new LogRecord(writePos, buf);
+ writer.write(record).whenComplete(new FutureEventListener() {
+ @Override
+ public void onSuccess(DLSN value) {
+ synchronized (syncPos) {
+ syncPos[0] = record.getTransactionId();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ exception.compareAndSet(null, cause);
+ }
+ });
+ }
+
+ private CompletableFuture writeControlRecord() {
+ LogRecord record;
+ synchronized (this) {
+ record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+ record.setControl();
+ }
+ return writer.write(record);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ FutureUtils.result(writeControlRecord());
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception e) {
+ log.error("Unexpected exception in DLOutputStream", e);
+ throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ Utils.ioResult(
+ writeControlRecord()
+ .thenCompose(ignored -> writer.asyncClose())
+ .thenCompose(ignored -> dlm.asyncClose()));
+ }
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java
new file mode 100644
index 000000000..2af39b76c
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A filesystem API built over distributedlog.
+ */
+package org.apache.distributedlog.fs;
\ No newline at end of file
diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java
new file mode 100644
index 000000000..1c67d3663
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import java.net.URI;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Integration test for {@link DLFileSystem}.
+ */
+public abstract class TestDLFSBase extends TestDistributedLogBase {
+
+ @Rule
+ public final TestName runtime = new TestName();
+
+ protected static URI dlfsUri;
+ protected static DLFileSystem fs;
+
+ @BeforeClass
+ public static void setupDLFS() throws Exception {
+ setupCluster();
+ dlfsUri = DLMTestUtil.createDLMURI(zkPort, "");
+ fs = new DLFileSystem();
+ Configuration conf = new Configuration();
+ conf.set(DLFileSystem.DLFS_CONF_FILE, TestDLFSBase.class.getResource("/dlfs.conf").toURI().getPath());
+ fs.initialize(dlfsUri, conf);
+ fs.setWorkingDirectory(new Path("/"));
+ }
+
+ @AfterClass
+ public static void teardownDLFS() throws Exception {
+ fs.close();
+ teardownCluster();
+ }
+
+}
diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java
new file mode 100644
index 000000000..70d8a216d
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link DLFileSystem}.
+ */
+@Slf4j
+public class TestDLFileSystem extends TestDLFSBase {
+
+ @Test(expected = FileNotFoundException.class)
+ public void testOpenFileNotFound() throws Exception {
+ Path path = new Path("not-found-file");
+ fs.open(path, 1024);
+ }
+
+ @Test
+ public void testBasicIO() throws Exception {
+ Path path = new Path("/path/to/" + runtime.getMethodName());
+
+ assertFalse(fs.exists(path));
+
+ try (FSDataOutputStream out = fs.create(path)) {
+ for (int i = 0; i < 100; i++) {
+ out.writeBytes("line-" + i + "\n");
+ }
+ out.flush();
+ }
+ assertTrue(fs.exists(path));
+
+ File tempFile = new File("/tmp/" + runtime.getMethodName());
+ tempFile.delete();
+ Path localDst = new Path(tempFile.getPath());
+ // copy the file
+ fs.copyToLocalFile(path, localDst);
+ // copy the file to dest
+ fs.copyFromLocalFile(localDst, new Path(runtime.getMethodName() + "-copied"));
+
+ // rename
+ Path dstPath = new Path(runtime.getMethodName() + "-renamed");
+ fs.rename(path, dstPath);
+ assertFalse(fs.exists(path));
+ assertTrue(fs.exists(dstPath));
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(dstPath, 1134)))) {
+ int lineno = 0;
+ String line;
+ while ((line = reader.readLine()) != null) {
+ assertEquals("line-" + lineno, line);
+ ++lineno;
+ }
+ assertEquals(100, lineno);
+ }
+
+
+ // delete the file
+ fs.delete(dstPath, false);
+ assertFalse(fs.exists(dstPath));
+ }
+
+ @Test
+ public void testListStatuses() throws Exception {
+ Path parentPath = new Path("/path/to/" + runtime.getMethodName());
+ assertFalse(fs.exists(parentPath));
+ try (FSDataOutputStream parentOut = fs.create(parentPath)) {
+ parentOut.writeBytes("parent");
+ parentOut.flush();
+ }
+ assertTrue(fs.exists(parentPath));
+
+ int numLogs = 3;
+ for (int i = 0; i < numLogs; i++) {
+ Path path = new Path("/path/to/" + runtime.getMethodName()
+ + "/" + runtime.getMethodName() + "-" + i);
+ assertFalse(fs.exists(path));
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.writeBytes("line");
+ out.flush();
+ }
+ assertTrue(fs.exists(path));
+ }
+ FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName()));
+
+ assertEquals(3, files.length);
+ for (int i = 0; i < numLogs; i++) {
+ FileStatus file = files[i];
+ assertEquals(4, file.getLen());
+ assertFalse(file.isDirectory());
+ assertEquals(3, file.getReplication());
+ assertEquals(0L, file.getModificationTime());
+ assertEquals(
+ new Path("/path/to/" + runtime.getMethodName() + "/" + runtime.getMethodName() + "-" + i),
+ file.getPath());
+ }
+ }
+
+ @Test
+ public void testMkDirs() throws Exception {
+ Path path = new Path("/path/to/" + runtime.getMethodName());
+ assertFalse(fs.exists(path));
+ assertTrue(fs.mkdirs(path));
+ assertTrue(fs.exists(path));
+ assertTrue(fs.mkdirs(path));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testTruncation() throws Exception {
+ Path path = new Path("/path/to/" + runtime.getMethodName());
+ fs.truncate(path, 10);
+ }
+
+ @Test
+ public void testDeleteRecursive() throws Exception {
+ int numLogs = 3;
+ for (int i = 0; i < numLogs; i++) {
+ Path path = new Path("/path/to/" + runtime.getMethodName()
+ + "/" + runtime.getMethodName() + "-" + i);
+ assertFalse(fs.exists(path));
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.writeBytes("line");
+ out.flush();
+ }
+ assertTrue(fs.exists(path));
+ }
+
+ fs.delete(new Path("/path/to/" + runtime.getMethodName()), true);
+ FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName()));
+ assertEquals(0, files.length);
+ }
+
+ @Test
+ public void testCreateOverwrite() throws Exception {
+ Path path = new Path("/path/to/" + runtime.getMethodName());
+ assertFalse(fs.exists(path));
+ byte[] originData = "original".getBytes(UTF_8);
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(originData);
+ out.flush();
+ }
+
+ try (FSDataInputStream in = fs.open(path, 1024)) {
+ assertEquals(originData.length, in.available());
+ byte[] readData = new byte[originData.length];
+ assertEquals(originData.length, in.read(readData));
+ assertArrayEquals(originData, readData);
+ }
+
+ byte[] overwrittenData = "overwritten".getBytes(UTF_8);
+ try (FSDataOutputStream out = fs.create(path, true)) {
+ out.write(overwrittenData);
+ out.flush();
+ }
+
+ try (FSDataInputStream in = fs.open(path, 1024)) {
+ assertEquals(overwrittenData.length, in.available());
+ byte[] readData = new byte[overwrittenData.length];
+ assertEquals(overwrittenData.length, in.read(readData));
+ assertArrayEquals(overwrittenData, readData);
+ }
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ Path path = new Path("/path/to/" + runtime.getMethodName());
+ assertFalse(fs.exists(path));
+ byte[] originData = "original".getBytes(UTF_8);
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.write(originData);
+ out.flush();
+ }
+
+ try (FSDataInputStream in = fs.open(path, 1024)) {
+ assertEquals(originData.length, in.available());
+ byte[] readData = new byte[originData.length];
+ assertEquals(originData.length, in.read(readData));
+ assertArrayEquals(originData, readData);
+ }
+
+ byte[] appendData = "append".getBytes(UTF_8);
+ try (FSDataOutputStream out = fs.append(path, 1024)) {
+ out.write(appendData);
+ out.flush();
+ }
+
+ try (FSDataInputStream in = fs.open(path, 1024)) {
+ assertEquals(originData.length + appendData.length, in.available());
+ byte[] readData = new byte[originData.length];
+ assertEquals(originData.length, in.read(readData));
+ assertArrayEquals(originData, readData);
+ readData = new byte[appendData.length];
+ assertEquals(appendData.length, in.read(readData));
+ assertArrayEquals(appendData, readData);
+ }
+ }
+
+}
diff --git a/distributedlog-io/dlfs/src/test/resources/dlfs.conf b/distributedlog-io/dlfs/src/test/resources/dlfs.conf
new file mode 100644
index 000000000..26d2bd9fd
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/resources/dlfs.conf
@@ -0,0 +1,27 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+## DLFS settings
+
+writeLockEnabled=false
+
+enableImmediateFlush=false
+
+writerOutputBufferSize=131072
+
+numWorkerThreads=1
diff --git a/distributedlog-io/pom.xml b/distributedlog-io/pom.xml
new file mode 100644
index 000000000..be82c405a
--- /dev/null
+++ b/distributedlog-io/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+
+ org.apache.distributedlog
+ distributedlog
+ 0.6.0-SNAPSHOT
+
+ 4.0.0
+ distributedlog-io
+ pom
+ Apache DistributedLog :: IO
+
+ dlfs
+
+
+ UTF-8
+ UTF-8
+
+
diff --git a/pom.xml b/pom.xml
index bd93cfc2c..616d755c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
distributedlog-common
distributedlog-protocol
distributedlog-core
+ distributedlog-io
distributedlog-proxy-protocol
distributedlog-proxy-client
distributedlog-proxy-server
@@ -242,7 +243,7 @@
**/dependency-reduced-pom.xml
**/org/apache/distributedlog/thrift/*
**/logs/*.log
- **/target/*
+ **/target/**/*
.git/**/*
.github/**/*