# HDFS Usage

About employing HDFS as a distributed file system.

This document shows how a DAPHNE user can execute DAPHNE scripts using HDFS as a file system, which is optimized for performance on big data through distributed computing. It assumes that DAPHNE was built with the `--hdfs` option; if this is not the case, please rebuild DAPHNE with `./build.sh --hdfs`.

The DAPHNE build script uses HAWQ (libhdfs3).

## Configuring DAPHNE for HDFS

In order for DAPHNE to utilize the HDFS file system, certain command line arguments need to be passed (or included in the config file):

- `--enable-hdfs`: A flag to enable HDFS.
- `--hdfs-address=<IP:PORT>`: The IP and port HDFS listens on.
- `--hdfs-username=<username>`: The username used to connect to HDFS.
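For instance, a plain (non-distributed) run might pass these options directly on the command line. This is only a sketch with placeholder values, using the options exactly as listed above and a hypothetical script name:

```bash
# Enable HDFS and point DAPHNE at the HDFS NameNode (placeholder address and user).
./bin/daphne --enable-hdfs --hdfs-address=<IP:PORT> --hdfs-username=<username> myScript.daph
```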

## Reading from HDFS

In order to read a file from HDFS, some preprocessing must be done. Assuming the file is named `FILE_NAME`, a user needs to:

1. Upload the file into HDFS. DAPHNE expects the file to be located inside a directory following a specific naming convention. The path can be any path under HDFS, however the file must be named with the following convention (see the upload sketch after this list):

    ```text
    /path/to/hdfs/file/FILE_NAME.FILE_TYPE/FILE_NAME.FILE_TYPE_segment_1
    ```

    `FILE_TYPE` is either `.csv` or `.dbdf` (DAPHNE binary data format), followed by `.hdfs`, e.g. `myfile.csv.hdfs`.

    The suffix `_segment_1` is necessary because we support multiple writers at once (see more below), and each writer needs to write into a different file (a different segment). In the case where the user pre-uploads the file, it must follow the same format, but with just one segment.

    Each segment must also have its own `.meta` file within HDFS. This is a JSON file containing information about the size of the segment as well as the value type. For example, `myfile.csv.hdfs_segment_1.meta`:

    ```json
    {
        "numCols": 10,
        "numRows": 10,
        "valueType": "f64"
    }
    ```
2. We also need to create a `.meta` file containing information about the file, within the local file system (from where DAPHNE is invoked). Similar to any other file which will be read by DAPHNE, this `.meta` file is in JSON format and contains information about the rows/columns etc. It should be named `FILE_NAME.FILE_TYPE.meta`, e.g. `myfile.csv.hdfs.meta`. The meta file contains all the regular information any DAPHNE meta file contains, but in addition it also states whether this is an HDFS file and where it is located within HDFS:
    ```json
    {
        "hdfs": {
            "HDFSFilename": "/path/to/hdfs/file/FILE_NAME.FILE_TYPE",
            "isHDFS": true
        },
        "numCols": 10,
        "numRows": 10,
        "valueType": "f64"
    }
    ```
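The following is a minimal sketch of the upload step using the standard `hdfs dfs` CLI. The local files `FILE_NAME.FILE_TYPE_segment_1` and `FILE_NAME.FILE_TYPE_segment_1.meta` are assumed to have been prepared beforehand; the HDFS path is a placeholder:

```bash
# Create the directory that carries the file's name and type (placeholder path).
hdfs dfs -mkdir -p /path/to/hdfs/file/FILE_NAME.FILE_TYPE

# Upload the single pre-built segment and its segment .meta file into that directory.
hdfs dfs -put FILE_NAME.FILE_TYPE_segment_1 /path/to/hdfs/file/FILE_NAME.FILE_TYPE/
hdfs dfs -put FILE_NAME.FILE_TYPE_segment_1.meta /path/to/hdfs/file/FILE_NAME.FILE_TYPE/
```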

Example:

Let's say we have a dataset called `training_data.csv` which we want to upload to HDFS and use with DAPHNE.

1. Upload the file under the path `/datasets` and create the segment `.meta` file. HDFS should look like this:

    ```bash
    $ hdfs dfs -ls /
    /datasets/training_data.csv.hdfs/training_data.csv.hdfs_segment_1
    /datasets/training_data.csv.hdfs/training_data.csv.hdfs_segment_1.meta

    $ hdfs dfs -cat /datasets/training_data.csv.hdfs/training_data.csv.hdfs_segment_1.meta
    {"numCols":10,"numRows":10,"valueType":"f64"}
    ```
2. Create the local `.meta` file:

    ```bash
    $ cat ./training_data.csv.hdfs.meta
    {"hdfs":{"HDFSFilename":"/datasets/training_data.csv.hdfs","isHDFS":true},"numCols":10,"numRows":10,"valueType":"f64"}
    ```
3. Write the DAPHNE script (`code.daph`):

    ```
    X = readMatrix("training_data.csv.hdfs");
    print(X);
    ```
4. Run DAPHNE:

    ```bash
    ./bin/daphne --enable-hdfs --hdfs-ip=<IP:PORT> --hdfs-username=ubuntu code.daph
    ```

## Writing to HDFS

In order to write to HDFS, we just need to use the `writeMatrix` function like we would for any other file type and specify the `.hdfs` suffix. For example:

1. Code (`code.daph`):

    ```
    X = rand(10, 10, 0.0, 1.0, 1.0, 1);
    writeMatrix(X, "randomSet.csv.hdfs");
    ```
2. Call DAPHNE:

    ```bash
    ./bin/daphne --enable-hdfs --hdfs-ip=<IP:PORT> --hdfs-username=ubuntu code.daph
    ```

This will create the following files inside HDFS:

```bash
$ hdfs dfs -ls /
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1.meta

$ hdfs dfs -cat /randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1.meta
{"numCols":10,"numRows":10,"valueType":"f64"}
```

It will also create the `.meta` file within the local file system, named `randomSet.csv.hdfs.meta`:

```json
{
    "hdfs": {
        "HDFSFilename": "/randomSet.csv.hdfs",
        "isHDFS": true
    },
    "numCols": 10,
    "numRows": 10,
    "valueType": "f64"
}
```

Limitations:

For now, writing to a specific directory within HDFS through DAPHNE is not supported. DAPHNE will always write under the HDFS root directory, i.e. `/<name>.<type>.hdfs`.

## Distributed Runtime

Both read and write operations are supported by the distributed runtime.

### Read

Exactly the same preprocessing must be done, creating one file inside HDFS with the appropriate naming conventions. Users can then run DAPHNE using the distributed runtime and, depending on the generated pipeline, DAPHNE's distributed workers will read their corresponding parts of the data, speeding up I/O significantly. For example:

1. DAPHNE script (`code.daph`):

    ```
    X = readMatrix("training_data.csv.hdfs");
    print(X+X);
    ```
2. Run DAPHNE:

    ```bash
    $ export DISTRIBUTED_WORKERS=worker-1:<PORT>,worker-2:<PORT>
    $ ./bin/daphne --distributed --dist_backend=sync-gRPC --enable-hdfs --hdfs-ip=<IP:PORT> --hdfs-username=ubuntu code.daph
    ```

### Write

Similar to read, nothing really changes; users just need to call DAPHNE using the distributed runtime flags. Notice that, since we have multiple workers/writers, more than one segment is generated inside HDFS:

1. Code (`code.daph`):

    ```
    X = rand(10, 10, 0.0, 1.0, 1.0, 1);
    writeMatrix(X, "randomSet.csv.hdfs");
    ```
2. Call DAPHNE:

    ```bash
    $ export DISTRIBUTED_WORKERS=worker-1:<PORT>,worker-2:<PORT>
    $ ./bin/daphne --distributed --dist_backend=sync-gRPC --enable-hdfs --hdfs-ip=<IP:PORT> --hdfs-username=ubuntu code.daph
    ```

Assuming 2 distributed workers, HDFS will contain:

```bash
$ hdfs dfs -ls /
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1        # First part of the matrix
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1.meta
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_2        # Second part of the matrix
/randomSet.csv.hdfs/randomSet.csv.hdfs_segment_2.meta

$ hdfs dfs -cat /randomSet.csv.hdfs/randomSet.csv.hdfs_segment_1.meta
{"numCols":10,"numRows":5,"valueType":"f64"}
$ hdfs dfs -cat /randomSet.csv.hdfs/randomSet.csv.hdfs_segment_2.meta
{"numCols":10,"numRows":5,"valueType":"f64"}
```

The `.meta` file within the local file system, named `randomSet.csv.hdfs.meta`, is also created.

## Notes

It does not matter how many segments are generated or exist. DAPHNE is designed to read the segments according to the current state (distributed or not and how many distributed workers are being used).

For example, if we use 4 distributed workers to write a matrix, DAPHNE will generate 4 different segments. DAPHNE can later read the same matrix either in local execution (no distributed runtime) or with a different number of workers, regardless of the number of segments generated earlier.
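To illustrate, here is a minimal sketch that reads back the `randomSet.csv.hdfs` matrix written above by two workers, this time in plain local execution (the script name `read_local.daph` is just a placeholder):

```bash
# Local (non-distributed) run: DAPHNE reads the existing segments itself.
$ cat read_local.daph
Y = readMatrix("randomSet.csv.hdfs");
print(Y);

$ ./bin/daphne --enable-hdfs --hdfs-ip=<IP:PORT> --hdfs-username=ubuntu read_local.daph
```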