+ 4.0.0
+ distributedlog
+ com.twitter
+ 0.4.0-incubating-SNAPSHOT
+ ../..
+ com.twitter
+ distributedlog-hdfs
+ Apache DistributedLog :: Contribs :: DistributedLog FileSystem
+ http://maven.apache.org
+ UTF-8
+ ${basedir}/lib
+ com.twitter
+ distributedlog-core
+ ${project.parent.version}
+ org.apache.hadoop
+ hadoop-common
+ 2.7.2
+package com.twitter.distributedlog.fs;
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.AppendOnlyStreamReader;
+import com.twitter.distributedlog.AppendOnlyStreamWriter;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.LogEmptyException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+ * A FileSystem Implementation powered by replicated logs
+ */
+public class DistributedLogFileSystem extends FileSystem {
+ private final Logger logger = LoggerFactory.getLogger(DistributedLogFileSystem.class);
+ //
+ // Settings
+ //
+ public static final String DLFS_CONF_FILE = "dlfs.configuration.file";
+ private URI rootUri;
+ private DistributedLogNamespace namespace;
+ private final DistributedLogConfiguration dlConf;
+ private Path workingDir;
+ public DistributedLogFileSystem() {
+ this.dlConf = new DistributedLogConfiguration();
+ }
+ @Override
+ public URI getUri() {
+ return rootUri;
+ }
+ //
+ // Initialization
+ //
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+ setConf(conf);
+ // initialize
+ this.rootUri = name;
+ // load the configuration
+ String dlConfLocation = conf.get(DLFS_CONF_FILE);
+ if (null != dlConfLocation) {
+ try {
+ this.dlConf.loadConf(new File(dlConfLocation).toURI().toURL());
+ logger.info("Loaded the distributedlog configuration from {}", dlConfLocation);
+ } catch (ConfigurationException e) {
+ logger.error("Failed to load the distributedlog configuration from " + dlConfLocation, e);
+ throw new IOException("Failed to load distributedlog configuration from " + dlConfLocation);
+ }
+ }
+ // initialize the namespace
+ this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+ .clientId("dlfs-client-" + InetAddress.getLocalHost().getHostName())
+ .conf(dlConf)
+ .regionId(DistributedLogConstants.LOCAL_REGION_ID)
+ .uri(name)
+ .build();
+ logger.info("Initialized the filesystem at {}", name);
+ }
+ @Override
+ public void close() throws IOException {
+ // clean up the resource
+ namespace.close();
+ super.close();
+ }
+ //
+ // Util Functions
+ //
+ private Path makeAbsolute(Path f) {
+ if (f.isAbsolute()) {
+ return f;
+ } else {
+ return new Path(workingDir, f);
+ }
+ }
+ private String getStreamName(Path absolutePath) {
+ return absolutePath.toUri().getPath().substring(1);
+ }
+ //
+ // Home & Working Directory
+ //
+ @Override
+ public Path getHomeDirectory() {
+ return this.makeQualified(new Path(System.getProperty("user.home")));
+ }
+ protected Path getInitialWorkingDirectory() {
+ return this.makeQualified(new Path(System.getProperty("user.dir")));
+ }
+ @Override
+ public void setWorkingDirectory(Path path) {
+ workingDir = makeAbsolute(path);
+ checkPath(workingDir);
+ }
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize)
+ throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ DistributedLogManager dlm = namespace.openLog(getStreamName(absolutePath));
+ AppendOnlyStreamReader reader;
+ try {
+ reader = dlm.getAppendOnlyStreamReader();
+ } catch (LogNotFoundException lnfe) {
+ throw new FileNotFoundException(path.toString());
+ } catch (LogEmptyException lee) {
+ throw new FileNotFoundException(path.toString());
+ }
+ return new FSDataInputStream(
+ new BufferedFSInputStream(
+ new DistributedLogInputStream(dlm, reader),
+ bufferSize));
+ }
+ @Override
+ public FSDataOutputStream create(Path path,
+ FsPermission fsPermission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progressable) throws IOException {
+ // TODO: support overwrite, support permission
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(dlConf);
+ confLocal.setEnsembleSize(replication);
+ confLocal.setWriteQuorumSize(replication);
+ confLocal.setAckQuorumSize(replication);
+ confLocal.setMaxLogSegmentBytes(blockSize);
+ return append(path, bufferSize, Optional.of(confLocal));
+ }
+ @Override
+ public FSDataOutputStream append(Path path,
+ int bufferSize,
+ Progressable progressable) throws IOException {
+ return append(path, bufferSize, Optional.absent());
+ }
+ private FSDataOutputStream append(Path path,
+ int bufferSize,
+ Optional confLocal)
+ throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ DistributedLogManager dlm = namespace.openLog(
+ getStreamName(absolutePath),
+ confLocal,
+ Optional.absent());
+ AppendOnlyStreamWriter streamWriter = dlm.getAppendOnlyStreamWriter();
+ return new FSDataOutputStream(new BufferedOutputStream(
+ new DistributedLogOutputStream(dlm, streamWriter), bufferSize), statistics);
+ }
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ namespace.deleteLog(getStreamName(absolutePath));
+ return true;
+ }
+ //
+ // Not Supported
+ //
+ @Override
+ public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new UnsupportedOperationException("Rename is not supported yet");
+ }
+ @Override
+ public boolean truncate(Path f, long newLength) throws IOException {
+ throw new UnsupportedOperationException("Truncate is not supported yet");
+ }
+package com.twitter.distributedlog.fs;
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.AppendOnlyStreamReader;
+import com.twitter.distributedlog.DistributedLogManager;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import java.io.IOException;
+ * The input stream for a distributedlog stream.
+ */
+class DistributedLogInputStream extends FSInputStream {
+ private final DistributedLogManager dlm;
+ private final AppendOnlyStreamReader streamReader;
+ DistributedLogInputStream(DistributedLogManager dlm,
+ AppendOnlyStreamReader streamReader) throws IOException {
+ this.dlm = dlm;
+ this.streamReader = streamReader;
+ }
+ //
+ // FSInputStream
+ //
+ @Override
+ public void seek(long pos) throws IOException {
+ this.streamReader.skipTo(pos);
+ }
+ @Override
+ public long getPos() throws IOException {
+ return this.streamReader.position();
+ }
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ //
+ // Input Stream
+ //
+ @Override
+ public int read(byte[] b) throws IOException {
+ return streamReader.read(b);
+ }
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return streamReader.read(b, off, len);
+ }
+ @Override
+ public long skip(long n) throws IOException {
+ return streamReader.skip(n);
+ }
+ @Override
+ public int available() throws IOException {
+ return streamReader.available();
+ }
+ @Override
+ public void close() throws IOException {
+ streamReader.close();
+ dlm.close();
+ }
+ @Override
+ public synchronized void mark(int readlimit) {
+ streamReader.mark(readlimit);
+ }
+ @Override
+ public synchronized void reset() throws IOException {
+ streamReader.reset();
+ }
+ @Override
+ public boolean markSupported() {
+ return streamReader.markSupported();
+ }
+ @Override
+ public int read() throws IOException {
+ return streamReader.read();
+ }
+ //
+ // Object
+ //
+ @Override
+ public int hashCode() {
+ return streamReader.hashCode();
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof DistributedLogInputStream)) {
+ return false;
+ }
+ DistributedLogInputStream another = (DistributedLogInputStream) obj;
+ return Objects.equal(streamReader, another.streamReader);
+ }
+ @Override
+ public String toString() {
+ return streamReader.toString();
+ }
+package com.twitter.distributedlog.fs;
+import com.twitter.distributedlog.AppendOnlyStreamWriter;
+import com.twitter.distributedlog.DistributedLogManager;
+import java.io.IOException;
+import java.io.OutputStream;
+ * DistributedLog Output Stream
+ */
+class DistributedLogOutputStream extends OutputStream {
+ private final DistributedLogManager dlm;
+ private final AppendOnlyStreamWriter streamWriter;
+ DistributedLogOutputStream(DistributedLogManager dlm,
+ AppendOnlyStreamWriter streamWriter) {
+ this.dlm = dlm;
+ this.streamWriter = streamWriter;
+ }
+ @Override
+ public void write(int b) throws IOException {
+ byte[] data = new byte[] { (byte) b };
+ write(data);
+ }
+ @Override
+ public void write(byte[] b) throws IOException {
+ streamWriter.write(b);
+ }
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ // TODO: improve AppendOnlyStreamWriter
+ byte[] newData = new byte[len];
+ System.arraycopy(b, off, newData, 0, len);
+ streamWriter.write(newData);
+ }
+ @Override
+ public void flush() throws IOException {
+ streamWriter.force(false);
+ }
+ @Override
+ public void close() throws IOException {
+ streamWriter.close();
+ dlm.close();
+ }
+ com.twitter
+ distributedlog
+ 0.4.0-incubating-SNAPSHOT
+ 4.0.0
+ distributedlog-contribs
+ pom
+ Apache DistributedLog :: Contribs
+ distributedlog-hdfs
+ UTF-8
+ UTF-8
