+ * Called after the new FileSystem instance is constructed, and before it
+ * is ready for use.
+ *
+ * FileSystem implementations overriding this method MUST forward it to
+ * their superclass, though the order in which it is done, and whether
+ * to alter the configuration before the invocation are options of the
+ * subclass.
+ *
+ * @param name a URI whose authority section names the host, port, etc.
+ * for this FileSystem
+ * @param conf the configuration
+ * @throws IOException on any failure to initialize this instance.
+ * @throws IllegalArgumentException if the URI is considered invalid.
+ */
+ public void initialize(URI name, Configuration conf) throws IOException
+ {
+ final String scheme;
+ if (name.getScheme() == null || name.getScheme().isEmpty()) {
+ scheme = getDefaultUri(conf).getScheme();
+ }
+ else {
+ scheme = name.getScheme();
+ }
+ statistics = getStatistics(scheme, getClass());
+ resolveSymlinks = conf.getBoolean(
+ CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
+ CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
+ }
+
+ /**
+ * Return a canonicalized form of this FileSystem's URI.
+ *
+ * The default implementation simply calls {@link #canonicalizeUri(URI)}
+ * on the filesystem's own URI, so subclasses typically only need to
+ * implement that method.
+ *
+ * @return the canonical URI of this filesystem.
+ * @see #canonicalizeUri(URI)
+ */
+ protected URI getCanonicalUri()
+ {
+ return canonicalizeUri(getUri());
+ }
+
+ /**
+ * Canonicalize the given URI.
+ *
+ * This is implementation-dependent, and may for example consist of
+ * canonicalizing the hostname using DNS and adding the default
+ * port if not specified.
+ *
+ * The default implementation simply fills in the default port if
+ * not specified and if {@link #getDefaultPort()} returns a
+ * default port.
+ *
+ * @param uri url.
+ * @return URI
+ * @see NetUtils#getCanonicalUri(URI, int)
+ */
+ protected URI canonicalizeUri(URI uri)
+ {
+ if (uri.getPort() == -1 && getDefaultPort() > 0) {
+ // reconstruct the uri with the default port set
+ try {
+ uri = new URI(uri.getScheme(), uri.getUserInfo(),
+ uri.getHost(), getDefaultPort(),
+ uri.getPath(), uri.getQuery(), uri.getFragment());
+ }
+ catch (URISyntaxException e) {
+ // Should never happen: the components came from an already-valid
+ // URI. Keep the cause attached so any such failure is diagnosable.
+ throw new AssertionError("Valid URI became unparseable: " +
+ uri, e);
+ }
+ }
+
+ return uri;
+ }
+
+ /**
+ * The default port used by this filesystem's scheme, if any.
+ *
+ * @return the default port, or 0 when the scheme has no default port
+ */
+ protected int getDefaultPort()
+ {
+ // Subclasses with a well-known port override this; 0 means "none".
+ return 0;
+ }
+
+ /**
+ * Resolve the FileSystem owning the given absolute or fully-qualified
+ * path.
+ *
+ * @param absOrFqPath an absolute path, optionally scheme-qualified
+ * @param conf configuration used to look up the filesystem
+ * @return the filesystem serving the path
+ * @throws UnsupportedFileSystemException if no FS supports the scheme
+ * @throws IOException on any other failure
+ */
+ protected static FileSystem getFSofPath(final Path absOrFqPath,
+ final Configuration conf)
+ throws UnsupportedFileSystemException, IOException
+ {
+ // Reject relative paths; a scheme demands a fully-qualified path.
+ absOrFqPath.checkNotSchemeWithRelative();
+ absOrFqPath.checkNotRelative();
+
+ // A path without a scheme resolves against the default FileSystem.
+ return get(absOrFqPath.toUri(), conf);
+ }
+
+ /**
+ * Get a canonical service name for this FileSystem.
+ * The token cache is the only consumer of this name, and uses it to
+ * look up this filesystem's service tokens. A filesystem that issues
+ * its own tokens must have a canonical name; otherwise it may be null.
+ *
+ * Default behaviour: a filesystem with child filesystems (such as an
+ * embedded one) is assumed to hold no tokens of its own and yields a
+ * null name; otherwise a service name is built from the URI and port.
+ *
+ * @return a service string that uniquely identifies this file system, null
+ * if the filesystem does not implement tokens
+ * @see SecurityUtil#buildDTServiceName(URI, int)
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ @Override
+ public String getCanonicalServiceName()
+ {
+ // Children handle their own tokens; this FS then has no service name.
+ if (getChildFileSystems() != null) {
+ return null;
+ }
+ return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
+ }
+
+ /**
+ * @return this filesystem's URI rendered as a string.
+ * @deprecated call {@link #getUri()} instead.
+ */
+ @Deprecated
+ public String getName()
+ {
+ URI uri = getUri();
+ return uri.toString();
+ }
+
+ /**
+ * Look up a FileSystem by its (possibly legacy-format) name.
+ *
+ * @param name name.
+ * @param conf configuration.
+ * @return file system.
+ * @throws IOException If an I/O error occurred.
+ * @deprecated call {@link #get(URI, Configuration)} instead.
+ */
+ @Deprecated
+ public static FileSystem getNamed(String name, Configuration conf)
+ throws IOException
+ {
+ // Translate legacy names ("local", bare hostnames) before parsing.
+ URI uri = URI.create(fixName(name));
+ return get(uri, conf);
+ }
+
+ /**
+ * Update old-format filesystem names, for back-compatibility. This should
+ * eventually be replaced with a checkName() method that throws an exception
+ * for old-format names.
+ *
+ * @param name possibly old-format filesystem name
+ * @return the equivalent new-format name
+ */
+ private static String fixName(String name)
+ {
+ // convert old-format name to new-format name
+ if (name.equals("local")) { // "local" is now "file:///".
+ LOGGER.warn("\"local\" is a deprecated filesystem name."
+ + " Use \"file:///\" instead.");
+ name = "file:///";
+ }
+ else if (name.indexOf('/') == -1) { // unqualified is "hdfs://"
+ // Parameterized logging avoids the string concatenation when the
+ // warn level is disabled.
+ LOGGER.warn("\"{}\" is a deprecated filesystem name."
+ + " Use \"hdfs://{}/\" instead.", name, name);
+ name = "hdfs://" + name;
+ }
+ return name;
+ }
+
+ /**
+ * Get the local FileSystem.
+ *
+ * @param conf the configuration used if the filesystem is newly
+ * instantiated.
+ * @return a LocalFileSystem
+ * @throws IOException if somehow the local FS cannot be instantiated.
+ */
+ public static LocalFileSystem getLocal(Configuration conf)
+ throws IOException
+ {
+ FileSystem local = get(LocalFileSystem.NAME, conf);
+ return (LocalFileSystem) local;
+ }
+
+ /**
+ * Get a FileSystem for this URI's scheme and authority.
+ *
()
+ {
+ @Override
+ public FileSystem run() throws IOException
+ {
+ return newInstance(uri, conf);
+ }
+ });
+ }
+
+ /**
+ * Returns the FileSystem for this URI's scheme and authority.
+ * The entire URI is passed to the FileSystem instance's initialize method.
+ * This always returns a new FileSystem object.
+ *
+ * @param uri FS URI
+ * @param config configuration to use
+ * @return the new FS instance
+ * @throws IOException FS creation or initialization failure.
+ */
+ public static FileSystem newInstance(URI uri, Configuration config)
+ throws IOException
+ {
+ String scheme = uri.getScheme();
+ String authority = uri.getAuthority();
+
+ if (scheme == null) { // no scheme: use default FS
+ return newInstance(config);
+ }
+
+ if (authority == null) { // no authority
+ URI defaultUri = getDefaultUri(config);
+ if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
+ && defaultUri.getAuthority() != null) { // & default has authority
+ return newInstance(defaultUri, config); // return default
+ }
+ }
+ return CACHE.getUnique(uri, config);
+ }
+
+ /**
+ * Returns a unique configured FileSystem implementation for the default
+ * filesystem of the supplied configuration.
+ * This always returns a new FileSystem object.
+ *
+ * @param conf the configuration to use
+ * @return the new FS instance
+ * @throws IOException FS creation or initialization failure.
+ */
+ public static FileSystem newInstance(Configuration conf) throws IOException
+ {
+ URI defaultUri = getDefaultUri(conf);
+ return newInstance(defaultUri, conf);
+ }
+
+ /**
+ * Get a unique local FileSystem object.
+ *
+ * @param conf the configuration to configure the FileSystem with
+ * @return a new LocalFileSystem object.
+ * @throws IOException FS creation or initialization failure.
+ */
+ public static LocalFileSystem newInstanceLocal(Configuration conf)
+ throws IOException
+ {
+ FileSystem local = newInstance(LocalFileSystem.NAME, conf);
+ return (LocalFileSystem) local;
+ }
+
+ /**
+ * Close every cached FileSystem instance. None of them may be used
+ * for any operation afterwards.
+ *
+ * @throws IOException a problem arose closing one or more filesystem.
+ */
+ public static void closeAll() throws IOException
+ {
+ // Record who asked for the shutdown before tearing the cache down.
+ debugLogFileSystemClose("closeAll", "");
+ CACHE.closeAll();
+ }
+
+ /**
+ * Close every cached FileSystem instance belonging to the given UGI.
+ * Be sure those filesystems are not used anymore.
+ *
+ * @param ugi user group info to close
+ * @throws IOException a problem arose closing one or more filesystem.
+ */
+ public static void closeAllForUGI(UserGroupInformation ugi)
+ throws IOException
+ {
+ // Record who asked for the shutdown before tearing the cache down.
+ debugLogFileSystemClose("closeAllForUGI", "UGI: " + ugi);
+ CACHE.closeAll(ugi);
+ }
+
+ // Logs which caller triggered a filesystem close. Kept structurally
+ // as-is: the stack-frame index below depends on the exact call depth.
+ private static void debugLogFileSystemClose(String methodName,
+ String additionalInfo)
+ {
+ // Guarded so the stack trace is only materialized when debug is on.
+ if (LOGGER.isDebugEnabled()) {
+ Throwable throwable = new Throwable().fillInStackTrace();
+ // Frame [2] is the caller of the public closeXXX method:
+ // [0] is this method, [1] is closeAll/closeAllForUGI itself.
+ LOGGER.debug("FileSystem.{}() by method: {}); {}", methodName,
+ throwable.getStackTrace()[2], additionalInfo);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("FileSystem.{}() full stack trace:", methodName,
+ throwable);
+ }
+ }
+ }
+
+ /**
+ * Qualify a path against this FileSystem, making it absolute if it
+ * is relative.
+ *
+ * @param path to qualify.
+ * @return this path if it contains a scheme and authority and is absolute, or
+ * a new path that includes a path and authority and is fully qualified
+ * @throws IllegalArgumentException if the path has a schema/URI different
+ * from this FileSystem.
+ * @see Path#makeQualified(URI, Path)
+ */
+ public Path makeQualified(Path path)
+ {
+ // Reject paths that belong to a different filesystem first.
+ checkPath(path);
+ return path.makeQualified(getUri(), getWorkingDirectory());
+ }
+
+ /**
+ * Get a new delegation token for this FileSystem.
+ * This is an internal method that should have been declared protected
+ * but wasn't historically.
+ * Callers should use {@link #addDelegationTokens(String, Credentials)}
+ *
+ * @param renewer the account name that is allowed to renew the token.
+ * @return a new delegation token or null if the FS does not support tokens.
+ * @throws IOException on any problem obtaining a token
+ */
+ @InterfaceAudience.Private
+ @Override
+ public Token> getDelegationToken(String renewer) throws IOException
+ {
+ return null;
+ }
+
+ /**
+ * Get all the immediate child FileSystems embedded in this FileSystem.
+ * It does not recurse and get grand children. If a FileSystem
+ * has multiple child FileSystems, then it must return a unique list
+ * of those FileSystems. Default is to return null to signify no children.
+ *
+ * @return FileSystems that are direct children of this FileSystem,
+ * or null for "no children"
+ */
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ @VisibleForTesting
+ public FileSystem[] getChildFileSystems()
+ {
+ // null (not an empty array) is the documented "no children" marker.
+ return null;
+ }
+
+ /**
+ * Token issuers beyond this filesystem itself: by default the immediate
+ * child filesystems (or null when there are none).
+ *
+ * @return the child filesystems, or null for none.
+ * @throws IOException declared for subclasses; not thrown here.
+ */
+ @InterfaceAudience.Private
+ @Override
+ public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException
+ {
+ return getChildFileSystems();
+ }
+
+ /**
+ * Create a file with the provided permission.
+ *
+ * The permission of the file is set to be the provided permission as in
+ * setPermission, not permission{@literal &~}umask
+ *
+ * The HDFS implementation uses two RPCs: inefficient, but thread-safe.
+ * The alternative — setting the configured umask to 0 — is not
+ * thread-safe.
+ *
+ * @param fs FileSystem
+ * @param file the name of the file to be created
+ * @param permission the permission of the file
+ * @return an output stream
+ * @throws IOException IO failure
+ */
+ public static FSDataOutputStream create(FileSystem fs,
+ Path file, FsPermission permission) throws IOException
+ {
+ // Create with the default permission first, then explicitly apply
+ // the requested one so the umask never interferes.
+ FSDataOutputStream out = fs.create(file);
+ fs.setPermission(file, permission);
+ return out;
+ }
+
+ /**
+ * Create a directory with the provided permission.
+ * The permission of the directory is set to be the provided permission as in
+ * setPermission, not permission{@literal &~}umask
+ *
+ * @param fs FileSystem handle
+ * @param dir the name of the directory to be created
+ * @param permission the permission of the directory
+ * @return true if the directory creation succeeds; false otherwise
+ * @throws IOException A problem creating the directories.
+ * @see #create(FileSystem, Path, FsPermission)
+ */
+ public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
+ throws IOException
+ {
+ // Make the directory with the default permission, then explicitly
+ // apply the requested one so the umask never interferes.
+ boolean created = fs.mkdirs(dir);
+ fs.setPermission(dir, permission);
+ return created;
+ }
+
+ ///////////////////////////////////////////////////////////////
+ // FileSystem
+ ///////////////////////////////////////////////////////////////
+
+ /**
+ * Default constructor. Passes a null configuration up; the real
+ * configuration arrives later through initialize().
+ */
+ protected FileSystem()
+ {
+ super(null);
+ }
+
+ /**
+ * Check that a Path belongs to this FileSystem.
+ *
+ * The base implementation performs case insensitive equality checks
+ * of the URIs' schemes and authorities. Subclasses may implement slightly
+ * different checks.
+ *
+ * @param path to check
+ * @throws IllegalArgumentException if the path is not considered to be
+ * part of this FileSystem.
+ */
+ protected void checkPath(Path path)
+ {
+ Preconditions.checkArgument(path != null, "null path");
+ URI uri = path.toUri();
+ String thatScheme = uri.getScheme();
+ if (thatScheme == null) { // fs is relative
+ // A scheme-less path is relative to this FS by definition.
+ return;
+ }
+ URI thisUri = getCanonicalUri();
+ String thisScheme = thisUri.getScheme();
+ //authority and scheme are not case sensitive
+ if (thisScheme.equalsIgnoreCase(thatScheme)) { // schemes match
+ String thisAuthority = thisUri.getAuthority();
+ String thatAuthority = uri.getAuthority();
+ if (thatAuthority == null && // path's authority is null
+ thisAuthority != null) { // fs has an authority
+ // Borrow the authority from the default FS, but only when its
+ // scheme matches ours; otherwise the path is undecidable.
+ URI defaultUri = getDefaultUri(getConf());
+ if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
+ uri = defaultUri; // schemes match, so use this uri instead
+ }
+ else {
+ uri = null; // can't determine auth of the path
+ }
+ }
+ if (uri != null) {
+ // canonicalize uri before comparing with this fs
+ uri = canonicalizeUri(uri);
+ thatAuthority = uri.getAuthority();
+ // Reference equality covers the both-null case.
+ if (thisAuthority == thatAuthority || // authorities match
+ (thisAuthority != null &&
+ thisAuthority.equalsIgnoreCase(thatAuthority))) {
+ return;
+ }
+ }
+ }
+ throw new IllegalArgumentException("Wrong FS: " + path +
+ ", expected: " + this.getUri());
+ }
+
+ /**
+ * Return an array containing hostnames, offset and size of
+ * portions of the given file. For nonexistent
+ * file or regions, {@code null} is returned.
+ *
+ *
+ * if f == null :
+ * result = null
+ * elif f.getLen() {@literal <=} start:
+ * result = []
+ * else result = [ locations(FS, b) for b in blocks(FS, p, s, s+l)]
+ *
+ * This call is most helpful with a distributed filesystem
+ * where the hostnames of machines that contain blocks of the given file
+ * can be determined.
+ *
+ * The default implementation returns an array containing one element:
+ *
+ * BlockLocation( { "localhost:9866" }, { "localhost" }, 0, file.getLen())
+ *
+ *
+ * In HDFS, if file is three-replicated, the returned array contains
+ * elements like:
+ *
+ * BlockLocation(offset: 0, length: BLOCK_SIZE,
+ * hosts: {"host1:9866", "host2:9866", "host3:9866"})
+ * BlockLocation(offset: BLOCK_SIZE, length: BLOCK_SIZE,
+ * hosts: {"host2:9866", "host3:9866", "host4:9866"})
+ *
+ *
+ * And if a file is erasure-coded, the returned BlockLocation are logical
+ * block groups.
+ *
+ * Suppose we have a RS_3_2 coded file (3 data units and 2 parity units).
+ * 1. If the file size is less than one stripe size, say 2 * CELL_SIZE, then
+ * there will be one BlockLocation returned, with 0 offset, actual file size
+ * and 4 hosts (2 data blocks and 2 parity blocks) hosting the actual blocks.
+ * 2. If the file size is less than one group size but greater than one
+ * stripe size, then there will be one BlockLocation returned, with 0 offset,
+ * actual file size with 5 hosts (3 data blocks and 2 parity blocks) hosting
+ * the actual blocks.
+ * 3. If the file size is greater than one group size, 3 * BLOCK_SIZE + 123
+ * for example, then the result will be like:
+ *
+ * BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866",
+ * "host2:9866","host3:9866","host4:9866","host5:9866"})
+ * BlockLocation(offset: 3 * BLOCK_SIZE, length: 123, hosts: {"host1:9866",
+ * "host4:9866", "host5:9866"})
+ *
+ *
+ * @param file FileStatus to get data from
+ * @param start offset into the given file
+ * @param len length for which to get locations for
+ * @return block location array.
+ * @throws IOException IO failure
+ */
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
+ long start, long len) throws IOException
+ {
+ if (file == null) {
+ return null;
+ }
+
+ if (start < 0 || len < 0) {
+ throw new IllegalArgumentException("Invalid start or len parameter");
+ }
+
+ // Region entirely beyond the end of the file: empty result.
+ if (file.getLen() <= start) {
+ return new BlockLocation[0];
+ }
+ // Default: pretend the whole file is one block stored locally.
+ String[] name = {"localhost:9866"};
+ String[] host = {"localhost"};
+ return new BlockLocation[]{
+ new BlockLocation(name, host, 0, file.getLen())};
+ }
+
+ /**
+ * Return an array containing hostnames, offset and size of
+ * portions of the given file. For a nonexistent
+ * file or regions, {@code null} is returned.
+ *
+ * This call is most helpful with location-aware distributed
+ * filesystems, where it returns hostnames of machines that
+ * contain the given file.
+ *
+ * A FileSystem will normally return the equivalent result
+ * of passing the {@code FileStatus} of the path to
+ * {@link #getFileBlockLocations(FileStatus, long, long)}
+ *
+ * @param p path is used to identify an FS since an FS could have
+ * another FS that it could be delegating the call to
+ * @param start offset into the given file
+ * @param len length for which to get locations for
+ * @return block location array.
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException IO failure
+ */
+ public BlockLocation[] getFileBlockLocations(Path p,
+ long start, long len) throws IOException
+ {
+ if (p == null) {
+ // Same exception type as before, now with a diagnostic message.
+ throw new NullPointerException("null path");
+ }
+ FileStatus file = getFileStatus(p);
+ return getFileBlockLocations(file, start, len);
+ }
+
+ /**
+ * Return a set of server default configuration values.
+ *
+ * @return server default configuration values
+ * @throws IOException IO failure
+ * @deprecated use {@link #getServerDefaults(Path)} instead
+ */
+ @Deprecated
+ public FsServerDefaults getServerDefaults() throws IOException
+ {
+ Configuration config = getConf();
+ // CRC32 is chosen as default as it is available in all
+ // releases that support checksum.
+ // The client trash configuration is ignored.
+ // NOTE(review): the 512 and 64 * 1024 literals appear to be the
+ // bytes-per-checksum default and the write packet size respectively
+ // — confirm against the FsServerDefaults constructor signature.
+ return new FsServerDefaults(getDefaultBlockSize(),
+ config.getInt("io.bytes.per.checksum", 512),
+ 64 * 1024,
+ getDefaultReplication(),
+ config.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+ false,
+ FS_TRASH_INTERVAL_DEFAULT,
+ DataChecksum.Type.CRC32,
+ "");
+ }
+
+ /**
+ * Return a set of server default configuration values.
+ *
+ * @param p path is used to identify an FS since an FS could have
+ * another FS that it could be delegating the call to
+ * @return server default configuration values
+ * @throws IOException IO failure
+ */
+ public FsServerDefaults getServerDefaults(Path p) throws IOException
+ {
+ // Path-agnostic by default; delegating filesystems override this.
+ return getServerDefaults();
+ }
+
+ /**
+ * Return the fully-qualified path of path, resolving the path
+ * through any symlinks or mount point.
+ *
+ * @param p path to be resolved
+ * @return fully qualified path
+ * @throws FileNotFoundException if the path is not present
+ * @throws IOException for any other error
+ */
+ public Path resolvePath(final Path p) throws IOException
+ {
+ // Verify the path belongs to this FS, then let getFileStatus
+ // produce the resolved, fully-qualified form.
+ checkPath(p);
+ FileStatus status = getFileStatus(p);
+ return status.getPath();
+ }
+
+ /**
+ * Opens an FSDataInputStream at the indicated Path.
+ *
+ * @param f the file name to open
+ * @param bufferSize the size of the buffer to be used.
+ * @return input stream.
+ * @throws IOException IO failure
+ */
+ public abstract FSDataInputStream open(Path f, int bufferSize)
+ throws IOException;
+
+ /**
+ * Opens an FSDataInputStream at the indicated Path.
+ *
+ * @param f the file to open
+ * @return input stream.
+ * @throws IOException IO failure
+ */
+ public FSDataInputStream open(Path f) throws IOException
+ {
+ return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT));
+ }
+
+ /**
+ * Open an FSDataInputStream matching the PathHandle instance. The
+ * implementation may encode metadata in PathHandle to address the
+ * resource directly and verify that the resource referenced
+ * satisfies constraints specified at its construction.
+ *
+ * @param fd PathHandle object returned by the FS authority.
+ * @return input stream.
+ * @throws InvalidPathHandleException If {@link PathHandle} constraints are
+ * not satisfied
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException If {@link #open(PathHandle, int)}
+ * not overridden by subclass
+ */
+ public FSDataInputStream open(PathHandle fd) throws IOException
+ {
+ return open(fd, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT));
+ }
+
+ /**
+ * Open an FSDataInputStream matching the PathHandle instance. The
+ * implementation may encode metadata in PathHandle to address the
+ * resource directly and verify that the resource referenced
+ * satisfies constraints specified at its construction.
+ *
+ * @param fd PathHandle object returned by the FS authority.
+ * @param bufferSize the size of the buffer to use
+ * @return input stream.
+ * @throws InvalidPathHandleException If {@link PathHandle} constraints are
+ * not satisfied
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException If not overridden by subclass
+ */
+ public FSDataInputStream open(PathHandle fd, int bufferSize)
+ throws IOException
+ {
+ // Default: PathHandle-based open is an optional capability.
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Create a durable, serializable handle to the referent of the given
+ * entity.
+ *
+ * @param stat Referent in the target FileSystem
+ * @param opt If absent, assume {@link HandleOpt#path()}.
+ * @return path handle.
+ * @throws IllegalArgumentException If the FileStatus does not belong to
+ * this FileSystem
+ * @throws UnsupportedOperationException If {@link #createPathHandle}
+ * not overridden by subclass.
+ * @throws UnsupportedOperationException If this FileSystem cannot enforce
+ * the specified constraints.
+ */
+ public final PathHandle getPathHandle(FileStatus stat, HandleOpt... opt)
+ {
+ // Final with a fixed default so getPathHandle(stat) behaves the
+ // same across every FileSystem implementation.
+ if (opt == null || opt.length == 0) {
+ return createPathHandle(stat, HandleOpt.path());
+ }
+ return createPathHandle(stat, opt);
+ }
+
+ /**
+ * Hook to implement support for {@link PathHandle} operations.
+ *
+ * @param stat Referent in the target FileSystem
+ * @param opt Constraints that determine the validity of the
+ * {@link PathHandle} reference.
+ * @return path handle.
+ * @throws UnsupportedOperationException always, unless overridden.
+ */
+ protected PathHandle createPathHandle(FileStatus stat, HandleOpt... opt)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path.
+ * Files are overwritten by default.
+ *
+ * @param f the file to create
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f) throws IOException
+ {
+ // overwrite=true is the documented default.
+ return create(f, true);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path.
+ *
+ * @param f the file to create
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an exception will be thrown.
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f, boolean overwrite)
+ throws IOException
+ {
+ return create(f, overwrite,
+ getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT),
+ getDefaultReplication(f),
+ getDefaultBlockSize(f));
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * Files are overwritten by default.
+ *
+ * @param f the file to create
+ * @param progress to report progress
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f, Progressable progress)
+ throws IOException
+ {
+ return create(f, true,
+ getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT),
+ getDefaultReplication(f),
+ getDefaultBlockSize(f), progress);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path.
+ * Files are overwritten by default.
+ *
+ * @param f the file to create
+ * @param replication the replication factor
+ * @return output stream
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f, short replication)
+ throws IOException
+ {
+ return create(f, true,
+ getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT),
+ replication,
+ getDefaultBlockSize(f));
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * Files are overwritten by default.
+ *
+ * @param f the file to create
+ * @param replication the replication factor
+ * @param progress to report progress
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f, short replication,
+ Progressable progress) throws IOException
+ {
+ return create(f, true,
+ getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT),
+ replication, getDefaultBlockSize(f), progress);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path.
+ *
+ * @param f the file to create
+ * @param overwrite if a path with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize) throws IOException
+ {
+ // Fill in the per-path replication and block-size defaults.
+ short replication = getDefaultReplication(f);
+ long blockSize = getDefaultBlockSize(f);
+ return create(f, overwrite, bufferSize, replication, blockSize);
+ }
+
+ /**
+ * Create an {@link FSDataOutputStream} at the indicated Path
+ * with write-progress reporting.
+ *
+ * The frequency of callbacks is implementation-specific; it may be "none".
+ *
+ * @param f the path of the file to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress to report progress.
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ Progressable progress) throws IOException
+ {
+ // Fill in the per-path replication and block-size defaults.
+ short replication = getDefaultReplication(f);
+ long blockSize = getDefaultBlockSize(f);
+ return create(f, overwrite, bufferSize, replication, blockSize,
+ progress);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path.
+ *
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize the block size of the file.
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws IOException
+ {
+ return create(f, overwrite, bufferSize, replication, blockSize, null);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ *
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize the block size of the file.
+ * @param progress to report progress.
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException
+ {
+ // Apply the configured umask to the default file permission.
+ return this.create(f, FsCreateModes.applyUMask(
+ FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
+ overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ *
+ * @param f the file name to open
+ * @param permission file permission
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public abstract FSDataOutputStream create(Path f,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException;
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ *
+ * @param f the file name to open
+ * @param permission file permission
+ * @param flags {@link CreateFlag}s to use for this stream.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException
+ {
+ // Delegate with a null ChecksumOpt: use the configured defaults.
+ return create(f, permission, flags, bufferSize, replication,
+ blockSize, progress, null);
+ }
+
+ /**
+ * Create an FSDataOutputStream at the indicated Path with a custom
+ * checksum option.
+ *
+ * @param f the file name to open
+ * @param permission file permission
+ * @param flags {@link CreateFlag}s to use for this stream.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @param checksumOpt checksum parameter. If null, the values
+ * found in conf will be used.
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ ChecksumOpt checksumOpt) throws IOException
+ {
+ // Checksum options are ignored by default. The file systems that
+ // implement checksum need to override this method. The full
+ // support is currently only available in DFS.
+ return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
+ bufferSize, replication, blockSize, progress);
+ }
+
+ /**
+ * This create has been added to support the FileContext that processes
+ * the permission with umask before calling this method.
+ * This a temporary method added to support the transition from FileSystem
+ * to FileContext for user applications.
+ *
+ * @param f path.
+ * @param absolutePermission permission, already combined with the umask.
+ * @param flag create flags.
+ * @param bufferSize buffer size.
+ * @param replication replication.
+ * @param blockSize block size.
+ * @param progress progress.
+ * @param checksumOpt checksum option (ignored by this default impl).
+ * @return output stream.
+ * @throws IOException IO failure
+ */
+ @Deprecated
+ protected FSDataOutputStream primitiveCreate(Path f,
+ FsPermission absolutePermission,
+ EnumSet<CreateFlag> flag,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ ChecksumOpt checksumOpt) throws IOException
+ {
+ boolean pathExists = exists(f);
+ CreateFlag.validate(f, pathExists, flag);
+
+ // Default impl assumes that permissions do not matter and
+ // nor does the bytesPerChecksum hence
+ // calling the regular create is good enough.
+ // FSs that implement permissions should override this.
+
+ if (pathExists && flag.contains(CreateFlag.APPEND)) {
+ return append(f, bufferSize, progress);
+ }
+
+ return this.create(f, absolutePermission,
+ flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
+ blockSize, progress);
+ }
+
+ /**
+ * This version of the mkdirs method assumes that the permission is absolute.
+ * It has been added to support the FileContext that processes the permission
+ * with umask before calling this method.
+ * This a temporary method added to support the transition from FileSystem
+ * to FileContext for user applications.
+ *
+ * @param f path
+ * @param absolutePermission permissions
+ * @return true if the directory was actually created.
+ * @throws IOException IO failure
+ * @see #mkdirs(Path, FsPermission)
+ */
+ @Deprecated
+ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+ throws IOException
+ {
+ return this.mkdirs(f, absolutePermission);
+ }
+
+ /**
+ * This version of the mkdirs method assumes that the permission is absolute.
+ * It has been added to support the FileContext that processes the permission
+ * with umask before calling this method.
+ * This a temporary method added to support the transition from FileSystem
+ * to FileContext for user applications.
+ *
+ * @param f the path.
+ * @param absolutePermission permission.
+ * @param createParent create parent.
+ * @throws IOException IO failure.
+ */
+ @Deprecated
+ protected void primitiveMkdir(Path f, FsPermission absolutePermission,
+ boolean createParent)
+ throws IOException
+ {
+ if (!createParent) {
+ // parent must exist.
+ // since the this.mkdirs makes parent dirs automatically
+ // we must throw exception if parent does not exist.
+ final FileStatus stat = getFileStatus(f.getParent());
+ if (stat == null) {
+ throw new FileNotFoundException("Missing parent:" + f);
+ }
+ if (!stat.isDirectory()) {
+ throw new ParentNotDirectoryException("parent is not a dir");
+ }
+ // parent does exist - go ahead with mkdir of leaf
+ }
+ // Default impl is to assume that permissions do not matter and hence
+ // calling the regular mkdirs is good enough.
+ // FSs that implement permissions should override this.
+ if (!this.mkdirs(f, absolutePermission)) {
+ throw new IOException("mkdir of " + f + " failed");
+ }
+ }
+
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting. Same as create(), except fails if parent directory doesn't
+ * already exist.
+ *
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream createNonRecursive(Path f,
+ boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException
+ {
+ return this.createNonRecursive(f, FsPermission.getFileDefault(),
+ overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting. Same as create(), except fails if parent directory doesn't
+ * already exist.
+ *
+ * @param f the file name to open
+ * @param permission file permission
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException
+ {
+ return createNonRecursive(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize,
+ replication, blockSize, progress);
+ }
+
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting. Same as create(), except fails if parent directory doesn't
+ * already exist.
+ *
+ * @param f the file name to open
+ * @param permission file permission
+ * @param flags {@link CreateFlag}s to use for this stream.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size
+ * @param progress the progress reporter
+ * @return output stream.
+ * @throws IOException IO failure
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet flags, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException
+ {
+ throw new IOException("createNonRecursive unsupported for this filesystem "
+ + this.getClass());
+ }
+
+ /**
+ * Creates the given Path as a brand-new zero-length file. If
+ * create fails, or if it already existed, return false.
+ * Important: the default implementation is not atomic
+ *
+ * @param f path to use for create
+ * @return if create new file success true,not false.
+ * @throws IOException IO failure
+ */
+ public boolean createNewFile(Path f) throws IOException
+ {
+ if (exists(f)) {
+ return false;
+ }
+ else {
+ create(f, false, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT)).close();
+ return true;
+ }
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ * Same as
+ * {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ * IO_FILE_BUFFER_SIZE_DEFAULT), null)}
+ *
+ * @param f the existing file to be appended.
+ * @return output stream.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public FSDataOutputStream append(Path f) throws IOException
+ {
+ return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT), null);
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ * Same as append(f, bufferSize, null).
+ *
+ * @param f the existing file to be appended.
+ * @param bufferSize the size of the buffer to be used.
+ * @return output stream.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public FSDataOutputStream append(Path f, int bufferSize) throws IOException
+ {
+ return append(f, bufferSize, null);
+ }
+
/**
 * Append to an existing file (optional operation).
 * This is the primitive all other append() overloads in this class
 * delegate to; concrete filesystems must implement it (or throw
 * UnsupportedOperationException if append is not supported).
 *
 * @param f the existing file to be appended.
 * @param bufferSize the size of the buffer to be used.
 * @param progress for reporting progress if it is not null.
 * @return output stream.
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 *         (default).
 */
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;
+
+ /**
+ * Append to an existing file (optional operation).
+ *
+ * @param f the existing file to be appended.
+ * @param appendToNewBlock whether to append data to a new block
+ * instead of the end of the last partial block
+ * @return output stream.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException
+ {
+ return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT), null, appendToNewBlock);
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ * This function is used for being overridden by some FileSystem like DistributedFileSystem
+ *
+ * @param f the existing file to be appended.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress for reporting progress if it is not null.
+ * @param appendToNewBlock whether to append data to a new block
+ * instead of the end of the last partial block
+ * @return output stream.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress, boolean appendToNewBlock) throws IOException
+ {
+ return append(f, bufferSize, progress);
+ }
+
+ /**
+ * Concat existing files together.
+ *
+ * @param trg the path to the target destination.
+ * @param psrcs the paths to the sources to use for the concatenation.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public void concat(final Path trg, final Path[] psrcs) throws IOException
+ {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ /**
+ * Get the replication factor.
+ *
+ * @param src file name
+ * @return file replication
+ * @throws FileNotFoundException if the path does not resolve.
+ * @throws IOException an IO failure
+ * @deprecated Use {@link #getFileStatus(Path)} instead
+ */
+ @Deprecated
+ public short getReplication(Path src) throws IOException
+ {
+ return getFileStatus(src).getReplication();
+ }
+
+ /**
+ * Set the replication for an existing file.
+ * If a filesystem does not support replication, it will always
+ * return true: the check for a file existing may be bypassed.
+ * This is the default behavior.
+ *
+ * @param src file name
+ * @param replication new replication
+ * @return true if successful, or the feature in unsupported;
+ * false if replication is supported but the file does not exist,
+ * or is a directory
+ * @throws IOException an IO failure.
+ */
+ public boolean setReplication(Path src, short replication)
+ throws IOException
+ {
+ return true;
+ }
+
/**
 * Renames Path src to Path dst.
 * Atomicity and overwrite semantics are implementation specific; see the
 * concrete filesystem's documentation.
 *
 * @param src path to be renamed
 * @param dst new path after rename
 * @return true if rename is successful
 * @throws IOException on failure
 */
