RDD는 분산되어 존재하는 데이터 요소들의 모임이다. 스파크에서 모든 작업은 새로운 RDD를 만들거나, 존재하는 RDD를 변형하거나, 결과 계산을 위해서 RDD에서 연산을 호출하는 것 중의 하나로 표현된다. 내부적으로 스파크는 자동으로 RDD에 있는 데이터들을 클러스터에 분배하며 클러스터 위에서 수행하는 연산들을 병렬화한다.
-
분산되어 있는 변경 불가능한 객체 모음
-
각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션 (partition) 으로 나뉨
-
RDD는 사용자 정의 클래스를 포함해서 파이썬, 자바, 스칼라의 어떤 타입의 객체든 가질 수 있음
-
RDD는 외부 데이터세트를 로드하거나 드라이버 프로그램에서 객체 컬렉션을 분산시키는 두 가지 방법 중의 하나로 만들 수 있음
-
ex) SparkContext, textFile() 을 써서 텍스트 파일을 문자열 RDD로 로딩
lines = sc.textFile("README.md")
-
-
한 번 만들어진 RDD는 두 가지 타입의 연산을 지원
- transformation
- 존재하는 RDD에서 새로운 RDD를 만들어냄
- ex) filter()
- action
- RDD를 기초로 결과 값을 계산하며 그 값을 드라이버 프로그램에 되돌려주거나 외부 스토리지에 저장
- ex) first()
트랜스포메이션과 액션은 위와 같이 스파크가 RDD를 다루는 방식에서 서로 차이가 있다. 스파크는 RDD를 항상 lazy evaluation으로 처음 액션을 사용하는 시점에 처리한다. 즉,
sc.textFile()
시에 텍스트 파일의 모든 라인을 로드해서 저장해놓는 것이 아닌,first()
와 같은 액션을 사용하는 시점에 처음 일치하는 라인이 나올 때 까지만 파일을 읽게 되는 것이다. - transformation
-
스파크의 RDD는 기본적으로 액션이 실행될 때 마다 매번 새로운 연산을 한다
- 여러 액션에서 RDD를 재사용하고 싶다면, 스파크에게
RDD.persist()
를 통해 계속 결과를 유지하도록 요청한다 - RDD를 여러 메모리, 디스크 등에 데이터를 보존해주도록 스파크에 요청할 수도 있다
- 첫 연산이 이루어진 후, 스파크는 RDD의 내용을 클러스터의 여러 머신들에 나눠서 메모리에 저장하게 되며, 이후의 액션들에서 재사용하게 된다
- 여러 액션에서 RDD를 재사용하고 싶다면, 스파크에게
-
RDD를 재사용하지 않는 것은 스파크가 일회성 데이터를 가져와 결과만 계산하고 데이터를 굳이 저장할 필요가 없는 경우에 굉장히 유용하다
- 외부 데이터에서 입력 RDD를 만든다
filter()
와 같은 트랜스포메이션을 사용하여 새로운 RDD를 정의한다- 재사용을 위한 중간 단계의 RDD를 보존하기 위해
persist()
로 스파크에 요청한다 - 스파크가 최적화 한 병렬 연산 수행을 위해
count()
나first()
같은 액션을 시작한다
스파크에서는 RDD를 만드는 두 가지 방법을 아래와 같이 제공한다.
- 외부 데이터세트 로드
- 직접 만든 드라이버 프로그램에서 데이터 집합을 병렬화하기
- RDD를 만드는 가장 간단한 방법
- 프로그램에 있는 데이터세트를 가져다가
SparkContext
의parallelize()
메소드에 넘겨주기
- 프로그램에 있는 데이터세트를 가져다가
- 셸에서 자신만의 RDD 를 만들 수 있고, 연산을 수행할 수 있음
- 그러나, 하나의 머신 메모리에 모든 데이터세트를 담고 있으므로 프로토타이핑이나 테스팅 목적이 아니면 널리 쓰이지 않음
val lines = sc.parallelize(List("pandas", "i like pandas"));
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"))
val lines = sc.textFile("/path/to/README.md");
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
RDD는 두 가지 타입의 연산 작업을 지원한다.
- 새로운 RDD를 만들어 내는 연산
- RDD를 리턴
- ex) map(), filter()
- 드라이버 프로그램에 결과를 되돌려주거나 스토리지에 결과를 써 넣는 연산
- RDD 이외의 다른 데이터 타입을 리턴
- ex) count(), first()
- 새로운 RDD를 만들어 리턴해주는 RDD의 연산 방식
- lazy evaluation에 따라 트랜스포메이션된 RDD는 실제로 액션을 사용하는 늦은 시점에 계산됨
- element wise (데이터 요소 위주), 즉, 한 번에 하나의 요소에서만 작업이 이루어짐
- 모든 트랜스포메이션이 그런 것은 아님
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
new Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
}
}
);
- filter() 연산은 이미 존재하는 inputRDD를 변경하지는 않는다 (RDD는 변경 불가능하다)
- RDD 자체를 변경하는 것이 아니라, 완전히 새로운 RDD에 대한 포인터를 리턴한다
- 따라서, inputRDD는 프로그램 내에서 재사용 가능하다
val errorsRDD = inputRDD.filter(line => line.contains("error"))
val warningRDD = inputRDD.filter(line => line.contains("warning"))
val badLinesRDD = errorsRDD.union(warningsRDD)
-
union() 은 두 개의 RDD로 작업한다
-
트랜스포메이션은 입력할 수 있는 RDD 개수에 대한 제한이 없다
각 트랜스포메이션을 적용해서 새로운 RDD를 얻어내면, 스파크는 각 RDD에 대해
lineage graph
라 불리는 관계 그래프를 갖고 있게 된다. 스파크는 이 정보를 활용해서 필요 시, 각 RDD를 재연산하거나 저장된 RDD가 유실될 경우 복구를 하는 등의 경우에 활용한다
- 드라이버 프로그램에 최종 결과 값을 돌려주거나 외부 저장소에 값을 기록하는 연산 작업
- 실제로 결과 값을 내야 하므로 트랜스포메이션이 계산을 수행하도록 강제함
println("Input had " + badLinesRdd.count() + " concerning lines")
println("Here are 10 examples: ")
badLinesRDD.take(10).foreach(println)
System.out.println("Input had " + badLinesRDD.count() + "concerning lines");
System.out.println("Here are 10 examples: ");
for (String line : badLinesRDD.take(10)) {
System.out.println(line);
}
-
take()
- RDD의 데이터 일부를 가져오기 위함
-
collect()
- 전체 RDD 데이터를 가져옴
- RDD를 filter()에 의해 작은 크기의 데이터세트의 RDD로 만든 후 분산이 아닌 로컬에서 데이터를 처리하고 싶을 때 유용함
- 이 함수를 사용할 때는, 전체 데이터세트가 사용하는 단일 컴퓨터의 메모리에 올라올 수 있을 정도의 크기여야 함
- 데이터세트가 너무 크면 collect()를 사용할 수 없다
- RDD가 드라이버 프로그램에 의해 collect()가 불가능한 경우
- 대부분 데이터가 너무 크기에 collect()를 사용할 수 없다
- 위의 경우, HDFS나 AWS S3 같은 분산 파일 시스템에 데이터를 써서 해결함
- RDD의 내용들은 saveAsTextFile() 또는 saveAsSequenceFile() 등의 파일 포맷용 액션을 써서 저장 가능하기 때문
-
새로운 액션을 호출할 때마다 RDD가 처음부터 (from scratch) 계산됨
- 이를 피하려면 중간 결과를 영속화 (persist) 를 이용
RDD의 트랜스포메이션은 lazy 방식으로 처리가 된다. 이 의미는 스파크가 액션을 만나기 전까지는 실제로 트랜스포메이션을 처리하지 않는다는 말이다.
- Lazy Evaluation 이란 RDD에 대한 트랜스포메이션을 호출할 때 그 연산이 즉시 수행되는 것이 아니고
- 대신 내부적으로 스파크는 metadata에 이러한 트랜스포메이션 연산이 호출되었다는 것만 기록을 해둔다
- RDD가 실제로는 어떤 특정 데이터를 가지고 있는 것이 아닌, 트랜스포메이션들이 생성한 데이터를 어떻게 계산할 지에 대한 명령어들을 갖고 있다고 생각하면 됨
- RDD에 데이터를 로드하는 것도 트랜스포메이션과 마찬가지로 lazy evaluation
- sc.textFile() 호출 시, 실제로 필요한 시점이 되기 전까지는 로딩되지 않음
- 왜 lazy evaluation 을 이용?
- 데이터 전달 횟수를 줄이기 위해 이용
- 하둡 맵리듀스 같은 시스템에서는 맵리듀스의 데이터 전달 횟수를 줄이기 위해 어떤 식으로 연산을 그룹화할지 고민해야 하는 것이 포인트인데, 맵리듀스 에서는 연산 개수가 많다는 것은 즉 네트워크로 데이터를 전송하는 단계가 많아짐을 의미하기 때문이다
대부분의 트랜스포메이션과 액션 일부는 스파크가 실제로 연산할 때 쓰일 함수들을 전달해야 하는 구조를 가진다.
- 다른 함수형 API 처럼 인라인으로 정의된 함수나 메소드에 대한 참조, 정적 함수를 전달할 수 있음
- 주의사항
- 전달하는 함수나 참조하는 데이터들이 직렬화 (serialize) 가능해야 함
- 객체의 메소드나 필드를 전달하면 전체 객체에 대한 참조 또한 포함됨
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
// 문제: "isMatch"는 "this.isMatch" 이므로 this의 모든 것이 전달된다
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
// 문제: "query"는 "this.query" 이므로 this의 모든 것이 전달된다
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
// 언존허미 필요한 필드만 추출하여 지역 변수에 저장해 할당한다
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
- 스칼라에서
NotSerializableException
이 발생하는 것은- 직렬화 불가능한 클래스의 메소드나 필드를 참조하는 문제일 가능성이 높다
- 최상위 객체의 멤버인 지역 변수나 함수 내에서 전달하는 것은 항상 안전하다
- 자바에서 함수들은
org.apache.spark.api.java.function
패키지의 스파크 함수 인터페이스들을 구현한 객체가 된다 - 함수의 반환 타입에 따라 여러 개의 인터페이스들이 아래와 같이 있다
함수 이름 | 구현할 메소드 | 사용법 |
---|---|---|
Function<T, R> | R call(T) | 입력 하나를 받아 출력 하나를 되돌려줌 map() 이나 filter() 같은 연산에 씀 |
Function2<T1, T2, R> | R call(T1, T2) | 입력 두 개를 받아 하나의 출력을 되돌려 줌 aggregate()나 fold() 같은 연산에 씀 |
FlatMapFunction<T, R> | Iterable< R > call(T) | 하나의 입력을 받아 0개 이상 여러 개의 출력을 되돌려줌 flatMap() 같은 연산에 씀 |
함수 클래스를 만들기 위해서는 익명 내부 클래스를 인라인으로 만들거나, 이름 있는 클래스를 따로 만들 수 있다.
RDD<String> errors = lines.filter(new Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
}
});
class ContainsError implments Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
}
}
RDD<String> errors = lines.filter(new ContainsError());
- 대규모 프로그램을 작성하는 경우 최상위 레벨의 이름 있는 함수 클래스를 쓰는 것이 코드 가독성이 높음
class ContainsError implements Function<String, Boolean>() {
private String query;
public Contains(String query) {
this.query = query;
}
public Boolean call(String x) {
return x.contains(query);
}
}
RDD<String> errors = lines.filter(new Contains("error"));
RDD<String> errors = lines.filter(s -> s.contains("error"));
스파크에서 가장 흔하게 쓰이는 트랜스포메이션과 액션들에 대해 알아보자.
특별한 데이터 타입을 취급하는 RDD를 위한 추가적인 연산들도 존재한다.
- 통계 함수들이나 key / value pair를 다루는 RDD에서 key를 기준으로 데이터를 집계하는 key / value 연산 같은 것들이 있다
- 데이터에 상관없이 모든 RDD에 대해 적용할 수 있는 트랜스포메이션과 액션들에 대해 먼저 다뤄보자
- map()
- filter()
- 함수를 받아 RDD의 각 데이터에 적용하고 결과 RDD에 각 데이터의 새 결과 값을 담는다
- 반환 타입이 입력 타입과 같지 않아도 됨
- ex) RDD 문자열을 갖고 있고, map()이 문자열을 Double 타입으로 리턴한다면,
- 입력 RDD : RDD[String]
- 리턴 RDD : RDD[Double]
- ex) RDD 문자열을 갖고 있고, map()이 문자열을 Double 타입으로 리턴한다면,
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) {
return x*x;
}
});
System.out.println(StringUtils.join(result.collect(), ","));
- 함수를 받아 filter() 함수를 통과한 데이터만 RDD에 담아 리턴한다
- 각 입력 데이터에 대해 여러 개의 아웃풋 데이터를 생성해야 할 때 이용
- map()과 같이 flatMap()에 전달한 함수는 입력 RDD에서 각 데이터마다 호출됨
- 함수에서 단일 값을 리턴하는 대신, 반복자 (iterator)를 리턴해야 함
- 반복자가 포함된 RDD를 리턴받는 것은 아니고, 반복자가 생성한 데이터들이 담긴 RDD를 받게 됨
- 함수에서 단일 값을 리턴하는 대신, 반복자 (iterator)를 리턴해야 함
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // "hello" 를 반환
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello word", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // "hello" 를 반환
- flatMap()은 반환 받은 iterator들을 펼쳐놓는다
- 여러 개의 리스트로 구성된 RDD 대신, 그 각각의 리스트 안에 있는 원소로 구성된 RDD를 반환
RDD는 합집합, 교집합 같은 다양한 수학적 집합 연산을 지원한다. 심지어 RDD가 집합의 형태가 아닌 경우에도 지원한다. 4가지 집합 연산은 연산에 포함된 RDD가 서로 같은 타입이어야 한다.
보통 RDD에서 가장 빈번하게 요구되는 집합의 특성은 중복 데이터가 자주 생기므로 uniqueness가 가장 필요하다. 만약, 중복이 없는 데이터세트를 원한다면, RDD.distinct() 트랜스포메이션을 써서 오직 단일 데이터 요소만 포함한 새로운 RDD를 얻을 수 있다. 그러나, distinct() 는 단일 아이템인지를 비교하기 위해 네트워크로 데이터를 전송해서 비교해야 하므로 연산의 cost가 매우 든다. 이런 shuffling 작업과 이를 피하는 방법은 나중에 알아보자.
- 양쪽의 데이터를 합해서 되돌려 줌 (합집합)
- 다양한 경로에서 받은 로그들을 합치는 데 사용
- 원본 데이터들이 중복 되더라도 중복을 유지함
- 양쪽 RDD에 동시에 존재하는 요소만 되돌려 줌 (교집합)
- 동작하면서 모든 중복을 제거함
- 단일 RDD 안에 원래 존재하던 중복은 포함
- 중복을 찾기 위해 distinct() 처럼 셔플링이 수반되므로, union() 보다 성능이 훨씬 떨어짐
- 첫 번째 RDD의 항목 중 두 번째 RDD에 있는 항목을 제외한 항목들을 가진 RDD를 되돌려줌 (차집합)
- 셔플링을 수반되므로 성능이 떨어짐
- 첫 번째 RDD에 있는 데이터 a와 두 번째 RDD에 있는 데이터 b에 대해 모든 가능한 쌍 (a, b) 를 리턴
select * from a,b
- 모든 사용자들에 대해 가능한 쌍들에 대한 유사성을 파악하고 싶은 경우에 이용
- 동일 RDD에 대한 카티시안 곱도 가능
- 사용자 유사성을 위한 작업에 이용 가능
- cost가 매우 크다
함수 이름 | 용도 | 예시 | 결과 |
---|---|---|---|
map() | RDD의 각 요소에 함수를 적용하고 결과 RDD를 되돌려 줌 | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | RDD의 각 요소에 함수를 적용하고 반환된 반복자의 내용들로 이루어진 RDD를 되돌려 줌 종종 단어 분해를 위해 쓰임 |
rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter() | filter()로 전달된 함수의 조건을 통과한 값으로만 이루어진 RDD를 되돌려 줌 | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct() | 중복 제거 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) | 복원 추출(withReplacement=true)이나 비복원 추출로 RDD에서 표본을 뽑아낸다 | rdd.sample(false, 0.5) | 생략 |
함수 이름 | 용도 | 예시 | 결과 |
---|---|---|---|
union() | 두 RDD에 있는 데이터들을 합한 RDD를 생성한다 | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | 양쪽 RDD에 모두 있는 데이터들만을 가진 RDD를 반환한다 | rdd.intersection(other) | {3} |
subtract() | 한 RDD가 가진 데이터를 다른쪽에서 삭제한다 ex) 교육용으로 썼던 데이터 삭제 |
rdd.subtract(other) | {1, 2} |
cartesian() | 두 RDD의 카티시안 곱 | rdd.cartesian(other) | {(1,3), (1,4), (1,5), ... , (3,5)} |
- 인자로 두 개의 데이터를 합쳐 같은 타입 데이터 하나를 반환하는 함수를 받음
- RDD의 총 합을 구하거나, 개수를 세거나 그 외 다른 타입의 집계 연산을 수행할 수 있음
- 결과 타입이 RDD 내에서 연산하는 데이터 요소들의 타입과 동일해야 함
val sum = rdd.reduce((x, y) => x + y)
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x + y;
}
});
-
reduce()에 전달되는 것과 동일한 형태의 함수 (두 개의 데이터를 합쳐 같은 타입 데이터 하나를 반환하는 함수)를 인자로 받음
-
추가적으로, 각 파티션의 초기 호출에 쓰이는 "zero value" 를 인자로 받음
- 이 값은 수행하는 연산에 대한 기본값이어야 함
- 즉, 해당 값을 여러 번 적용해도 값이 바뀌지 않아야 함
- ex) 더하기를 위한 zero value 는 0, 곱하기를 위한 zero value는 1이어야 함, 리스트 연결 연산이라면 zero value는 빈 리스트
-
결과 타입이 RDD 내에서 연산하는 데이터 요소들의 타입과 동일해야 함
- ex) 평균을 계산하는 경우, 합계와 각 요소의 개수를 쌍으로 관리하며 추적해야 하는 경우,
map()
을 써서 각 요소를 원하는 형태인 쌍이 되도록(element, 1)
식으로 바꾸면reduce()
가 이 쌍들을 기반으로 동작할 수 있음
aggregate()
함수를 쓰면 RDD 에서 동일한 타입을 되돌려 주어야 한다는 제한에서 벗어날 수 있는데,
- fold() 함수 처럼 리턴 받는 타입에 맞는 zero value 가 필요
- RDD의 값들을 누적 값에 연계해주는 함수가 필요하며, 각 노드에서 자체적으로 값들을 합칠 수 있도록 두 개의 누적값을 합쳐주는 두 번째 함수가 필요
val result = input.aggregate((0,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount> () {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
- RDD의 몇 몇 액션은 드라이버 프로그램에 일반적인 컬렉션 타입 형태나 값을 되돌려 줌
-
드라이버 프로그램에 값을 되돌려 주는 가장 간단하고 일반적으로 많이 사용하는 연산
-
데이터가 모두 드라이버 프로그램으로 copy 되므로 모든 데이터가 단일 컴퓨터에 메모리에 올라올 수 있어야 한다는 제약이 있음
-
테스트나 디버깅에는 유용하지만, 많은 양의 데이터를 다루기에는 병목 현상을 일으킬 수 있어서 적합하지 않음
- RDD에서 n 개의 데이터값을 되돌려 주는데 가능한 한 접근하는 파티션 개수를 최소화하도록 동작하므로 특정 파티션의 값들만 되돌려 줄 수 있음
- 기대하는 순서대로 값을 되돌려주지 않음
- 테스트나 디버깅에는 유용하지만, 많은 양의 데이터를 다루기에는 병목 현상을 일으킬 수 있어서 적합하지 않음
- 데이터에 특정 순서가 정의되어 있는 경우, RDD에서 상위의 값들만 뽑아 올 수 있음
- default는 내림차순, 원하는 순서를 적용하려면 비교 함수를 넣어주면 된다
함수 이름 | 목적 | 예시 | 결과 |
---|---|---|---|
collect() | RDD의 모든 데이터 요소 리턴 | rdd.collect() | {1, 2, 3, 3} |
count() | RDD의 요소 개수 리턴 | rdd.count() | 4 |
countByValue() | RDD에 있는 각 값의 개수 리턴 | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
take(num) | RDD의 값들 중 num 개 리턴 | rdd.take(2) | {1, 2} |
top(num) | RDD의 값들 중 상위 num개 리턴 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | 제공된 ordering 기준으로 num개 값 리턴 | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 무작위 값들 리턴 | rdd.takeSample(false, 1) | 생략 |
reduce(func) | RDD의 값들을 병렬로 병합 연산 | rdd.reduce((x, y) => x + y) | 9 |
fold(zero)(func) | reduce()와 동일하나 zeroValue를 넣어줌 | rdd.fold(0)((x, y) => x + y) | 9 |
함수 이름 | 목적 | 예시 | 결과 |
---|---|---|---|
aggregate(zeroValue)(seqOp, combOp) | reduce()와 유사하나 다른 타입을 리턴함 | rdd.aggregate((0, 0)) ((x, y) => (x. _ 1 + y, x. _ 2 + 1), (x, y) => (x._1 + y. _1, x. _ 2 + y. _ 2)) | (9, 4) |
foreach(func ) | RDD의 각 값에 func를 적용 | rdd.foreach(func) | 없음 |
- 어떤 함수들은 특정한 타입의 RDD에서만 쓸 수 있음
- ex) mean(), variance(), join() ...
- 스칼라와 자바에서 이런 메소드들은 기본 RDD 클래스에는 정의되어 있지 않으므로, 이런 기능들을 사용하려면 제대로 특정 클래스를 받아 쓰고 있는지 확인해야 함
- 스칼라에서 특정 함수를 갖고 있는 RDD로 변환하는 것은 묵시적 변환에 의해 자동으로 동작함
- 묵시적 변환을 위해
import org.apache.spark.SparkContext._
라인이 필요 - 묵시적 변환은
mean()
이나variance()
같은 것들을 쓸 수 있도록 RDD를 DoubleRDDFunctions (수치 데이터를 가진 RDD용) 나 PairRDDFunctions (키/ 값 페어를 위한 것들) 같은 다양한 포장 (wrapper) 클래스로 변환한다.
스파크 RDD는 여유로운 방식으로 수행되지만, 때때로는 동일한 RDD를 여러 번 사용하고 싶을 때가 생긴다. 이를 별다른 조치 없이 시도한다면 스파크는 RDD와 RDD 에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 된다. 이는 매우 무거운 작업이 될 수 있다.
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))
- RDD를 여러 번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화 (persist) 요청을 할 수 있다.
- RDD persist 요청을 하면, RDD를 계산한 노드들은 그 파티션을 저장하고 있게 되며,
- 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시에 유실된 데이터 파티션을 재연산한다.
- 스칼라와 자바에서는 기본적으로
persist()
가 데이터를 JVM heap에 직렬화되지 않은 객체 형태로 저장함
레벨 | 공간 사용 | CPU 사용 시간 | 메모리에 저장 | 디스크에 저장 | 비고 |
---|---|---|---|---|---|
MEMORY_ONLY | 높음 | 낮음예 | 예 | 아니오 | |
MEMORY_ONLY_SER | 낮음 | 높음 | 예 | 아니오 | |
MEMORY_AND_DISK | 높음 | 중간 | 일부 | 일부 | 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장. |
MEMORY_AND_DISK_SER | 낮음 | 높음 | 일부 | 일부 | 메모리에 넣기에 데이터가 너무 많으면 디스크에 나너ㅜ 저장. 메모리에 직렬화된 형태로 저장 |
DISK_ONLY | 낮음 | 높음 | 아니오 | 예 |
import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
-
처음 액션을 수행하기 전에 persist() 를 호출하였다
-
persist() 호출은 연산을 강제로 수행하지는 않는다
-
만약, 메모리에 많은 데이터를 올리려고 시도하면, 스파크는 LRU (Least Recently Used) 캐시 정책에 따라 오래된 파티션들을 자동으로 버린다
- 홀든 카로, 앤디 콘빈스키, 패트릭 웬델, 마테이 자하리아, 『러닝 스파크』, 제이펍(2015), p.29~ p.57