public abstract boolean rename(Path src, Path dst) throws IOException;
+
+ /**
+ * Renames Path src to Path dst
+ *
+ * - Fails if src is a file and dst is a directory.
+ * - Fails if src is a directory and dst is a file.
+ * - Fails if the parent of dst does not exist or is a file.
+ *
+ *
+ * If OVERWRITE option is not passed as an argument, rename fails
+ * if the dst already exists.
+ *
+ *
+ * If OVERWRITE option is passed as an argument, rename overwrites
+ * the dst if it is a file or an empty directory. Rename fails if dst is
+ * a non-empty directory.
+ *
+ * Note that atomicity of rename is dependent on the file system
+ * implementation. Please refer to the file system documentation for
+ * details. This default implementation is non atomic.
+ *
+ * This method is deprecated since it is a temporary method added to
+ * support the transition from FileSystem to FileContext for user
+ * applications.
+ *
+ *
+ * @param src path to be renamed
+ * @param dst new path after rename
+ * @param options rename options.
+ * @throws FileNotFoundException src path does not exist, or the parent
+ * path of dst does not exist.
+ * @throws FileAlreadyExistsException dest path exists and is a file
+ * @throws ParentNotDirectoryException if the parent path of dest is not
+ * a directory
+ * @throws IOException on failure
+ */
+ @Deprecated
+ protected void rename(final Path src, final Path dst,
+ final Rename... options) throws IOException
+ {
+ // Default implementation
+ final FileStatus srcStatus = getFileLinkStatus(src);
+ if (srcStatus == null) {
+ throw new FileNotFoundException("rename source " + src + " not found.");
+ }
+
+ boolean overwrite = false;
+ if (null != options) {
+ for (Rename option : options) {
+ if (option == Rename.OVERWRITE) {
+ overwrite = true;
+ }
+ }
+ }
+
+ FileStatus dstStatus;
+ try {
+ dstStatus = getFileLinkStatus(dst);
+ }
+ catch (IOException e) {
+ dstStatus = null;
+ }
+ if (dstStatus != null) {
+ if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
+ throw new IOException("Source " + src + " Destination " + dst
+ + " both should be either file or directory");
+ }
+ if (!overwrite) {
+ throw new FileAlreadyExistsException("rename destination " + dst
+ + " already exists.");
+ }
+ // Delete the destination that is a file or an empty directory
+ if (dstStatus.isDirectory()) {
+ FileStatus[] list = listStatus(dst);
+ if (list != null && list.length != 0) {
+ throw new IOException(
+ "rename cannot overwrite non empty destination directory " + dst);
+ }
+ }
+ delete(dst, false);
+ }
+ else {
+ final Path parent = dst.getParent();
+ final FileStatus parentStatus = getFileStatus(parent);
+ if (parentStatus == null) {
+ throw new FileNotFoundException("rename destination parent " + parent
+ + " not found.");
+ }
+ if (!parentStatus.isDirectory()) {
+ throw new ParentNotDirectoryException("rename destination parent " + parent
+ + " is a file.");
+ }
+ }
+ if (!rename(src, dst)) {
+ throw new IOException("rename from " + src + " to " + dst + " failed.");
+ }
+ }
+
+ /**
+ * Truncate the file in the indicated path to the indicated size.
+ *
+ * - Fails if path is a directory.
+ * - Fails if path does not exist.
+ * - Fails if path is not closed.
+ * - Fails if new size is greater than current size.
+ *
+ *
+ * @param f The path to the file to be truncated
+ * @param newLength The size the file is to be truncated to
+ * @return true
if the file has been truncated to the desired
+ * newLength
and is immediately available to be reused for
+ * write operations such as append
, or
+ * false
if a background process of adjusting the length of
+ * the last block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ public boolean truncate(Path f, long newLength) throws IOException
+ {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ /**
+ * Delete a file/directory.
+ *
+ * @param f the path.
+ * @return if delete success true, not false.
+ * @throws IOException IO failure.
+ * @deprecated Use {@link #delete(Path, boolean)} instead.
+ */
+ @Deprecated
+ public boolean delete(Path f) throws IOException
+ {
+ return delete(f, true);
+ }
+
/**
 * Delete a file.
 *
 * @param f the path to delete.
 * @param recursive if path is a directory and set to
 *        true, the directory is deleted else throws an exception. In
 *        case of a file the recursive can be set to either true or false.
 * @return true if delete is successful else false.
 * @throws IOException IO failure
 */
public abstract boolean delete(Path f, boolean recursive) throws IOException;
+
+ /**
+ * Mark a path to be deleted when its FileSystem is closed.
+ * When the JVM shuts down cleanly, all cached FileSystem objects will be
+ * closed automatically. These the marked paths will be deleted as a result.
+ *
+ * If a FileSystem instance is not cached, i.e. has been created with
+ * {@link #createFileSystem(URI, Configuration)}, then the paths will
+ * be deleted in when {@link #close()} is called on that instance.
+ *
+ * The path must exist in the filesystem at the time of the method call;
+ * it does not have to exist at the time of JVM shutdown.
+ *
+ * Notes
+ *
+ * - Clean shutdown of the JVM cannot be guaranteed.
+ * - The time to shut down a FileSystem will depends on the number of
+ * files to delete. For filesystems where the cost of checking
+ * for the existence of a file/directory and the actual delete operation
+ * (for example: object stores) is high, the time to shutdown the JVM can be
+ * significantly extended by over-use of this feature.
+ * - Connectivity problems with a remote filesystem may delay shutdown
+ * further, and may cause the files to not be deleted.
+ *
+ *
+ * @param f the path to delete.
+ * @return true if deleteOnExit is successful, otherwise false.
+ * @throws IOException IO failure
+ */
+ public boolean deleteOnExit(Path f) throws IOException
+ {
+ if (!exists(f)) {
+ return false;
+ }
+ synchronized (deleteOnExit) {
+ deleteOnExit.add(f);
+ }
+ return true;
+ }
+
+ /**
+ * Cancel the scheduled deletion of the path when the FileSystem is closed.
+ *
+ * @param f the path to cancel deletion
+ * @return true if the path was found in the delete-on-exit list.
+ */
+ public boolean cancelDeleteOnExit(Path f)
+ {
+ synchronized (deleteOnExit) {
+ return deleteOnExit.remove(f);
+ }
+ }
+
+ /**
+ * Delete all paths that were marked as delete-on-exit. This recursively
+ * deletes all files and directories in the specified paths.
+ *
+ * The time to process this operation is {@code O(paths)}, with the actual
+ * time dependent on the time for existence and deletion operations to
+ * complete, successfully or not.
+ */
+ protected void processDeleteOnExit()
+ {
+ synchronized (deleteOnExit) {
+ for (Iterator iter = deleteOnExit.iterator(); iter.hasNext(); ) {
+ Path path = iter.next();
+ try {
+ if (exists(path)) {
+ delete(path, true);
+ }
+ }
+ catch (IOException e) {
+ LOGGER.info("Ignoring failure to deleteOnExit for path {}", path);
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ * Check if a path exists.
+ *
+ * It is highly discouraged to call this method back to back with other
+ * {@link #getFileStatus(Path)} calls, as this will involve multiple redundant
+ * RPC calls in HDFS.
+ *
+ * @param f source path
+ * @return true if the path exists
+ * @throws IOException IO failure
+ */
+ public boolean exists(Path f) throws IOException
+ {
+ try {
+ return getFileStatus(f) != null;
+ }
+ catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /**
+ * True iff the named path is a directory.
+ * Note: Avoid using this method. Instead reuse the FileStatus
+ * returned by getFileStatus() or listStatus() methods.
+ *
+ * @param f path to check
+ * @return if f is directory true, not false.
+ * @throws IOException IO failure
+ * @deprecated Use {@link #getFileStatus(Path)} instead
+ */
+ @Deprecated
+ public boolean isDirectory(Path f) throws IOException
+ {
+ try {
+ return getFileStatus(f).isDirectory();
+ }
+ catch (FileNotFoundException e) {
+ return false; // f does not exist
+ }
+ }
+
+ /**
+ * True iff the named path is a regular file.
+ * Note: Avoid using this method. Instead reuse the FileStatus
+ * returned by {@link #getFileStatus(Path)} or listStatus() methods.
+ *
+ * @param f path to check
+ * @return if f is file true, not false.
+ * @throws IOException IO failure
+ * @deprecated Use {@link #getFileStatus(Path)} instead
+ */
+ @Deprecated
+ public boolean isFile(Path f) throws IOException
+ {
+ try {
+ return getFileStatus(f).isFile();
+ }
+ catch (FileNotFoundException e) {
+ return false; // f does not exist
+ }
+ }
+
+ /**
+ * The number of bytes in a file.
+ *
+ * @param f the path.
+ * @return the number of bytes; 0 for a directory
+ * @throws FileNotFoundException if the path does not resolve
+ * @throws IOException IO failure
+ * @deprecated Use {@link #getFileStatus(Path)} instead.
+ */
+ @Deprecated
+ public long getLength(Path f) throws IOException
+ {
+ return getFileStatus(f).getLen();
+ }
+
+ /**
+ * Return the {@link ContentSummary} of a given {@link Path}.
+ *
+ * @param f path to use
+ * @return content summary.
+ * @throws FileNotFoundException if the path does not resolve
+ * @throws IOException IO failure
+ */
+ public ContentSummary getContentSummary(Path f) throws IOException
+ {
+ FileStatus status = getFileStatus(f);
+ if (status.isFile()) {
+ // f is a file
+ long length = status.getLen();
+ return new ContentSummary.Builder().length(length).fileCount(1).directoryCount(0).spaceConsumed(length).build();
+ }
+ // f is a directory
+ long[] summary = {0, 0, 1};
+ for (FileStatus s : listStatus(f)) {
+ long length = s.getLen();
+ ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
+ new ContentSummary.Builder().length(length).fileCount(1).directoryCount(0).spaceConsumed(length).build();
+ summary[0] += c.getLength();
+ summary[1] += c.getFileCount();
+ summary[2] += c.getDirectoryCount();
+ }
+ return new ContentSummary.Builder().length(summary[0]).fileCount(summary[1]).directoryCount(summary[2]).spaceConsumed(summary[0]).build();
+ }
+
+ /**
+ * Return the {@link QuotaUsage} of a given {@link Path}.
+ *
+ * @param f path to use
+ * @return the quota usage
+ * @throws IOException IO failure
+ */
+ public QuotaUsage getQuotaUsage(Path f) throws IOException
+ {
+ return getContentSummary(f);
+ }
+
/**
 * Set quota for the given {@link Path}.
 * The default implementation delegates to methodNotSupported();
 * presumably that helper raises an unsupported-operation error --
 * confirm against its definition elsewhere in this file.
 *
 * @param src the target path to set quota for
 * @param namespaceQuota the namespace quota (i.e., # of files/directories)
 *        to set
 * @param storagespaceQuota the storage space quota to set
 * @throws IOException IO failure
 */
public void setQuota(Path src, final long namespaceQuota,
    final long storagespaceQuota) throws IOException
{
    methodNotSupported();
}
+
/**
 * Set per storage type quota for the given {@link Path}.
 * The default implementation delegates to methodNotSupported();
 * presumably that helper raises an unsupported-operation error --
 * confirm against its definition elsewhere in this file.
 *
 * @param src the target path to set storage type quota for
 * @param type the storage type to set
 * @param quota the quota to set for the given storage type
 * @throws IOException IO failure
 */
public void setQuotaByStorageType(Path src, final StorageType type,
    final long quota) throws IOException
{
    methodNotSupported();
}
+
/**
 * The default path filter: accepts every path unconditionally.
 * Used by the listing and glob methods when the caller supplies no filter.
 */
private static final PathFilter DEFAULT_FILTER = new PathFilter()
{
    @Override
    public boolean accept(Path file)
    {
        return true;
    }
};
+
/**
 * List the statuses of the files/directories in the given path if the path
 * is a directory.
 *
 * Does not guarantee to return the List of files/directories status in a
 * sorted order.
 *
 * Will not return null. Expect IOException upon access error.
 *
 * @param f given path
 * @return the statuses of the files/directories in the given patch
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
    IOException;
+
+ /**
+ * Represents a batch of directory entries when iteratively listing a
+ * directory. This is a private API not meant for use by end users.
+ *
+ * For internal use by FileSystem subclasses that override
+ * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+ * listing.
+ */
+ @InterfaceAudience.Private
+ public static class DirectoryEntries
+ {
+ private final FileStatus[] entries;
+ private final byte[] token;
+ private final boolean hasMore;
+
+ public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+ hasMore)
+ {
+ this.entries = entries;
+ if (token != null) {
+ this.token = token.clone();
+ }
+ else {
+ this.token = null;
+ }
+ this.hasMore = hasMore;
+ }
+
+ public FileStatus[] getEntries()
+ {
+ return entries;
+ }
+
+ public byte[] getToken()
+ {
+ return token;
+ }
+
+ public boolean hasMore()
+ {
+ return hasMore;
+ }
+ }
+
+ /**
+ * Given an opaque iteration token, return the next batch of entries in a
+ * directory. This is a private API not meant for use by end users.
+ *
+ * This method should be overridden by FileSystem subclasses that want to
+ * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+ *
+ * @param f Path to list
+ * @param token opaque iteration token returned by previous call, or null
+ * if this is the first call.
+ * @return directory entries.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException If an I/O error occurred.
+ */
+ @InterfaceAudience.Private
+ protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException
+ {
+ // The default implementation returns the entire listing as a single batch.
+ // Thus, there is never a second batch, and no need to respect the passed
+ // token or set a token in the returned DirectoryEntries.
+ FileStatus[] listing = listStatus(f);
+ return new DirectoryEntries(listing, null, false);
+ }
+
+ /**
+ * Filter files/directories in the given path using the user-supplied path
+ * filter. Results are added to the given array results
.
+ *
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException see specific implementation
+ */
+ private void listStatus(ArrayList results, Path f,
+ PathFilter filter) throws FileNotFoundException, IOException
+ {
+ FileStatus[] listing = listStatus(f);
+ Preconditions.checkNotNull(listing, "listStatus should not return NULL");
+ for (int i = 0; i < listing.length; i++) {
+ if (filter.accept(listing[i].getPath())) {
+ results.add(listing[i]);
+ }
+ }
+ }
+
+ /**
+ * List corrupted file blocks.
+ *
+ * @param path the path.
+ * @return an iterator over the corrupt files under the given path
+ * (may contain duplicates if a file has more than one corrupt block)
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ * @throws IOException IO failure
+ */
+ public RemoteIterator listCorruptFileBlocks(Path path)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getCanonicalName() +
+ " does not support listCorruptFileBlocks");
+ }
+
+ /**
+ * Filter files/directories in the given path using the user-supplied path
+ * filter.
+ *
+ * Does not guarantee to return the List of files/directories status in a
+ * sorted order.
+ *
+ * @param f a path name
+ * @param filter the user-supplied path filter
+ * @return an array of FileStatus objects for the files under the given path
+ * after applying the filter
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException see specific implementation
+ */
+ public FileStatus[] listStatus(Path f, PathFilter filter)
+ throws FileNotFoundException, IOException
+ {
+ ArrayList results = new ArrayList<>();
+ listStatus(results, f, filter);
+ return results.toArray(new FileStatus[results.size()]);
+ }
+
/**
 * Filter files/directories in the given list of paths using the default
 * path filter (which accepts everything).
 *
 * Does not guarantee to return the List of files/directories status in a
 * sorted order.
 *
 * @param files a list of paths
 * @return a list of statuses for the files under the given paths after
 *         applying the filter default Path filter
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public FileStatus[] listStatus(Path[] files)
    throws FileNotFoundException, IOException
{
    return listStatus(files, DEFAULT_FILTER);
}
+
+ /**
+ * Filter files/directories in the given list of paths using user-supplied
+ * path filter.
+ *
+ * Does not guarantee to return the List of files/directories status in a
+ * sorted order.
+ *
+ * @param files a list of paths
+ * @param filter the user-supplied path filter
+ * @return a list of statuses for the files under the given paths after
+ * applying the filter
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException see specific implementation
+ */
+ public FileStatus[] listStatus(Path[] files, PathFilter filter)
+ throws FileNotFoundException, IOException
+ {
+ ArrayList results = new ArrayList();
+ for (int i = 0; i < files.length; i++) {
+ listStatus(results, files[i], filter);
+ }
+ return results.toArray(new FileStatus[results.size()]);
+ }
+
/**
 * Return all the files that match filePattern and are not checksum
 * files. Results are sorted by their names.
 *
 * A filename pattern is composed of regular characters and
 * special pattern matching characters, which are:
 *
 *   ?       Matches any single character.
 *   *       Matches zero or more characters.
 *   [abc]   Matches a single character from character set {a,b,c}.
 *   [a-b]   Matches a single character from the character range {a...b};
 *           character a must be lexicographically less than or equal to
 *           character b.
 *   [^a]    Matches a single character that is not from character set or
 *           range {a}; the ^ character must occur immediately to the
 *           right of the opening bracket.
 *   \c      Removes (escapes) any special meaning of character c.
 *   {ab,cd}        Matches a string from the string set {ab, cd}.
 *   {ab,c{de,fh}}  Matches a string from the string set {ab, cde, cfh}.
 *
 * @param pathPattern a glob specifying a path pattern
 * @return an array of paths that match the path pattern
 * @throws IOException IO failure
 */
public FileStatus[] globStatus(Path pathPattern) throws IOException
{
    // NOTE: "withPathFiltern" is the builder method's actual (misspelled)
    // name in the Globber API; do not "correct" it here.
    return Globber.createGlobber(this)
        .withPathPattern(pathPattern)
        .withPathFiltern(DEFAULT_FILTER)
        .withResolveSymlinks(true)
        .build()
        .glob();
}
+
+ /**
+ * Return an array of {@link FileStatus} objects whose path names match
+ * {@code pathPattern} and is accepted by the user-supplied path filter.
+ * Results are sorted by their path names.
+ *
+ * @param pathPattern a glob specifying the path pattern
+ * @param filter a user-supplied path filter
+ * @return null if {@code pathPattern} has no glob and the path does not exist
+ * an empty array if {@code pathPattern} has a glob and no path
+ * matches it else an array of {@link FileStatus} objects matching the
+ * pattern
+ * @throws IOException if any I/O error occurs when fetching file status
+ */
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+ throws IOException
+ {
+ return new Globber(this, pathPattern, filter).glob();
+ }
+
+ /**
+ * List the statuses of the files/directories in the given path if the path is
+ * a directory.
+ * Return the file's status and block locations If the path is a file.
+ *
+ * If a returned status is a file, it contains the file's block locations.
+ *
+ * @param f is the path
+ * @return an iterator that traverses statuses of the files/directories
+ * in the given path
+ * @throws FileNotFoundException If f
does not exist
+ * @throws IOException If an I/O error occurred
+ */
+ public RemoteIterator listLocatedStatus(final Path f)
+ throws FileNotFoundException, IOException
+ {
+ return listLocatedStatus(f, DEFAULT_FILTER);
+ }
+
+ /**
+ * List a directory.
+ * The returned results include its block location if it is a file
+ * The results are filtered by the given path filter
+ *
+ * @param f a path
+ * @param filter a path filter
+ * @return an iterator that traverses statuses of the files/directories
+ * in the given path
+ * @throws FileNotFoundException if f
does not exist
+ * @throws IOException if any I/O error occurred
+ */
+ protected RemoteIterator listLocatedStatus(final Path f,
+ final PathFilter filter)
+ throws FileNotFoundException, IOException
+ {
+ return new RemoteIterator()
+ {
+ private final FileStatus[] stats = listStatus(f, filter);
+ private int i;
+
+ @Override
+ public boolean hasNext()
+ {
+ return i < stats.length;
+ }
+
+ @Override
+ public LocatedFileStatus next() throws IOException
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more entries in " + f);
+ }
+ FileStatus result = stats[i++];
+ // for files, use getBlockLocations(FileStatus, int, int) to avoid
+ // calling getFileStatus(Path) to load the FileStatus again
+ BlockLocation[] locs = result.isFile() ?
+ getFileBlockLocations(result, 0, result.getLen()) :
+ null;
+ return new LocatedFileStatus(result, locs);
+ }
+ };
+ }
+
+ /**
+ * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+ */
+ protected class DirListingIterator
+ implements RemoteIterator
+ {
+ private final Path path;
+ private DirectoryEntries entries;
+ private int i;
+
+ DirListingIterator(Path path) throws IOException
+ {
+ this.path = path;
+ this.entries = listStatusBatch(path, null);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException
+ {
+ return i < entries.getEntries().length ||
+ entries.hasMore();
+ }
+
+ private void fetchMore() throws IOException
+ {
+ byte[] token = entries.getToken();
+ entries = listStatusBatch(path, token);
+ i = 0;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T next() throws IOException
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more items in iterator");
+ }
+ if (i == entries.getEntries().length) {
+ fetchMore();
+ }
+ return (T) entries.getEntries()[i++];
+ }
+ }
+
+ /**
+ * Returns a remote iterator so that followup calls are made on demand
+ * while consuming the entries. Each FileSystem implementation should
+ * override this method and provide a more efficient implementation, if
+ * possible.
+ *
+ * Does not guarantee to return the iterator that traverses statuses
+ * of the files in a sorted order.
+ *
+ * @param p target path
+ * @return remote iterator
+ * @throws FileNotFoundException if p
does not exist
+ * @throws IOException if any I/O error occurred
+ */
+ public RemoteIterator listStatusIterator(final Path p)
+ throws FileNotFoundException, IOException
+ {
+ return new DirListingIterator<>(p);
+ }
+
+ /**
+ * List the statuses and block locations of the files in the given path.
+ * Does not guarantee to return the iterator that traverses statuses
+ * of the files in a sorted order.
+ *
+ *
+ * If the path is a directory,
+ * if recursive is false, returns files in the directory;
+ * if recursive is true, return files in the subtree rooted at the path.
+ * If the path is a file, return the file's status and block locations.
+ *
+ *
+ * @param f is the path
+ * @param recursive if the subdirectories need to be traversed recursively
+ * @return an iterator that traverses statuses of the files
+ * @throws FileNotFoundException when the path does not exist;
+ * @throws IOException see specific implementation
+ */
+ public RemoteIterator listFiles(
+ final Path f, final boolean recursive)
+ throws FileNotFoundException, IOException
+ {
+ return new RemoteIterator()
+ {
+ private Stack> itors = new Stack<>();
+ private RemoteIterator curItor =
+ listLocatedStatus(f);
+ private LocatedFileStatus curFile;
+
+ @Override
+ public boolean hasNext() throws IOException
+ {
+ while (curFile == null) {
+ if (curItor.hasNext()) {
+ handleFileStat(curItor.next());
+ }
+ else if (!itors.empty()) {
+ curItor = itors.pop();
+ }
+ else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Process the input stat.
+ * If it is a file, return the file stat.
+ * If it is a directory, traverse the directory if recursive is true;
+ * ignore it if recursive is false.
+ * @param stat input status
+ * @throws IOException if any IO error occurs
+ */
+ private void handleFileStat(LocatedFileStatus stat) throws IOException
+ {
+ if (stat.isFile()) { // file
+ curFile = stat;
+ }
+ else if (recursive) { // directory
+ try {
+ RemoteIterator newDirItor = listLocatedStatus(stat.getPath());
+ itors.push(curItor);
+ curItor = newDirItor;
+ }
+ catch (FileNotFoundException ignored) {
+ LOGGER.debug("Directory {} deleted while attempting for recursive listing",
+ stat.getPath());
+ }
+ }
+ }
+
+ @Override
+ public LocatedFileStatus next() throws IOException
+ {
+ if (hasNext()) {
+ LocatedFileStatus result = curFile;
+ curFile = null;
+ return result;
+ }
+ throw new java.util.NoSuchElementException("No more entry in " + f);
+ }
+ };
+ }
+
+ /**
+ * Return the current user's home directory in this FileSystem.
+ * The default implementation returns {@code "/user/$USER/"}.
+ *
+ * @return the path.
+ */
+ public Path getHomeDirectory()
+ {
+ String username;
+ try {
+ username = UserGroupInformation.getCurrentUser().getShortUserName();
+ }
+ catch (IOException ex) {
+ LOGGER.warn("Unable to get user name. Fall back to system property " +
+ "user.name", ex);
+ username = System.getProperty("user.name");
+ }
+ return this.makeQualified(
+ new Path(USER_HOME_PREFIX + "/" + username));
+ }
+
/**
 * Set the current working directory for the given FileSystem. All relative
 * paths will be resolved relative to it.
 *
 * @param newDir Path of new working directory
 */
public abstract void setWorkingDirectory(Path newDir);
+
/**
 * Get the current working directory for the given FileSystem.
 *
 * @return the directory pathname
 */
public abstract Path getWorkingDirectory();
+
/**
 * Note: with the new FileContext class, getWorkingDirectory()
 * will be removed.
 * The working directory is implemented in FileContext.
 *
 * Some FileSystems like LocalFileSystem have an initial workingDir
 * that we use as the starting workingDir. For other file systems
 * like HDFS there is no built in notion of an initial workingDir.
 *
 * @return if there is a built-in notion of workingDir then it
 * is returned; else a null is returned.
 */
protected Path getInitialWorkingDirectory()
{
    // Default: no built-in initial working directory.
    return null;
}
+
+ /**
+ * Call {@link #mkdirs(Path, FsPermission)} with default permission.
+ *
+ * @param f path
+ * @return true if the directory was created
+ * @throws IOException IO failure
+ */
+ public boolean mkdirs(Path f) throws IOException
+ {
+ return mkdirs(f, FsPermission.getDirDefault());
+ }
+
/**
 * Make the given file and all non-existent parents into
 * directories. Has roughly the semantics of Unix {@code mkdir -p}.
 * Existence of the directory hierarchy is not an error.
 *
 * @param f path to create
 * @param permission to apply to f
 * @return true if the directory was created (or already existed)
 * @throws IOException IO failure
 */
public abstract boolean mkdirs(Path f, FsPermission permission
    ) throws IOException;
+
+ /**
+ * The src file is on the local disk. Add it to filesystem at
+ * the given dst name and the source is kept intact afterwards
+ *
+ * @param src path
+ * @param dst path
+ * @throws IOException IO failure
+ */
+ public void copyFromLocalFile(Path src, Path dst)
+ throws IOException
+ {
+ copyFromLocalFile(false, src, dst);
+ }
+
+ /**
+ * The src files is on the local disk. Add it to filesystem at
+ * the given dst name, removing the source afterwards.
+ *
+ * @param srcs source paths
+ * @param dst path
+ * @throws IOException IO failure
+ */
+ public void moveFromLocalFile(Path[] srcs, Path dst)
+ throws IOException
+ {
+ copyFromLocalFile(true, true, srcs, dst);
+ }
+
+ /**
+ * The src file is on the local disk. Add it to the filesystem at
+ * the given dst name, removing the source afterwards.
+ *
+ * @param src local path
+ * @param dst path
+ * @throws IOException IO failure
+ */
+ public void moveFromLocalFile(Path src, Path dst)
+ throws IOException
+ {
+ copyFromLocalFile(true, src, dst);
+ }
+
+ /**
+ * The src file is on the local disk. Add it to the filesystem at
+ * the given dst name.
+ * delSrc indicates if the source should be removed
+ *
+ * @param delSrc whether to delete the src
+ * @param src path
+ * @param dst path
+ * @throws IOException IO failure.
+ */
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException
+ {
+ copyFromLocalFile(delSrc, true, src, dst);
+ }
+
+ /**
+ * The src files are on the local disk. Add it to the filesystem at
+ * the given dst name.
+ * delSrc indicates if the source should be removed
+ *
+ * @param delSrc whether to delete the src
+ * @param overwrite whether to overwrite an existing file
+ * @param srcs array of paths which are source
+ * @param dst path
+ * @throws IOException IO failure
+ */
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path[] srcs, Path dst)
+ throws IOException
+ {
+ Configuration conf = getConf();
+ FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
+ }
+
+ /**
+ * The src file is on the local disk. Add it to the filesystem at
+ * the given dst name.
+ * delSrc indicates if the source should be removed
+ *
+ * @param delSrc whether to delete the src
+ * @param overwrite whether to overwrite an existing file
+ * @param src path
+ * @param dst path
+ * @throws IOException IO failure
+ */
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path src, Path dst)
+ throws IOException
+ {
+ Configuration conf = getConf();
+ FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
+ }
+
+ /**
+ * Copy it a file from the remote filesystem to the local one.
+ *
+ * @param src path src file in the remote filesystem
+ * @param dst path local destination
+ * @throws IOException IO failure
+ */
+ public void copyToLocalFile(Path src, Path dst) throws IOException
+ {
+ copyToLocalFile(false, src, dst);
+ }
+
+ /**
+ * Copy a file to the local filesystem, then delete it from the
+ * remote filesystem (if successfully copied).
+ *
+ * @param src path src file in the remote filesystem
+ * @param dst path local destination
+ * @throws IOException IO failure
+ */
+ public void moveToLocalFile(Path src, Path dst) throws IOException
+ {
+ copyToLocalFile(true, src, dst);
+ }
+
+ /**
+ * Copy it a file from a remote filesystem to the local one.
+ * delSrc indicates if the src will be removed or not.
+ *
+ * @param delSrc whether to delete the src
+ * @param src path src file in the remote filesystem
+ * @param dst path local destination
+ * @throws IOException IO failure
+ */
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException
+ {
+ copyToLocalFile(delSrc, src, dst, false);
+ }
+
+ /**
+ * The src file is under this filesystem, and the dst is on the local disk.
+ * Copy it from the remote filesystem to the local dst name.
+ * delSrc indicates if the src will be removed
+ * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
+ * as the local file system or not. RawLocalFileSystem is non checksumming,
+ * So, It will not create any crc files at local.
+ *
+ * @param delSrc whether to delete the src
+ * @param src path
+ * @param dst path
+ * @param useRawLocalFileSystem whether to use RawLocalFileSystem as local file system or not.
+ * @throws IOException for any IO error
+ */
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst,
+ boolean useRawLocalFileSystem) throws IOException
+ {
+ Configuration conf = getConf();
+ FileSystem local = null;
+ if (useRawLocalFileSystem) {
+ local = getLocal(conf).getRawFileSystem();
+ }
+ else {
+ local = getLocal(conf);
+ }
+ FileUtil.copy(this, src, local, dst, delSrc, conf);
+ }
+
/**
 * Returns a local file that the user can write output to. The caller
 * provides both the eventual target name in this FileSystem
 * and the local working file path.
 * If this FileSystem is local, we write directly into the target. If
 * the FileSystem is not local, we write into the tmp local area.
 *
 * @param fsOutputFile path of output file
 * @param tmpLocalFile path of local tmp file
 * @return the path.
 * @throws IOException IO failure
 */
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException
{
    // Default: always stage through the local tmp file.
    return tmpLocalFile;
}
+
/**
 * Called when we're all done writing to the target.
 * A local FS will do nothing, because we've written to exactly the
 * right place.
 * A remote FS will copy the contents of tmpLocalFile to the correct target at
 * fsOutputFile.
 *
 * @param fsOutputFile path of output file
 * @param tmpLocalFile path to local tmp file
 * @throws IOException IO failure
 */
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException
{
    // Upload the staged local file, deleting the local copy afterwards.
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
}
+
+ /**
+ * Close this FileSystem instance.
+ * Will release any held locks, delete all files queued for deletion
+ * through calls to {@link #deleteOnExit(Path)}, and remove this FS instance
+ * from the cache, if cached.
+ *
+ * After this operation, the outcome of any method call on this FileSystem
+ * instance, or any input/output stream created by it is undefined.
+ *
+ * @throws IOException IO failure
+ */
+ @Override
+ public void close() throws IOException
+ {
+ debugLogFileSystemClose("close", "Key: " + key + "; URI: " + getUri()
+ + "; Object Identity Hash: "
+ + Integer.toHexString(System.identityHashCode(this)));
+ // delete all files that were marked as delete-on-exit.
+ try {
+ processDeleteOnExit();
+ }
+ finally {
+ CACHE.remove(this.key, this);
+ }
+ }
+
+ /**
+ * Return the total size of all files in the filesystem.
+ *
+ * @return the number of path used.
+ * @throws IOException IO failure
+ */
+ public long getUsed() throws IOException
+ {
+ Path path = new Path("/");
+ return getUsed(path);
+ }
+
/**
 * Return the total size of all files from a specified path.
 *
 * @param path the path.
 * @return total length, in bytes, of the content under the path.
 * @throws IOException IO failure
 */
public long getUsed(Path path) throws IOException
{
    // Content-summary length is the byte total of everything under path.
    return getContentSummary(path).getLength();
}
+
+ /**
+ * Get the block size for a particular file.
+ *
+ * @param f the filename
+ * @return the number of bytes in a block
+ * @throws FileNotFoundException if the path is not present
+ * @throws IOException IO failure
+ * @deprecated Use {@link #getFileStatus(Path)} instead
+ */
+ @Deprecated
+ public long getBlockSize(Path f) throws IOException
+ {
+ return getFileStatus(f).getBlockSize();
+ }
+
+ /**
+ * Return the number of bytes that large input files should be optimally
+ * be split into to minimize I/O time.
+ *
+ * @return default block size.
+ * @deprecated use {@link #getDefaultBlockSize(Path)} instead
+ */
+ @Deprecated
+ public long getDefaultBlockSize()
+ {
+ // default to 32MB: large enough to minimize the impact of seeks
+ return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
+ }
+
/**
 * Return the number of bytes that large input files should be optimally
 * split into to minimize I/O time. The given path will be used to
 * locate the actual filesystem. The full path does not have to exist.
 *
 * @param f path of file
 * @return the default block size for the path's filesystem
 */
public long getDefaultBlockSize(Path f)
{
    // Default implementation ignores the path; subclasses may route by path.
    return getDefaultBlockSize();
}
+
/**
 * Get the default replication.
 *
 * @return the replication; the default value is "1".
 * @deprecated use {@link #getDefaultReplication(Path)} instead
 */
@Deprecated
public short getDefaultReplication()
{
    return 1;
}
+
/**
 * Get the default replication for a path.
 * The given path will be used to locate the actual FileSystem to query.
 * The full path does not have to exist.
 *
 * @param path of the file
 * @return default replication for the path's filesystem
 */
public short getDefaultReplication(Path path)
{
    // Default implementation ignores the path; subclasses may route by path.
    return getDefaultReplication();
}
+
/**
 * Return a file status object that represents the path.
 *
 * @param f The path we want information from
 * @return a FileStatus object
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public abstract FileStatus getFileStatus(Path f) throws IOException;
+
+ /**
+ * Synchronize client metadata state.
+ *
+ * In some FileSystem implementations such as HDFS metadata
+ * synchronization is essential to guarantee consistency of read requests
+ * particularly in HA setting.
+ *
+ * @throws IOException If an I/O error occurred.
+ * @throws UnsupportedOperationException if the operation is unsupported.
+ */
+ public void msync() throws IOException, UnsupportedOperationException
+ {
+ throw new UnsupportedOperationException(getClass().getCanonicalName() +
+ " does not support method msync");
+ }
+
+ /**
+ * Checks if the user can access a path. The mode specifies which access
+ * checks to perform. If the requested permissions are granted, then the
+ * method returns normally. If access is denied, then the method throws an
+ * {@link AccessControlException}.
+ *
+ * The default implementation calls {@link #getFileStatus(Path)}
+ * and checks the returned permissions against the requested permissions.
+ *
+ * Note that the {@link #getFileStatus(Path)} call will be subject to
+ * authorization checks.
+ * Typically, this requires search (execute) permissions on each directory in
+ * the path's prefix, but this is implementation-defined. Any file system
+ * that provides a richer authorization model (such as ACLs) may override the
+ * default implementation so that it checks against that model instead.
+ *
+ * In general, applications should avoid using this method, due to the risk of
+ * time-of-check/time-of-use race conditions. The permissions on a file may
+ * change immediately after the access call returns. Most applications should
+ * prefer running specific file system actions as the desired user represented
+ * by a {@link UserGroupInformation}.
+ *
+ * @param path Path to check
+ * @param mode type of access to check
+ * @throws AccessControlException if access is denied
+ * @throws FileNotFoundException if the path does not exist
+ * @throws IOException see specific implementation
+ */
+ @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
+ public void access(Path path, FsAction mode) throws AccessControlException,
+ FileNotFoundException, IOException
+ {
+ checkAccessPermissions(this.getFileStatus(path), mode);
+ }
+
+ /**
+ * This method provides the default implementation of
+ * {@link #access(Path, FsAction)}.
+ *
+ * @param stat FileStatus to check
+ * @param mode type of access to check
+ * @throws AccessControlException if access is denied
+ * @throws IOException for any error
+ */
+ @InterfaceAudience.Private
+ static void checkAccessPermissions(FileStatus stat, FsAction mode)
+ throws AccessControlException, IOException
+ {
+ FsPermission perm = stat.getPermission();
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String user = ugi.getShortUserName();
+ if (user.equals(stat.getOwner())) {
+ if (perm.getUserAction().implies(mode)) {
+ return;
+ }
+ }
+ else if (ugi.getGroupsSet().contains(stat.getGroup())) {
+ if (perm.getGroupAction().implies(mode)) {
+ return;
+ }
+ }
+ else {
+ if (perm.getOtherAction().implies(mode)) {
+ return;
+ }
+ }
+ throw new AccessControlException(String.format(
+ "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
+ stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
+ }
+
+ /**
+ * See {@link FileContext#fixRelativePart}.
+ *
+ * @param p the path.
+ * @return relative part.
+ */
+ protected Path fixRelativePart(Path p)
+ {
+ if (p.isUriPathAbsolute()) {
+ return p;
+ }
+ else {
+ return new Path(getWorkingDirectory(), p);
+ }
+ }
+
/**
 * See {@link FileContext#createSymlink(Path, Path, boolean)}.
 *
 * The default implementation rejects the call; only filesystems that
 * support symlinks override this.
 *
 * @param target target path the link should refer to.
 * @param link path of the symlink to create.
 * @param createParent whether to create missing parents of the link.
 * @throws AccessControlException if access is denied.
 * @throws FileAlreadyExistsException if the link already exists.
 * @throws FileNotFoundException when the path does not exist.
 * @throws ParentNotDirectoryException if the parent path of dest is not
 * a directory.
 * @throws UnsupportedFileSystemException if there was no known implementation
 * for the scheme.
 * @throws IOException raised on errors performing I/O.
 */
public void createSymlink(final Path target, final Path link,
    final boolean createParent) throws AccessControlException,
    FileAlreadyExistsException, FileNotFoundException,
    ParentNotDirectoryException, UnsupportedFileSystemException,
    IOException
{
    // Supporting filesystems should override this method
    throw new UnsupportedOperationException(
        "Filesystem does not support symlinks!");
}
+
/**
 * See {@link FileContext#getFileLinkStatus(Path)}.
 *
 * The default implementation does not resolve symlinks specially and
 * simply returns {@link #getFileStatus(Path)}.
 *
 * @param f the path.
 * @return file status
 * @throws AccessControlException if access is denied.
 * @throws FileNotFoundException when the path does not exist.
 * @throws IOException raised on errors performing I/O.
 * @throws UnsupportedFileSystemException if there was no known implementation
 * for the scheme.
 */
public FileStatus getFileLinkStatus(final Path f)
    throws AccessControlException, FileNotFoundException,
    UnsupportedFileSystemException, IOException
{
    // Supporting filesystems should override this method
    return getFileStatus(f);
}
+
/**
 * See {@link AbstractFileSystem#supportsSymlinks()}.
 *
 * @return true if the filesystem supports symlinks, false otherwise
 * (default).
 */
public boolean supportsSymlinks()
{
    return false;
}
+
/**
 * See {@link FileContext#getLinkTarget(Path)}.
 *
 * @param f the path.
 * @return the target the symlink points to.
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 * @throws IOException IO failure.
 */
public Path getLinkTarget(Path f) throws IOException
{
    // Supporting filesystems should override this method
    throw new UnsupportedOperationException(
        "Filesystem does not support symlinks!");
}
+
/**
 * See {@link AbstractFileSystem#getLinkTarget(Path)}.
 *
 * @param f the path.
 * @return the resolved target of the link.
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 * @throws IOException IO failure.
 */
protected Path resolveLink(Path f) throws IOException
{
    // Supporting filesystems should override this method
    throw new UnsupportedOperationException(
        "Filesystem does not support symlinks!");
}
+
+ /**
+ * Get the checksum of a file, if the FS supports checksums.
+ *
+ * @param f The file path
+ * @return The file checksum. The default return value is null,
+ * which indicates that no checksum algorithm is implemented
+ * in the corresponding FileSystem.
+ * @throws IOException IO failure
+ */
+ public FileChecksum getFileChecksum(Path f) throws IOException
+ {
+ return getFileChecksum(f, Long.MAX_VALUE);
+ }
+
/**
 * Get the checksum of a file, from the beginning of the file till the
 * specific length.
 *
 * @param f The file path
 * @param length The length of the file range for checksum calculation
 * @return The file checksum or null if checksums are not supported.
 * @throws IOException IO failure
 */
public FileChecksum getFileChecksum(Path f, final long length)
    throws IOException
{
    // Default: checksums not supported.
    return null;
}
+
/**
 * Set the verify checksum flag. This is only applicable if the
 * corresponding filesystem supports checksums.
 * By default doesn't do anything.
 *
 * @param verifyChecksum Verify checksum flag
 */
public void setVerifyChecksum(boolean verifyChecksum)
{
    // no-op by default; checksumming filesystems override this
}
+
/**
 * Set the write checksum flag. This is only applicable if the
 * corresponding filesystem supports checksums.
 * By default doesn't do anything.
 *
 * @param writeChecksum Write checksum flag
 */
public void setWriteChecksum(boolean writeChecksum)
{
    // no-op by default; checksumming filesystems override this
}
+
/**
 * Returns a status object describing the use and capacity of the
 * filesystem. If the filesystem has multiple partitions, the
 * use and capacity of the root partition is reflected.
 *
 * @return a FsStatus object
 * @throws IOException see specific implementation
 */
public FsStatus getStatus() throws IOException
{
    // null path means "the default partition".
    return getStatus(null);
}
+
/**
 * Returns a status object describing the use and capacity of the
 * filesystem. If the filesystem has multiple partitions, the
 * use and capacity of the partition pointed to by the specified
 * path is reflected.
 *
 * @param p Path for which status should be obtained. null means
 * the default partition.
 * @return a FsStatus object
 * @throws IOException see specific implementation
 */
public FsStatus getStatus(Path p) throws IOException
{
    // Default: report unlimited capacity/remaining and zero used.
    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
}
+
/**
 * Set permission of a path.
 * The default implementation is a no-op; filesystems with permission
 * support override this.
 *
 * @param p The path
 * @param permission permission
 * @throws IOException IO failure
 */
public void setPermission(Path p, FsPermission permission
    ) throws IOException
{
}
+
/**
 * Set owner of a path (i.e. a file or a directory).
 * The parameters username and groupname cannot both be null.
 * The default implementation is a no-op.
 *
 * @param p The path
 * @param username If it is null, the original username remains unchanged.
 * @param groupname If it is null, the original groupname remains unchanged.
 * @throws IOException IO failure
 */
public void setOwner(Path p, String username, String groupname
    ) throws IOException
{
}
+
/**
 * Set the modification and access times of a file.
 * The default implementation is a no-op.
 *
 * @param p The path
 * @param mtime Set the modification time of this file.
 * The number of milliseconds since Jan 1, 1970.
 * A value of -1 means that this call should not set modification time.
 * @param atime Set the access time of this file.
 * The number of milliseconds since Jan 1, 1970.
 * A value of -1 means that this call should not set access time.
 * @throws IOException IO failure
 */
public void setTimes(Path p, long mtime, long atime
    ) throws IOException
{
}
+
+ /**
+ * Create a snapshot with a default name.
+ *
+ * @param path The directory where snapshots will be taken.
+ * @return the snapshot path.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ */
+ public final Path createSnapshot(Path path) throws IOException
+ {
+ return createSnapshot(path, null);
+ }
+
/**
 * Create a snapshot.
 * The default implementation is unsupported.
 *
 * @param path The directory where snapshots will be taken.
 * @param snapshotName The name of the snapshot
 * @return the snapshot path.
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 */
public Path createSnapshot(Path path, String snapshotName)
    throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support createSnapshot");
}
+
/**
 * Rename a snapshot.
 *
 * @param path The directory path where the snapshot was taken
 * @param snapshotOldName Old name of the snapshot
 * @param snapshotNewName New name of the snapshot
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 */
public void renameSnapshot(Path path, String snapshotOldName,
    String snapshotNewName) throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support renameSnapshot");
}
+
/**
 * Delete a snapshot of a directory.
 *
 * @param path The directory that the to-be-deleted snapshot belongs to
 * @param snapshotName The name of the snapshot
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 */
public void deleteSnapshot(Path path, String snapshotName)
    throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support deleteSnapshot");
}
+
+ /**
+ * Modifies ACL entries of files and directories. This method can add new ACL
+ * entries or modify the permissions on existing ACL entries. All existing
+ * ACL entries that are not specified in this call are retained without
+ * changes. (Modifications are merged into the current ACL.)
+ *
+ * @param path Path to modify
+ * @param aclSpec List<AclEntry> describing modifications
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void modifyAclEntries(Path path, List aclSpec)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support modifyAclEntries");
+ }
+
+ /**
+ * Removes ACL entries from files and directories. Other ACL entries are
+ * retained.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing entries to remove
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void removeAclEntries(Path path, List aclSpec)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeAclEntries");
+ }
+
/**
 * Removes all default ACL entries from files and directories.
 *
 * @param path Path to modify
 * @throws IOException if an ACL could not be modified
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 */
public void removeDefaultAcl(Path path)
    throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support removeDefaultAcl");
}
+
/**
 * Removes all but the base ACL entries of files and directories. The entries
 * for user, group, and others are retained for compatibility with permission
 * bits.
 *
 * @param path Path to modify
 * @throws IOException if an ACL could not be removed
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 */
public void removeAcl(Path path)
    throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support removeAcl");
}
+
+ /**
+ * Fully replaces ACL of files and directories, discarding all existing
+ * entries.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing modifications, which must include entries
+ * for user, group, and others for compatibility with permission bits.
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void setAcl(Path path, List aclSpec) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setAcl");
+ }
+
/**
 * Gets the ACL of a file or directory.
 *
 * @param path Path to get
 * @return AclStatus describing the ACL of the file or directory
 * @throws IOException if an ACL could not be read
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default outcome).
 */
public AclStatus getAclStatus(Path path) throws IOException
{
    throw new UnsupportedOperationException(getClass().getSimpleName()
        + " doesn't support getAclStatus");
}
+
+ /**
+ * Set an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to modify
+ * @param name xattr name.
+ * @param value xattr value.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void setXAttr(Path path, String name, byte[] value)
+ throws IOException
+ {
+ setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
+ XAttrSetFlag.REPLACE));
+ }
+
+ /**
+ * Set an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to modify
+ * @param name xattr name.
+ * @param value xattr value.
+ * @param flag xattr set flag
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void setXAttr(Path path, String name, byte[] value,
+ EnumSet flag) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setXAttr");
+ }
+
+ /**
+ * Get an xattr name and value for a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attribute
+ * @param name xattr name.
+ * @return byte[] xattr value.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public byte[] getXAttr(Path path, String name) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttr");
+ }
+
+ /**
+ * Get all of the xattr name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public Map getXAttrs(Path path) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattrs name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @param names XAttr names.
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public Map getXAttrs(Path path, List names)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattr names for a file or directory.
+ * Only those xattr names which the logged-in user has permissions to view
+ * are returned.
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return List{@literal } of the XAttr names of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public List listXAttrs(Path path) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support listXAttrs");
+ }
+
+ /**
+ * Remove an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ *
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to remove extended attribute
+ * @param name xattr name
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void removeXAttr(Path path, String name) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeXAttr");
+ }
+
+ /**
+ * Set the source path to satisfy storage policy.
+ *
+ * @param path The source path referring to either a directory or a file.
+ * @throws IOException If an I/O error occurred.
+ */
+ public void satisfyStoragePolicy(final Path path) throws IOException
+ {
+ throw new UnsupportedOperationException(
+ getClass().getSimpleName() + " doesn't support setStoragePolicy");
+ }
+
+ /**
+ * Set the storage policy for a given file or directory.
+ *
+ * @param src file or directory path.
+ * @param policyName the name of the target storage policy. The list
+ * of supported Storage policies can be retrieved
+ * via {@link #getAllStoragePolicies}.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void setStoragePolicy(final Path src, final String policyName)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setStoragePolicy");
+ }
+
+ /**
+ * Unset the storage policy set for a given file or directory.
+ *
+ * @param src file or directory path.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public void unsetStoragePolicy(final Path src) throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support unsetStoragePolicy");
+ }
+
+ /**
+ * Query the effective storage policy ID for the given file or directory.
+ *
+ * @param src file or directory path.
+ * @return storage policy for give file.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public BlockStoragePolicySpi getStoragePolicy(final Path src)
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getStoragePolicy");
+ }
+
+ /**
+ * Retrieve all the storage policies supported by this file system.
+ *
+ * @return all storage policies supported by this filesystem.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ public Collection extends BlockStoragePolicySpi> getAllStoragePolicies()
+ throws IOException
+ {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getAllStoragePolicies");
+ }
+
+ /**
+ * Get the root directory of Trash for current user when the path specified
+ * is deleted.
+ *
+ * @param path the trash root of the path to be determined.
+ * @return the default implementation returns {@code /user/$USER/.Trash}
+ */
+ public Path getTrashRoot(Path path)
+ {
+ return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
+ TRASH_PREFIX));
+ }
+
+ /**
+ * Get all the trash roots for current user or all users.
+ *
+ * @param allUsers return trash roots for all users if true.
+ * @return all the trash root directories.
+ * Default FileSystem returns .Trash under users' home directories if
+ * {@code /user/$USER/.Trash} exists.
+ */
+ public Collection getTrashRoots(boolean allUsers)
+ {
+ Path userHome = new Path(getHomeDirectory().toUri().getPath());
+ List ret = new ArrayList<>();
+ try {
+ if (!allUsers) {
+ Path userTrash = new Path(userHome, TRASH_PREFIX);
+ if (exists(userTrash)) {
+ ret.add(getFileStatus(userTrash));
+ }
+ }
+ else {
+ Path homeParent = userHome.getParent();
+ if (exists(homeParent)) {
+ FileStatus[] candidates = listStatus(homeParent);
+ for (FileStatus candidate : candidates) {
+ Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
+ if (exists(userTrash)) {
+ candidate.setPath(userTrash);
+ ret.add(candidate);
+ }
+ }
+ }
+ }
+ }
+ catch (IOException e) {
+ LOGGER.warn("Cannot get all trash roots", e);
+ }
+ return ret;
+ }
+
+ /**
+ * The base FileSystem implementation generally has no knowledge
+ * of the capabilities of actual implementations.
+ * Unless it has a way to explicitly determine the capabilities,
+ * this method returns false.
+ * {@inheritDoc}
+ */
+ public boolean hasPathCapability(final Path path, final String capability)
+ throws IOException
+ {
+ switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
+ case CommonPathCapabilities.BULK_DELETE:
+ // bulk delete has default implementation which
+ // can called on any FileSystem.
+ return true;
+ case CommonPathCapabilities.FS_SYMLINKS:
+ // delegate to the existing supportsSymlinks() call.
+ return supportsSymlinks() && areSymlinksEnabled();
+ default:
+ // the feature is not implemented.
+ return false;
+ }
+ }
+
+ // making it volatile to be able to do a double checked locking
+ private static volatile boolean fileSystemsLoaded;
+
+ /**
+ * Filesystems listed as services.
+ */
+ private static final Map>
+ SERVICE_FILE_SYSTEMS = new HashMap<>();
+
+ /**
+ * Load the filesystem declarations from service resources.
+ * This is a synchronized operation.
+ */
+ private static void loadFileSystems()
+ {
+ LOGGER.debug("Loading filesystems");
+ synchronized (FileSystem.class) {
+ if (!fileSystemsLoaded) {
+ ServiceLoader serviceLoader = ServiceLoader.load(FileSystem.class);
+ Iterator it = serviceLoader.iterator();
+ while (it.hasNext()) {
+ FileSystem fs;
+ try {
+ fs = it.next();
+ try {
+ SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{}:// = {} from {}",
+ fs.getScheme(), fs.getClass(),
+ ClassUtil.findContainingJar(fs.getClass()));
+ }
+ }
+ catch (Exception e) {
+ LOGGER.warn("Cannot load: {} from {}", fs,
+ ClassUtil.findContainingJar(fs.getClass()));
+ LOGGER.info("Full exception loading: {}", fs, e);
+ }
+ }
+ catch (ServiceConfigurationError ee) {
+ LOGGER.warn("Cannot load filesystem", ee);
+ }
+ }
+ fileSystemsLoaded = true;
+ }
+ }
+ }
+
+ /**
+ * Get the FileSystem implementation class of a filesystem.
+ * This triggers a scan and load of all FileSystem implementations listed as
+ * services and discovered via the {@link ServiceLoader}
+ *
+ * @param scheme URL scheme of FS
+ * @param conf configuration: can be null, in which case the check for
+ * a filesystem binding declaration in the configuration is skipped.
+ * @return the filesystem
+ * @throws UnsupportedFileSystemException if there was no known implementation
+ * for the scheme.
+ * @throws IOException if the filesystem could not be loaded
+ */
+ public static Class extends FileSystem> getFileSystemClass(String scheme,
+ Configuration conf) throws IOException
+ {
+ if (!fileSystemsLoaded) {
+ loadFileSystems();
+ }
+ LOGGER.debug("Looking for FS supporting {}", scheme);
+ Class extends FileSystem> clazz = null;
+ if (conf != null) {
+ String property = "fs." + scheme + ".impl";
+ LOGGER.debug("looking for configuration option {}", property);
+ clazz = (Class extends FileSystem>) conf.getClass(
+ property, null);
+ }
+ else {
+ LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
+ }
+ if (clazz == null) {
+ LOGGER.debug("Looking in service filesystems for implementation class");
+ clazz = SERVICE_FILE_SYSTEMS.get(scheme);
+ }
+ else {
+ LOGGER.debug("Filesystem {} defined in configuration option", scheme);
+ }
+ if (clazz == null) {
+ throw new UnsupportedFileSystemException("No FileSystem for scheme "
+ + "\"" + scheme + "\"");
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("FS for {} is {}", scheme, clazz);
+ final String jarLocation = ClassUtil.findContainingJar(clazz);
+ if (jarLocation != null) {
+ LOGGER.debug("Jar location for {} : {}", clazz, jarLocation);
+ }
+ else {
+ LOGGER.debug("Class location for {} : {}", clazz, ClassUtil.findClassLocation(clazz));
+ }
+ }
+ return clazz;
+ }
+
+ /**
+ * Create and initialize a new instance of a FileSystem.
+ *
+ * @param uri URI containing the FS schema and FS details
+ * @param conf configuration to use to look for the FS instance declaration
+ * and to pass to the {@link FileSystem#initialize(URI, Configuration)}.
+ * @return the initialized filesystem.
+ * @throws IOException problems loading or initializing the FileSystem
+ */
+ private static FileSystem createFileSystem(URI uri, Configuration conf)
+ throws IOException
+ {
+ Tracer tracer = FsTracer.get(conf);
+ try (TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
+ DurationInfo ignored =
+ new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
+ scope.addKVAnnotation("scheme", uri.getScheme());
+ Class extends FileSystem> clazz =
+ getFileSystemClass(uri.getScheme(), conf);
+ FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
+ try {
+ fs.initialize(uri, conf);
+ }
+ catch (IOException | RuntimeException e) {
+ // exception raised during initialization.
+ // log summary at warn and full stack at debug
+ LOGGER.warn("Failed to initialize filesystem {}: {}",
+ uri, e.toString());
+ LOGGER.debug("Failed to initialize filesystem", e);
+ // then (robustly) close the FS, so as to invoke any
+ // cleanup code.
+ IOUtils.cleanupWithLogger(LOGGER, fs);
+ throw e;
+ }
+ return fs;
+ }
+ }
+
+ /**
+ * Caching FileSystem objects.
+ */
+ static class Cache
+ {
+ private final ClientFinalizer clientFinalizer = new ClientFinalizer();
+
+ private final Map map = new HashMap<>();
+ private final Set toAutoClose = new HashSet<>();
+
+ /**
+ * Semaphore used to serialize creation of new FS instances.
+ */
+ private final Semaphore creatorPermits;
+
+ /**
+ * Counter of the number of discarded filesystem instances
+ * in this cache. Primarily for testing, but it could possibly
+ * be made visible as some kind of metric.
+ */
+ private final AtomicLong discardedInstances = new AtomicLong(0);
+
+ /**
+ * A variable that makes all objects in the cache unique.
+ */
+ private static AtomicLong unique = new AtomicLong(1);
+
+ /**
+ * Instantiate. The configuration is used to read the
+ * count of permits issued for concurrent creation
+ * of filesystem instances.
+ *
+ * @param conf configuration
+ */
+ Cache(final Configuration conf)
+ {
+ int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
+ FS_CREATION_PARALLEL_COUNT_DEFAULT);
+ checkArgument(permits > 0, "Invalid value of %s: %s",
+ FS_CREATION_PARALLEL_COUNT, permits);
+ creatorPermits = new Semaphore(permits);
+ }
+
+ FileSystem get(URI uri, Configuration conf) throws IOException
+ {
+ Key key = new Key(uri, conf);
+ return getInternal(uri, conf, key);
+ }
+
+ /**
+ * The objects inserted into the cache using this method are all unique.
+ */
+ FileSystem getUnique(URI uri, Configuration conf) throws IOException
+ {
+ Key key = new Key(uri, conf, unique.getAndIncrement());
+ return getInternal(uri, conf, key);
+ }
+
+ /**
+ * Get the FS instance if the key maps to an instance, creating and
+ * initializing the FS if it is not found.
+ * If this is the first entry in the map and the JVM is not shutting down,
+ * this registers a shutdown hook to close filesystems, and adds this
+ * FS to the {@code toAutoClose} set if {@code "fs.automatic.close"}
+ * is set in the configuration (default: true).
+ *
+ * @param uri filesystem URI
+ * @param conf configuration
+ * @param key key to store/retrieve this FileSystem in the cache
+ * @return a cached or newly instantiated FileSystem.
+ * @throws IOException If an I/O error occurred.
+ */
+ private FileSystem getInternal(URI uri, Configuration conf, Key key)
+ throws IOException
+ {
+ FileSystem fs;
+ synchronized (this) {
+ fs = map.get(key);
+ }
+ if (fs != null) {
+ return fs;
+ }
+ // fs not yet created, acquire lock
+ // to construct an instance.
+ try (DurationInfo d = new DurationInfo(LOGGER, false,
+ "Acquiring creator semaphore for %s", uri)) {
+ creatorPermits.acquireUninterruptibly();
+ }
+ FileSystem fsToClose = null;
+ try {
+ // See if FS was instantiated by another thread while waiting
+ // for the permit.
+ synchronized (this) {
+ fs = map.get(key);
+ }
+ if (fs != null) {
+ LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
+ return fs;
+ }
+ // create the filesystem
+ fs = createFileSystem(uri, conf);
+ final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+ SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ // any FS to close outside of the synchronized section
+ synchronized (this) { // lock on the Cache object
+
+ // see if there is now an entry for the FS, which happens
+ // if another thread's creation overlapped with this one.
+ FileSystem oldfs = map.get(key);
+ if (oldfs != null) {
+ // a file system was created in a separate thread.
+ // save the FS reference to close outside all locks,
+ // and switch to returning the oldFS
+ fsToClose = fs;
+ fs = oldfs;
+ }
+ else {
+ // register the clientFinalizer if needed and shutdown isn't
+ // already active
+ if (map.isEmpty()
+ && !ShutdownHookManager.get().isShutdownInProgress()) {
+ ShutdownHookManager.get().addShutdownHook(clientFinalizer,
+ SHUTDOWN_HOOK_PRIORITY, timeout,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ }
+ // insert the new file system into the map
+ fs.key = key;
+ map.put(key, fs);
+ if (conf.getBoolean(
+ FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
+ toAutoClose.add(key);
+ }
+ }
+ } // end of synchronized block
+ }
+ finally {
+ // release the creator permit.
+ creatorPermits.release();
+ }
+ if (fsToClose != null) {
+ LOGGER.debug("Duplicate FS created for {}; discarding {}",
+ uri, fs);
+ discardedInstances.incrementAndGet();
+ // close the new file system
+ // note this will briefly remove and reinstate "fsToClose" from
+ // the map. It is done in a synchronized block so will not be
+ // visible to others.
+ IOUtils.cleanupWithLogger(LOGGER, fsToClose);
+ }
+ return fs;
+ }
+
+ /**
+ * Get the count of discarded instances.
+ *
+ * @return the new instance.
+ */
+ @VisibleForTesting
+ long getDiscardedInstances()
+ {
+ return discardedInstances.get();
+ }
+
+ synchronized void remove(Key key, FileSystem fs)
+ {
+ FileSystem cachedFs = map.remove(key);
+ if (fs == cachedFs) {
+ toAutoClose.remove(key);
+ }
+ else if (cachedFs != null) {
+ map.put(key, cachedFs);
+ }
+ }
+
+ /**
+ * Close all FileSystems in the cache, whether they are marked for
+ * automatic closing or not.
+ *
+ * @throws IOException a problem arose closing one or more FileSystem.
+ */
+ synchronized void closeAll() throws IOException
+ {
+ closeAll(false);
+ }
+
+ /**
+ * Close all FileSystem instances in the Cache.
+ *
+ * @param onlyAutomatic only close those that are marked for automatic closing
+ * @throws IOException a problem arose closing one or more FileSystem.
+ */
+ synchronized void closeAll(boolean onlyAutomatic) throws IOException
+ {
+ List exceptions = new ArrayList<>();
+
+ // Make a copy of the keys in the map since we'll be modifying
+ // the map while iterating over it, which isn't safe.
+ List keys = new ArrayList<>();
+ keys.addAll(map.keySet());
+
+ for (Key key : keys) {
+ final FileSystem fs = map.get(key);
+
+ if (onlyAutomatic && !toAutoClose.contains(key)) {
+ continue;
+ }
+
+ //remove from cache
+ map.remove(key);
+ toAutoClose.remove(key);
+
+ if (fs != null) {
+ try {
+ fs.close();
+ }
+ catch (IOException ioe) {
+ exceptions.add(ioe);
+ }
+ }
+ }
+
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ }
+
+ private class ClientFinalizer
+ implements Runnable
+ {
+ @Override
+ public synchronized void run()
+ {
+ try {
+ closeAll(true);
+ }
+ catch (IOException e) {
+ LOGGER.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
+ }
+ }
+ }
+
+ synchronized void closeAll(UserGroupInformation ugi) throws IOException
+ {
+ List targetFSList = new ArrayList<>(map.entrySet().size());
+ //Make a pass over the list and collect the FileSystems to close
+ //we cannot close inline since close() removes the entry from the Map
+ for (Map.Entry entry : map.entrySet()) {
+ final Key key = entry.getKey();
+ final FileSystem fs = entry.getValue();
+ if (ugi.equals(key.ugi) && fs != null) {
+ targetFSList.add(fs);
+ }
+ }
+ List exceptions = new ArrayList<>();
+ //now make a pass over the target list and close each
+ for (FileSystem fs : targetFSList) {
+ try {
+ fs.close();
+ }
+ catch (IOException ioe) {
+ exceptions.add(ioe);
+ }
+ }
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ }
+
+ /**
+ * FileSystem.Cache.Key
+ */
+ static class Key
+ {
+ final String scheme;
+ final String authority;
+ final UserGroupInformation ugi;
+ final long unique; // an artificial way to make a key unique
+
+ Key(URI uri, Configuration conf) throws IOException
+ {
+ this(uri, conf, 0);
+ }
+
+ Key(URI uri, Configuration conf, long unique) throws IOException
+ {
+ scheme = uri.getScheme() == null ?
+ "" : StringUtils.toLowerCase(uri.getScheme());
+ authority = uri.getAuthority() == null ?
+ "" : StringUtils.toLowerCase(uri.getAuthority());
+ this.unique = unique;
+
+ this.ugi = UserGroupInformation.getCurrentUser();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return (scheme + authority).hashCode() + ugi.hashCode() + (int) unique;
+ }
+
+ static boolean isEqual(Object a, Object b)
+ {
+ return a == b || (a != null && a.equals(b));
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof Key) {
+ Key that = (Key) obj;
+ return isEqual(this.scheme, that.scheme)
+ && isEqual(this.authority, that.authority)
+ && isEqual(this.ugi, that.ugi)
+ && (this.unique == that.unique);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + ugi.toString() + ")@" + scheme + "://" + authority;
+ }
+ }
+ }
+
+ /**
+ * Tracks statistics about how many reads, writes, and so forth have been
+ * done in a FileSystem.
+ *
+ * Since there is only one of these objects per FileSystem, there will
+ * typically be many threads writing to this object. Almost every operation
+ * on an open file will involve a write to this object. In contrast, reading
+ * statistics is done infrequently by most programs, and not at all by others.
+ * Hence, this is optimized for writes.
+ *
+ * Each thread writes to its own thread-local area of memory. This removes
+ * contention and allows us to scale up to many, many threads. To read
+ * statistics, the reader thread totals up the contents of all of the
+ * thread-local data areas.
+ */
+ public static final class Statistics
+ {
/**
 * Statistics data.
 *
 * There is only a single writer to thread-local StatisticsData objects.
 * Hence, volatile is adequate here-- we do not need AtomicLong or similar
 * to prevent lost updates.
 * The Java specification guarantees that updates to volatile longs will
 * be perceived as atomic with respect to other threads, which is all we
 * need.
 */
public static class StatisticsData
{
    private volatile long bytesRead;
    private volatile long bytesWritten;
    private volatile int readOps;
    private volatile int largeReadOps;
    private volatile int writeOps;
    private volatile long bytesReadLocalHost;
    private volatile long bytesReadDistanceOfOneOrTwo;
    private volatile long bytesReadDistanceOfThreeOrFour;
    private volatile long bytesReadDistanceOfFiveOrLarger;
    private volatile long bytesReadErasureCoded;
    private volatile long remoteReadTimeMS;

    /**
     * Add another StatisticsData object to this one.
     * NOTE(review): not atomic across fields; callers appear to rely on
     * external locking (e.g. the Statistics lock) -- confirm at call sites.
     */
    void add(StatisticsData other)
    {
        this.bytesRead += other.bytesRead;
        this.bytesWritten += other.bytesWritten;
        this.readOps += other.readOps;
        this.largeReadOps += other.largeReadOps;
        this.writeOps += other.writeOps;
        this.bytesReadLocalHost += other.bytesReadLocalHost;
        this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
        this.bytesReadDistanceOfThreeOrFour +=
            other.bytesReadDistanceOfThreeOrFour;
        this.bytesReadDistanceOfFiveOrLarger +=
            other.bytesReadDistanceOfFiveOrLarger;
        this.bytesReadErasureCoded += other.bytesReadErasureCoded;
        this.remoteReadTimeMS += other.remoteReadTimeMS;
    }

    /**
     * Negate the values of all statistics.
     */
    void negate()
    {
        this.bytesRead = -this.bytesRead;
        this.bytesWritten = -this.bytesWritten;
        this.readOps = -this.readOps;
        this.largeReadOps = -this.largeReadOps;
        this.writeOps = -this.writeOps;
        this.bytesReadLocalHost = -this.bytesReadLocalHost;
        this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
        this.bytesReadDistanceOfThreeOrFour =
            -this.bytesReadDistanceOfThreeOrFour;
        this.bytesReadDistanceOfFiveOrLarger =
            -this.bytesReadDistanceOfFiveOrLarger;
        this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
        this.remoteReadTimeMS = -this.remoteReadTimeMS;
    }

    // note: only a subset of the counters appears in the string form
    @Override
    public String toString()
    {
        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
            + readOps + " read ops, " + largeReadOps + " large read ops, "
            + writeOps + " write ops";
    }

    /** @return bytes read. */
    public long getBytesRead()
    {
        return bytesRead;
    }

    /** @return bytes written. */
    public long getBytesWritten()
    {
        return bytesWritten;
    }

    /** @return read operation count. */
    public int getReadOps()
    {
        return readOps;
    }

    /** @return large read operation count. */
    public int getLargeReadOps()
    {
        return largeReadOps;
    }

    /** @return write operation count. */
    public int getWriteOps()
    {
        return writeOps;
    }

    /** @return bytes read from the local host. */
    public long getBytesReadLocalHost()
    {
        return bytesReadLocalHost;
    }

    /** @return bytes read at network distance 1-2. */
    public long getBytesReadDistanceOfOneOrTwo()
    {
        return bytesReadDistanceOfOneOrTwo;
    }

    /** @return bytes read at network distance 3-4. */
    public long getBytesReadDistanceOfThreeOrFour()
    {
        return bytesReadDistanceOfThreeOrFour;
    }

    /** @return bytes read at network distance 5 or more. */
    public long getBytesReadDistanceOfFiveOrLarger()
    {
        return bytesReadDistanceOfFiveOrLarger;
    }

    /** @return bytes read from erasure-coded files. */
    public long getBytesReadErasureCoded()
    {
        return bytesReadErasureCoded;
    }

    /** @return cumulative remote read time in milliseconds. */
    public long getRemoteReadTimeMS()
    {
        return remoteReadTimeMS;
    }
}
+
+ private interface StatisticsAggregator
+ {
+ void accept(StatisticsData data);
+
+ T aggregate();
+ }
+
+ private final String scheme;
+
+ /**
+ * rootData is data that doesn't belong to any thread, but will be added
+ * to the totals. This is useful for making copies of Statistics objects,
+ * and for storing data that pertains to threads that have been garbage
+ * collected. Protected by the Statistics lock.
+ */
+ private final StatisticsData rootData;
+
+ /**
+ * Thread-local data.
+ */
+ @SuppressWarnings("ThreadLocalNotStaticFinal")
+ private final ThreadLocal threadData;
+
+ /**
+ * Set of all thread-local data areas. Protected by the Statistics lock.
+ * The references to the statistics data are kept using weak references
+ * to the associated threads. Proper clean-up is performed by the cleaner
+ * thread when the threads are garbage collected.
+ */
+ private final Set allData;
+
+ /**
+ * Global reference queue and a cleaner thread that manage statistics data
+ * references from all filesystem instances.
+ */
+ private static final ReferenceQueue STATS_DATA_REF_QUEUE;
+ private static final Thread STATS_DATA_CLEANER;
+
// One queue + one cleaner thread shared by every Statistics instance.
static
{
    STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
    // start a single daemon cleaner thread
    STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
    STATS_DATA_CLEANER.setName(StatisticsDataReferenceCleaner.class.getName());
    STATS_DATA_CLEANER.setDaemon(true);
    // null context classloader -- presumably to avoid the thread pinning
    // whichever loader was current at class-init time; confirm intent
    STATS_DATA_CLEANER.setContextClassLoader(null);
    STATS_DATA_CLEANER.start();
}
+
/**
 * Create an empty statistics object for a filesystem scheme.
 *
 * @param scheme URI scheme of the filesystem being measured
 */
public Statistics(String scheme)
{
    this.scheme = scheme;
    this.rootData = new StatisticsData();
    this.threadData = new ThreadLocal<>();
    this.allData = new HashSet<>();
}
+
/**
 * Copy constructor.
 *
 * The other object's total (root + all thread-local areas) is folded
 * into this instance's rootData; the copy starts with no thread-local
 * data of its own.
 *
 * @param other The input Statistics object which is cloned.
 */
public Statistics(Statistics other)
{
    this.scheme = other.scheme;
    this.rootData = new StatisticsData();
    // restored the stripped type argument StatisticsAggregator<Void>
    // and the missing @Override on aggregate()
    other.visitAll(new StatisticsAggregator<Void>()
    {
        @Override
        public void accept(StatisticsData data)
        {
            rootData.add(data);
        }

        @Override
        public Void aggregate()
        {
            return null;
        }
    });
    this.threadData = new ThreadLocal<>();
    this.allData = new HashSet<>();
}
+
+ /**
+ * A weak reference to a thread that also includes the data associated
+ * with that thread. On the thread being garbage collected, it is enqueued
+ * to the reference queue for clean-up.
+ */
+ private final class StatisticsDataReference
+ extends WeakReference
+ {
+ private final StatisticsData data;
+
+ private StatisticsDataReference(StatisticsData data, Thread thread)
+ {
+ super(thread, STATS_DATA_REF_QUEUE);
+ this.data = data;
+ }
+
+ public StatisticsData getData()
+ {
+ return data;
+ }
+
+ /**
+ * Performs clean-up action when the associated thread is garbage
+ * collected.
+ */
+ public void cleanUp()
+ {
+ // use the statistics lock for safety
+ synchronized (Statistics.this) {
+ /*
+ * If the thread that created this thread-local data no longer exists,
+ * remove the StatisticsData from our list and fold the values into
+ * rootData.
+ */
+ rootData.add(data);
+ allData.remove(this);
+ }
+ }
+ }
/**
 * Background action to act on references being removed.
 * Runs on the single daemon thread started in the static initializer.
 */
private static class StatisticsDataReferenceCleaner
    implements Runnable
{
    @Override
    public void run()
    {
        // loop until interrupted; each iteration blocks on the queue
        while (!Thread.interrupted()) {
            try {
                StatisticsDataReference ref =
                    (StatisticsDataReference) STATS_DATA_REF_QUEUE.remove();
                ref.cleanUp();
            }
            catch (InterruptedException ie) {
                // restore interrupt status so the loop condition sees it
                LOGGER.warn("Cleaner thread interrupted, will stop", ie);
                Thread.currentThread().interrupt();
            }
            catch (Throwable th) {
                // deliberately broad: the cleaner must survive any failure
                LOGGER.warn("Exception in the cleaner thread but it will" +
                    " continue to run", th);
            }
        }
    }
}
+
/**
 * Get or create the thread-local data associated with the current thread.
 *
 * On first use from a thread this also registers a weak reference pairing
 * the thread with its data, so the cleaner thread can fold the data into
 * rootData after the thread is garbage collected.
 *
 * @return statistics data.
 */
public StatisticsData getThreadStatistics()
{
    StatisticsData data = threadData.get();
    if (data == null) {
        data = new StatisticsData();
        threadData.set(data);
        StatisticsDataReference ref =
            new StatisticsDataReference(data, Thread.currentThread());
        // allData is guarded by the Statistics lock
        synchronized (this) {
            allData.add(ref);
        }
    }
    return data;
}
+
/**
 * Increment the bytes read in the statistics.
 *
 * @param newBytes the additional bytes read
 */
public void incrementBytesRead(long newBytes)
{
    getThreadStatistics().bytesRead += newBytes;
}

/**
 * Increment the bytes written in the statistics.
 *
 * @param newBytes the additional bytes written
 */
public void incrementBytesWritten(long newBytes)
{
    getThreadStatistics().bytesWritten += newBytes;
}

/**
 * Increment the number of read operations.
 *
 * @param count number of read operations
 */
public void incrementReadOps(int count)
{
    getThreadStatistics().readOps += count;
}

/**
 * Increment the number of large read operations.
 *
 * @param count number of large read operations
 */
public void incrementLargeReadOps(int count)
{
    getThreadStatistics().largeReadOps += count;
}

/**
 * Increment the number of write operations.
 *
 * @param count number of write operations
 */
public void incrementWriteOps(int count)
{
    getThreadStatistics().writeOps += count;
}

/**
 * Increment the bytes read on erasure-coded files in the statistics.
 *
 * @param newBytes the additional bytes read
 */
public void incrementBytesReadErasureCoded(long newBytes)
{
    getThreadStatistics().bytesReadErasureCoded += newBytes;
}

/**
 * Increment the bytes read by the network distance in the statistics
 * In the common network topology setup, distance value should be an even
 * number such as 0, 2, 4, 6. To make it more general, we group distance
 * by {1, 2}, {3, 4} and {5 and beyond} for accounting.
 *
 * @param distance the network distance
 * @param newBytes the additional bytes read
 */
public void incrementBytesReadByDistance(int distance, long newBytes)
{
    // distance 0 = local host; buckets widen as distance grows
    switch (distance) {
        case 0:
            getThreadStatistics().bytesReadLocalHost += newBytes;
            break;
        case 1:
        case 2:
            getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
            break;
        case 3:
        case 4:
            getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
            break;
        default:
            getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
            break;
    }
}

/**
 * Increment the time taken to read bytes from remote in the statistics.
 *
 * @param durationMS time taken in ms to read bytes from remote
 */
public void increaseRemoteReadTime(final long durationMS)
{
    getThreadStatistics().remoteReadTimeMS += durationMS;
}
+
+ /**
+ * Apply the given aggregator to all StatisticsData objects associated with
+ * this Statistics object.
+ *
+ * For each StatisticsData object, we will call accept on the visitor.
+ * Finally, at the end, we will call aggregate to get the final total.
+ *
+ * @param visitor to use.
+ * @return The total.
+ */
+ private synchronized T visitAll(StatisticsAggregator visitor)
+ {
+ visitor.accept(rootData);
+ for (StatisticsDataReference ref : allData) {
+ StatisticsData data = ref.getData();
+ visitor.accept(data);
+ }
+ return visitor.aggregate();
+ }
+
+ /**
+ * Get the total number of bytes read.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesRead()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private long bytesRead;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ bytesRead += data.bytesRead;
+ }
+
+ public Long aggregate()
+ {
+ return bytesRead;
+ }
+ });
+ }
+
+ /**
+ * Get the total number of bytes written.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private long bytesWritten;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ bytesWritten += data.bytesWritten;
+ }
+
+ public Long aggregate()
+ {
+ return bytesWritten;
+ }
+ });
+ }
+
+ /**
+ * Get the number of file system read operations such as list files.
+ *
+ * @return number of read operations
+ */
+ public int getReadOps()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private int readOps;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ readOps += data.readOps;
+ readOps += data.largeReadOps;
+ }
+
+ public Integer aggregate()
+ {
+ return readOps;
+ }
+ });
+ }
+
+ /**
+ * Get the number of large file system read operations such as list files
+ * under a large directory.
+ *
+ * @return number of large read operations
+ */
+ public int getLargeReadOps()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private int largeReadOps;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ largeReadOps += data.largeReadOps;
+ }
+
+ public Integer aggregate()
+ {
+ return largeReadOps;
+ }
+ });
+ }
+
+ /**
+ * Get the number of file system write operations such as create, append
+ * rename etc.
+ *
+ * @return number of write operations
+ */
+ public int getWriteOps()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private int writeOps;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ writeOps += data.writeOps;
+ }
+
+ public Integer aggregate()
+ {
+ return writeOps;
+ }
+ });
+ }
+
+ /**
+ * In the common network topology setup, distance value should be an even
+ * number such as 0, 2, 4, 6. To make it more general, we group distance
+ * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
+ * ask for bytes read for distance 2, the function will return the value
+ * for group {1, 2}.
+ *
+ * @param distance the network distance
+ * @return the total number of bytes read by the network distance
+ */
+ public long getBytesReadByDistance(int distance)
+ {
+ long bytesRead;
+ switch (distance) {
+ case 0:
+ bytesRead = getData().getBytesReadLocalHost();
+ break;
+ case 1:
+ case 2:
+ bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
+ break;
+ case 3:
+ case 4:
+ bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
+ break;
+ default:
+ bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
+ break;
+ }
+ return bytesRead;
+ }
+
+ /**
+ * Get total time taken in ms for bytes read from remote.
+ *
+ * @return time taken in ms for remote bytes read.
+ */
+ public long getRemoteReadTime()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private long remoteReadTimeMS;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ remoteReadTimeMS += data.remoteReadTimeMS;
+ }
+
+ public Long aggregate()
+ {
+ return remoteReadTimeMS;
+ }
+ });
+ }
+
+ /**
+ * Get all statistics data.
+ * MR or other frameworks can use the method to get all statistics at once.
+ *
+ * @return the StatisticsData
+ */
+ public StatisticsData getData()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private StatisticsData all = new StatisticsData();
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ all.add(data);
+ }
+
+ public StatisticsData aggregate()
+ {
+ return all;
+ }
+ });
+ }
+
+ /**
+ * Get the total number of bytes read on erasure-coded files.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesReadErasureCoded()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private long bytesReadErasureCoded;
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ bytesReadErasureCoded += data.bytesReadErasureCoded;
+ }
+
+ public Long aggregate()
+ {
+ return bytesReadErasureCoded;
+ }
+ });
+ }
+
+ @Override
+ public String toString()
+ {
+ return visitAll(new StatisticsAggregator()
+ {
+ private StatisticsData total = new StatisticsData();
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ total.add(data);
+ }
+
+ public String aggregate()
+ {
+ return total.toString();
+ }
+ });
+ }
+
+ /**
+ * Resets all statistics to 0.
+ *
+ * In order to reset, we add up all the thread-local statistics data, and
+ * set rootData to the negative of that.
+ *
+ * This may seem like a counterintuitive way to reset the statistics. Why
+ * can't we just zero out all the thread-local data? Well, thread-local
+ * data can only be modified by the thread that owns it. If we tried to
+ * modify the thread-local data from this thread, our modification might get
+ * interleaved with a read-modify-write operation done by the thread that
+ * owns the data. That would result in our update getting lost.
+ *
+ * The approach used here avoids this problem because it only ever reads
+ * (not writes) the thread-local data. Both reads and writes to rootData
+ * are done under the lock, so we're free to modify rootData from any thread
+ * that holds the lock.
+ */
+ public void reset()
+ {
+ visitAll(new StatisticsAggregator()
+ {
+ private StatisticsData total = new StatisticsData();
+
+ @Override
+ public void accept(StatisticsData data)
+ {
+ total.add(data);
+ }
+
+ public Void aggregate()
+ {
+ total.negate();
+ rootData.add(total);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Get the uri scheme associated with this statistics object.
+ *
+ * @return the schema associated with this set of statistics
+ */
+ public String getScheme()
+ {
+ return scheme;
+ }
+
+ @VisibleForTesting
+ synchronized int getAllThreadLocalDataSize()
+ {
+ return allData.size();
+ }
+ }
+
+ /**
+ * Get the Map of Statistics object indexed by URI Scheme.
+ *
+ * @return a Map having a key as URI scheme and value as Statistics object
+ * @deprecated use {@link #getGlobalStorageStatistics()}
+ */
+ @Deprecated
+ public static synchronized Map getStatistics()
+ {
+ Map result = new HashMap<>();
+ for (Statistics stat : statisticsTable.values()) {
+ result.put(stat.getScheme(), stat);
+ }
+ return result;
+ }
+
+ /**
+ * Return the FileSystem classes that have Statistics.
+ *
+ * @return statistics lists.
+ * @deprecated use {@link #getGlobalStorageStatistics()}
+ */
+ @Deprecated
+ public static synchronized List getAllStatistics()
+ {
+ return new ArrayList<>(statisticsTable.values());
+ }
+
+ /**
+ * Get the statistics for a particular file system.
+ *
+ * @param scheme scheme.
+ * @param cls the class to lookup
+ * @return a statistics object
+ * @deprecated use {@link #getGlobalStorageStatistics()}
+ */
+ @Deprecated
+ public static synchronized Statistics getStatistics(final String scheme,
+ Class extends FileSystem> cls)
+ {
+ checkArgument(scheme != null,
+ "No statistics is allowed for a file system with null scheme!");
+ Statistics result = statisticsTable.get(cls);
+ if (result == null) {
+ final Statistics newStats = new Statistics(scheme);
+ statisticsTable.put(cls, newStats);
+ result = newStats;
+ GlobalStorageStatistics.INSTANCE.put(scheme,
+ new StorageStatisticsProvider()
+ {
+ @Override
+ public StorageStatistics provide()
+ {
+ return new FileSystemStorageStatistics(scheme, newStats);
+ }
+ });
+ }
+ return result;
+ }
+
+ /**
+ * Reset all statistics for all file systems.
+ */
+ public static synchronized void clearStatistics()
+ {
+ GlobalStorageStatistics.INSTANCE.reset();
+ }
+
+ /**
+ * Print all statistics for all file systems to {@code System.out}
+ *
+ * @throws IOException If an I/O error occurred.
+ */
+ public static synchronized void printStatistics() throws IOException
+ {
+ for (Map.Entry, Statistics> pair :
+ statisticsTable.entrySet()) {
+ System.out.println(" FileSystem " + pair.getKey().getName() +
+ ": " + pair.getValue());
+ }
+ }
+
// Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
// Defaults to false; flipped only by the test-only enableSymlinks() hook.
private static boolean symlinksEnabled;

/**
 * Whether symlink resolution has been switched on for this process.
 *
 * @return true once {@link #enableSymlinks()} has been called.
 */
@VisibleForTesting
public static boolean areSymlinksEnabled()
{
    return symlinksEnabled;
}

/**
 * Enable symlink support process-wide. Test-only; note there is
 * deliberately no way to switch symlinks off again.
 */
@VisibleForTesting
public static void enableSymlinks()
{
    symlinksEnabled = true;
}
+
+ /**
+ * Get the StorageStatistics for this FileSystem object. These statistics are
+ * per-instance. They are not shared with any other FileSystem object.
+ *
+ * This is a default method which is intended to be overridden by
+ * subclasses. The default implementation returns an empty storage statistics
+ * object.
+ *
+ * @return The StorageStatistics for this FileSystem instance.
+ * Will never be null.
+ */
+ public StorageStatistics getStorageStatistics()
+ {
+ return new EmptyStorageStatistics(getUri().toString());
+ }
+
+ /**
+ * Get the global storage statistics.
+ *
+ * @return global storage statistics.
+ */
+ public static GlobalStorageStatistics getGlobalStorageStatistics()
+ {
+ return GlobalStorageStatistics.INSTANCE;
+ }
+
+ /**
+ * Create instance of the standard FSDataOutputStreamBuilder for the
+ * given filesystem and path.
+ *
+ * @param fileSystem owner
+ * @param path path to create
+ * @return a builder.
+ */
+ @InterfaceStability.Unstable
+ protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path)
+ {
+ return new FileSystemDataOutputStreamBuilder(fileSystem, path);
+ }
+ /**
+ * Standard implementation of the FSDataOutputStreamBuilder; invokes
+ * create/createNonRecursive or Append depending upon the options.
+ */
+
+ private static final class FileSystemDataOutputStreamBuilder
+ extends FSDataOutputStreamBuilder
+ {
+ /**
+ * Constructor.
+ *
+ * @param fileSystem owner
+ * @param p path to create
+ */
+ private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p)
+ {
+ super(fileSystem, p);
+ }
+
+ @Override
+ public FSDataOutputStream build() throws IOException
+ {
+ rejectUnknownMandatoryKeys(Collections.emptySet(),
+ " for " + getPath());
+ if (getFlags().contains(CreateFlag.CREATE) ||
+ getFlags().contains(CreateFlag.OVERWRITE)) {
+ if (isRecursive()) {
+ return getFS().create(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+ getChecksumOpt());
+ }
+ else {
+ return getFS().createNonRecursive(getPath(), getPermission(),
+ getFlags(), getBufferSize(), getReplication(), getBlockSize(),
+ getProgress());
+ }
+ }
+ else if (getFlags().contains(CreateFlag.APPEND)) {
+ return getFS().append(getPath(), getBufferSize(), getProgress());
+ }
+ throw new PathIOException(getPath().toString(),
+ "Must specify either create, overwrite or append");
+ }
+
+ @Override
+ public FileSystemDataOutputStreamBuilder getThisBuilder()
+ {
+ return this;
+ }
+ }
+
+ /**
+ * Create a new FSDataOutputStreamBuilder for the file with path.
+ * Files are overwritten by default.
+ *
+ * @param path file path
+ * @return a FSDataOutputStreamBuilder object to build the file
+ *
+ * HADOOP-14384. Temporarily reduce the visibility of method before the
+ * builder interface becomes stable.
+ */
+ public FSDataOutputStreamBuilder createFile(Path path)
+ {
+ return createDataOutputStreamBuilder(this, path)
+ .create().overwrite(true);
+ }
+
+ /**
+ * Create a Builder to append a file.
+ *
+ * @param path file path.
+ * @return a {@link FSDataOutputStreamBuilder} to build file append request.
+ */
+ public FSDataOutputStreamBuilder appendFile(Path path)
+ {
+ return createDataOutputStreamBuilder(this, path).append();
+ }
+
+ /**
+ * Open a file for reading through a builder API.
+ * Ultimately calls {@link #open(Path, int)} unless a subclass
+ * executes the open command differently.
+ *
+ * The semantics of this call are therefore the same as that of
+ * {@link #open(Path, int)} with one special point: it is in
+ * {@code FSDataInputStreamBuilder.build()} in which the open operation
+ * takes place -it is there where all preconditions to the operation
+ * are checked.
+ *
+ * @param path file path
+ * @return a FSDataInputStreamBuilder object to build the input stream
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public FutureDataInputStreamBuilder openFile(Path path)
+ throws IOException, UnsupportedOperationException
+ {
+ return createDataInputStreamBuilder(this, path).getThisBuilder();
+ }
+
+ /**
+ * Open a file for reading through a builder API.
+ * Ultimately calls {@link #open(PathHandle, int)} unless a subclass
+ * executes the open command differently.
+ *
+ * If PathHandles are unsupported, this may fail in the
+ * {@code FSDataInputStreamBuilder.build()} command,
+ * rather than in this {@code openFile()} operation.
+ *
+ * @param pathHandle path handle.
+ * @return a FSDataInputStreamBuilder object to build the input stream
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
+ throws IOException, UnsupportedOperationException
+ {
+ return createDataInputStreamBuilder(this, pathHandle)
+ .getThisBuilder();
+ }
+
+ /**
+ * Execute the actual open file operation.
+ *
+ * This is invoked from {@code FSDataInputStreamBuilder.build()}
+ * and from {@link DelegateToFileSystem} and is where
+ * the action of opening the file should begin.
+ *
+ * The base implementation performs a blocking
+ * call to {@link #open(Path, int)} in this call;
+ * the actual outcome is in the returned {@code CompletableFuture}.
+ * This avoids having to create some thread pool, while still
+ * setting up the expectation that the {@code get()} call
+ * is needed to evaluate the result.
+ *
+ * @param path path to the file
+ * @param parameters open file parameters from the builder.
+ * @return a future which will evaluate to the opened file.
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ */
+ protected CompletableFuture openFileWithOptions(
+ final Path path,
+ final OpenFileParameters parameters) throws IOException
+ {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
+ "for " + path);
+ return LambdaUtils.eval(
+ new CompletableFuture<>(), () ->
+ open(path, parameters.getBufferSize()));
+ }
+
+ /**
+ * Execute the actual open file operation.
+ * The base implementation performs a blocking
+ * call to {@link #open(Path, int)} in this call;
+ * the actual outcome is in the returned {@code CompletableFuture}.
+ * This avoids having to create some thread pool, while still
+ * setting up the expectation that the {@code get()} call
+ * is needed to evaluate the result.
+ *
+ * @param pathHandle path to the file
+ * @param parameters open file parameters from the builder.
+ * @return a future which will evaluate to the opened file.
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ * @throws UnsupportedOperationException PathHandles are not supported.
+ * This may be deferred until the future is evaluated.
+ */
+ protected CompletableFuture openFileWithOptions(
+ final PathHandle pathHandle,
+ final OpenFileParameters parameters) throws IOException
+ {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
+ CompletableFuture result = new CompletableFuture<>();
+ try {
+ result.complete(open(pathHandle, parameters.getBufferSize()));
+ }
+ catch (UnsupportedOperationException tx) {
+ // fail fast here
+ throw tx;
+ }
+ catch (Throwable tx) {
+ // fail lazily here to ensure callers expect all File IO operations to
+ // surface later
+ result.completeExceptionally(tx);
+ }
+ return result;
+ }
+
/**
 * Helper method that throws an {@link UnsupportedOperationException} for the
 * current {@link FileSystem} method being called.
 *
 * @throws UnsupportedOperationException always, naming the calling method.
 */
private void methodNotSupported()
{
    // The order of the stacktrace elements is (from top to bottom):
    // - java.lang.Thread.getStackTrace
    // - org.apache.hadoop.fs.FileSystem.methodNotSupported
    // - <the unsupported FileSystem method that invoked this helper>
    // therefore, to find out the current method name, we use the element at
    // index 2.
    String name = Thread.currentThread().getStackTrace()[2].getMethodName();
    throw new UnsupportedOperationException(getClass().getCanonicalName() +
            " does not support method " + name);
}
+
+ /**
+ * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+ * given filesystem and path.
+ *
+ * @param fileSystem owner
+ * @param path path to read
+ * @return a builder.
+ */
+ @InterfaceAudience.LimitedPrivate("Filesystems")
+ @InterfaceStability.Unstable
+ protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path)
+ {
+ return new FSDataInputStreamBuilder(fileSystem, path);
+ }
+
+ /**
+ * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+ * given filesystem and path handle.
+ *
+ * @param fileSystem owner
+ * @param pathHandle path handle of file to open.
+ * @return a builder.
+ */
+ @InterfaceAudience.LimitedPrivate("Filesystems")
+ @InterfaceStability.Unstable
+ protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final PathHandle pathHandle)
+ {
+ return new FSDataInputStreamBuilder(fileSystem, pathHandle);
+ }
+
+ /**
+ * Builder returned for {@code #openFile(Path)}
+ * and {@code #openFile(PathHandle)}.
+ */
+ private static class FSDataInputStreamBuilder
+ extends FutureDataInputStreamBuilderImpl
+ implements FutureDataInputStreamBuilder
+ {
+ /**
+ * Path Constructor.
+ *
+ * @param fileSystem owner
+ * @param path path to open.
+ */
+ protected FSDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path)
+ {
+ super(fileSystem, path);
+ }
+
+ /**
+ * Construct from a path handle.
+ *
+ * @param fileSystem owner
+ * @param pathHandle path handle of file to open.
+ */
+ protected FSDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final PathHandle pathHandle)
+ {
+ super(fileSystem, pathHandle);
+ }
+
+ /**
+ * Perform the open operation.
+ * Returns a future which, when get() or a chained completion
+ * operation is invoked, will supply the input stream of the file
+ * referenced by the path/path handle.
+ *
+ * @return a future to the input stream.
+ * @throws IOException early failure to open
+ * @throws UnsupportedOperationException if the specific operation
+ * is not supported.
+ * @throws IllegalArgumentException if the parameters are not valid.
+ */
+ @Override
+ public CompletableFuture build() throws IOException
+ {
+ Optional optionalPath = getOptionalPath();
+ OpenFileParameters parameters = new OpenFileParameters()
+ .withMandatoryKeys(getMandatoryKeys())
+ .withOptionalKeys(getOptionalKeys())
+ .withOptions(getOptions())
+ .withStatus(super.getStatus())
+ .withBufferSize(
+ getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
+ if (optionalPath.isPresent()) {
+ return getFS().openFileWithOptions(optionalPath.get(),
+ parameters);
+ }
+ else {
+ return getFS().openFileWithOptions(getPathHandle(),
+ parameters);
+ }
+ }
+ }
+
+ /**
+ * Return path of the enclosing root for a given path.
+ * The enclosing root path is a common ancestor that should be used for temp and staging dirs
+ * as well as within encryption zones and other restricted directories.
+ *
+ * Call makeQualified on the param path to ensure its part of the correct filesystem.
+ *
+ * @param path file path to find the enclosing root path for
+ * @return a path to the enclosing root
+ * @throws IOException early checks like failure to resolve path cause IO failures
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public Path getEnclosingRoot(Path path) throws IOException
+ {
+ this.makeQualified(path);
+ return this.makeQualified(new Path("/"));
+ }
+
+ /**
+ * Create a multipart uploader.
+ *
+ * @param basePath file path under which all files are uploaded
+ * @return a MultipartUploaderBuilder object to build the uploader
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+ throws IOException
+ {
+ methodNotSupported();
+ return null;
+ }
+
+ /**
+ * Create a bulk delete operation.
+ * The default implementation returns an instance of {@link DefaultBulkDeleteOperation}.
+ *
+ * @param path base path for the operation.
+ * @return an instance of the bulk delete.
+ * @throws IllegalArgumentException any argument is invalid.
+ * @throws IOException if there is an IO problem.
+ */
+ @Override
+ public BulkDelete createBulkDelete(Path path)
+ throws IllegalArgumentException, IOException
+ {
+ return new DefaultBulkDeleteOperation(path, this);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java b/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java
index f046618..8d42658 100644
--- a/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java
+++ b/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java
@@ -73,6 +73,11 @@ FileSystem getUnique(URI uri, Configuration conf)
return getInternal(uri, conf, unique.incrementAndGet());
}
+ public PrestoFileSystemCache()
+ {
+ super(new Configuration(false));
+ }
+
private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
throws IOException
{
diff --git a/src/main/java/org/apache/hadoop/util/LineReader.java b/src/main/java/org/apache/hadoop/util/LineReader.java
index fb494fc..e14b391 100644
--- a/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -17,12 +17,17 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.Text;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
/**
* A class that provides a line reader from an input stream.
* Depending on the constructor used, lines will either be terminated by:
@@ -37,7 +42,7 @@
@InterfaceAudience.LimitedPrivate("MapReduce")
@InterfaceStability.Unstable
public class LineReader
- implements Closeable
+ implements Closeable , IOStatisticsSource
{
// Limitation for array size is VM specific. Current HotSpot VM limitation
// for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5).
@@ -96,7 +101,7 @@ public LineReader(InputStream in, int bufferSize)
public LineReader(InputStream in, Configuration conf)
throws IOException
{
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+ this(in, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE));
}
/**
@@ -143,14 +148,14 @@ public LineReader(InputStream in, int bufferSize,
* @param in input stream
* @param conf configuration
* @param recordDelimiterBytes The delimiter
- * @throws IOException
+ * @throws IOException raised on errors performing I/O.
*/
public LineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes)
throws IOException
{
this.in = in;
- this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ this.bufferSize = conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
@@ -158,7 +163,7 @@ public LineReader(InputStream in, Configuration conf,
/**
* Close the underlying stream.
*
- * @throws IOException
+ * @throws IOException raised on errors performing I/O.
*/
public void close()
throws IOException
@@ -166,6 +171,14 @@ public void close()
in.close();
}
+ /**
+ * Return any IOStatistics provided by the source.
+ * @return IO stats from the input stream.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(in);
+ }
/**
* Read one line from the InputStream into the given Text.
*
diff --git a/src/main/resources/core-site.xml b/src/main/resources/core-site.xml
deleted file mode 100644
index d932dbd..0000000
--- a/src/main/resources/core-site.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0"?>
-<configuration>
-    <property>
-        <name>dfs.client.use.legacy.blockreader</name>
-        <value>true</value>
-    </property>
-</configuration>
diff --git a/src/main/resources/nativelib/Linux-aarch64/libhadoop.so b/src/main/resources/nativelib/Linux-aarch64/libhadoop.so
index 7f425eb..2c809d6 100755
Binary files a/src/main/resources/nativelib/Linux-aarch64/libhadoop.so and b/src/main/resources/nativelib/Linux-aarch64/libhadoop.so differ
diff --git a/src/main/resources/nativelib/Linux-aarch64/libsnappy.so b/src/main/resources/nativelib/Linux-aarch64/libsnappy.so
deleted file mode 100755
index aae4bca..0000000
Binary files a/src/main/resources/nativelib/Linux-aarch64/libsnappy.so and /dev/null differ
diff --git a/src/main/resources/nativelib/Linux-aarch64/libzstd.so b/src/main/resources/nativelib/Linux-aarch64/libzstd.so
new file mode 100755
index 0000000..01abab4
Binary files /dev/null and b/src/main/resources/nativelib/Linux-aarch64/libzstd.so differ
diff --git a/src/main/resources/nativelib/Linux-amd64/libhadoop.so b/src/main/resources/nativelib/Linux-amd64/libhadoop.so
index cc02d67..bf7d188 100755
Binary files a/src/main/resources/nativelib/Linux-amd64/libhadoop.so and b/src/main/resources/nativelib/Linux-amd64/libhadoop.so differ
diff --git a/src/main/resources/nativelib/Linux-amd64/libsnappy.so b/src/main/resources/nativelib/Linux-amd64/libsnappy.so
deleted file mode 100755
index b178a23..0000000
Binary files a/src/main/resources/nativelib/Linux-amd64/libsnappy.so and /dev/null differ
diff --git a/src/main/resources/nativelib/Linux-amd64/libzstd.so b/src/main/resources/nativelib/Linux-amd64/libzstd.so
new file mode 100755
index 0000000..33387df
Binary files /dev/null and b/src/main/resources/nativelib/Linux-amd64/libzstd.so differ
diff --git a/src/main/resources/nativelib/Linux-ppc64le/libhadoop.so b/src/main/resources/nativelib/Linux-ppc64le/libhadoop.so
index 9e39376..4b6fd3e 100755
Binary files a/src/main/resources/nativelib/Linux-ppc64le/libhadoop.so and b/src/main/resources/nativelib/Linux-ppc64le/libhadoop.so differ
diff --git a/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so b/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so
index f7cbf30..45049bd 100755
Binary files a/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so and b/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so differ
diff --git a/src/main/resources/nativelib/Linux-ppc64le/libzstd.so b/src/main/resources/nativelib/Linux-ppc64le/libzstd.so
new file mode 100644
index 0000000..ddb6a31
Binary files /dev/null and b/src/main/resources/nativelib/Linux-ppc64le/libzstd.so differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-aarch64/libhadoop.dylib b/src/main/resources/nativelib/Mac_OS_X-aarch64/libhadoop.dylib
old mode 100755
new mode 100644
index 2030df6..2ad0d23
Binary files a/src/main/resources/nativelib/Mac_OS_X-aarch64/libhadoop.dylib and b/src/main/resources/nativelib/Mac_OS_X-aarch64/libhadoop.dylib differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib b/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib
deleted file mode 100755
index 5e89359..0000000
Binary files a/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib and /dev/null differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-aarch64/libzstd.dylib b/src/main/resources/nativelib/Mac_OS_X-aarch64/libzstd.dylib
new file mode 100644
index 0000000..16e8d0e
Binary files /dev/null and b/src/main/resources/nativelib/Mac_OS_X-aarch64/libzstd.dylib differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-x86_64/libhadoop.dylib b/src/main/resources/nativelib/Mac_OS_X-x86_64/libhadoop.dylib
index 8077d2f..89bea98 100644
Binary files a/src/main/resources/nativelib/Mac_OS_X-x86_64/libhadoop.dylib and b/src/main/resources/nativelib/Mac_OS_X-x86_64/libhadoop.dylib differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib b/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib
deleted file mode 100644
index 8982bb8..0000000
Binary files a/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib and /dev/null differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-x86_64/libzstd.dylib b/src/main/resources/nativelib/Mac_OS_X-x86_64/libzstd.dylib
new file mode 100644
index 0000000..c626dd2
Binary files /dev/null and b/src/main/resources/nativelib/Mac_OS_X-x86_64/libzstd.dylib differ
diff --git a/src/test/java/com/facebook/presto/hadoop/TestHadoopNative.java b/src/test/java/com/facebook/presto/hadoop/TestHadoopNative.java
index 5f4e8a9..89b3d99 100644
--- a/src/test/java/com/facebook/presto/hadoop/TestHadoopNative.java
+++ b/src/test/java/com/facebook/presto/hadoop/TestHadoopNative.java
@@ -41,7 +41,7 @@ public void testNative()
HadoopNative.requireHadoopNative();
assertTrue(NativeCodeLoader.isNativeCodeLoaded());
- assertTrue(NativeCodeLoader.buildSupportsSnappy());
+ assertTrue(NativeCodeLoader.buildSupportsZstd());
assertTrue(ZlibFactory.isNativeZlibLoaded(new Configuration()));
assertTrue(Bzip2Factory.isNativeBzip2Loaded(new Configuration()));
}