Spark 2.0 이상에서 단위 테스트를 작성하는 방법은 무엇입니까?
SparkSession
JUnit 테스트 프레임 워크 로 테스트 할 합리적인 방법을 찾으려고 노력하고 있습니다. 에 대한 좋은 예제가있는 것 같지만 SparkContext
해당 예제 SparkSession
가 spark-testing-base 내부적으로 여러 곳에서 사용되지만 해당 예제가에서 작동하는 방법을 알아낼 수 없었 습니다 . 스파크 테스트 기반을 사용하지 않는 솔루션을 시도하는 것이 실제로 올바른 방법이 아니라면 기쁩니다.
간단한 테스트 케이스 ( 전체 MWE 프로젝트 와 함께 build.sbt
) :
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite
import org.apache.spark.sql.SparkSession
class SessionTest extends FunSuite with DataFrameSuiteBase {
implicit val sparkImpl: SparkSession = spark
@Test
def simpleLookupTest {
val homeDir = System.getProperty("user.home")
val training = spark.read.format("libsvm")
.load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
println("completed simple lookup test")
}
}
JUnit으로 이것을 실행 한 결과는로드 라인에서 NPE입니다.
java.lang.NullPointerException
at SessionTest.simpleLookupTest(SessionTest.scala:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
로드되는 파일이 존재하는지 여부는 중요하지 않습니다. 올바르게 구성된 SparkSession에서 더 현명한 오류가 발생 합니다.
이 뛰어난 질문을 보내 주셔서 감사합니다. 어떤 이유로 Spark에 관해서는 모두가 분석에 너무 몰두하여 지난 15 년 동안 등장한 훌륭한 소프트웨어 엔지니어링 관행을 잊어 버립니다. 이것이 우리가 과정에서 테스트 및 지속적 통합 (DevOps와 같은 다른 것들 중에서)에 대해 논의하는 이유입니다.
용어에 대한 간략한 설명
진정한 단위 테스트는 테스트의 모든 구성 요소를 완벽하게 제어 할 수 있습니다 의미합니다. 데이터베이스, REST 호출, 파일 시스템 또는 시스템 시계와 상호 작용할 수 없습니다. Gerard Mezaros가 xUnit 테스트 패턴 에 넣은 것처럼 모든 것이 "두 배"(예 : mocked, stubbed 등)되어야합니다 . 이것이 의미론처럼 보이지만 정말 중요합니다. 이를 이해하지 못하는 것이 지속적인 통합에서 간헐적 인 테스트 실패를 보는 주요 이유 중 하나입니다.
우리는 여전히 단위 테스트를 할 수 있습니다
따라서 이러한 이해를 고려할 때 단위 테스트 RDD
는 불가능합니다. 그러나 분석을 개발할 때 여전히 단위 테스트를위한 장소가 있습니다.
간단한 작업을 고려하십시오.
rdd.map(foo).map(bar)
이곳까지 foo
와 bar
간단한 함수입니다. 그것들은 정상적인 방법으로 단위 테스트를 할 수 있으며, 가능한 한 많은 코너 케이스와 함께 있어야합니다. 결국, 테스트 픽스처이든 아니면 RDD
?
Spark Shell을 잊지 마세요
이것은 그 자체로 테스트가 아니지만 이러한 초기 단계에서는 Spark 셸에서 실험하여 변환 및 특히 접근 방식의 결과를 파악해야합니다. 예를 들어, 물리적 및 논리적 쿼리 계획, 분할 전략 및 보존과 같은 많은 다른 기능을 가진 데이터의 상태를 검사 할 수 있습니다 toDebugString
, explain
, glom
, show
, printSchema
, 등을. 나는 당신이 그것들을 탐구하게 할 것입니다.
또한 local[2]
Spark 셸과 테스트에서 마스터를 로 설정하여 작업 배포를 시작한 후에 만 발생할 수있는 문제를 식별 할 수 있습니다.
Spark와의 통합 테스트
이제 재미있는 부분입니다.
도우미 함수 및 / 변환 논리 의 품질에 대해 확신 한 후 Spark 를 통합 테스트 하려면 빌드 도구 및 테스트 프레임 워크에 관계없이 몇 가지 작업을 수행하는 것이 중요합니다.RDD
DataFrame
- JVM 메모리를 늘리십시오.
- 분기를 활성화하고 병렬 실행을 비활성화합니다.
- 테스트 프레임 워크를 사용하여 Spark 통합 테스트를 제품군으로 축적하고
SparkContext
모든 테스트 전을 초기화하고 모든 테스트 후에 중지합니다.
ScalaTest를 사용하면 BeforeAndAfterAll
(일반적으로 선호하는) 혼합 하거나 BeforeAndAfterEach
@ShankarKoirala가 Spark 아티팩트를 초기화하고 해체 할 수 있습니다. 나는 이것이 예외를 만들기에 합당한 장소라는 것을 알고 있지만, 나는 var
당신이 사용해야 하는 변경 가능한들을 정말로 좋아하지 않습니다 .
대출 패턴
또 다른 접근 방식은 대출 패턴 을 사용하는 것 입니다.
예 (ScalaTest 사용) :
class MySpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (sparkContext) =>
val data = Seq(...)
val rdd = sparkContext.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkContext) => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkContext = new SparkContext(conf)
try {
testMethod(sparkContext)
}
finally sparkContext.stop()
}
}
보시다시피, 대출 패턴은 고차 함수를 사용 SparkContext
하여 테스트 에 "대출" 한 다음 완료된 후 폐기합니다.
고통 지향적 프로그래밍 (감사합니다, Nathan)
그것은 전적으로 선호도의 문제이지만, 다른 프레임 워크를 가져 오기 전에 가능한 한 Loan Pattern을 사용하고 직접 연결하는 것을 선호합니다. 경량화를 유지하려는 것 외에도 프레임 워크는 때때로 디버깅 테스트 실패를 추론하기 어렵게 만드는 많은 "마법"을 추가합니다. 그래서 저는 고통 지향 프로그래밍 접근 방식을 취합니다. 여기서 새로운 프레임 워크를 사용하지 않는 데 따른 고통이 너무 많을 때까지 추가하지 않습니다. 그러나 이것은 당신에게 달려 있습니다.
대체 프레임 워크에 대한 최선의 선택은 물론 @ShankarKoirala가 언급했듯이 spark-testing-base 입니다. 이 경우 위의 테스트는 다음과 같습니다.
class MySpec extends WordSpec with Matchers with SharedSparkContext {
"My analytics" should {
"calculate the right thing" in {
val data = Seq(...)
val rdd = sc.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
을 처리하기 위해 아무것도 할 필요가 없었습니다 SparkContext
. SharedSparkContext
그 모든 것을 무료 sc
로 주었습니다 SparkContext
. 개인적으로 나는 대출 패턴이 내가 필요한 것을 정확히 수행하기 때문에이 목적을 위해이 의존성을 가져 오지 않을 것입니다. 또한 분산 시스템에서 발생하는 예측 불가능 성이 너무 많아 지속적인 통합에서 문제가 발생할 때 타사 라이브러리의 소스 코드에서 발생하는 마법을 추적해야하는 것은 정말 고통 스러울 수 있습니다.
이제 어디 스파크 테스트 기반이 정말 빛나는 하둡 기반 같은 헬퍼 함께 HDFSClusterLike
하고 YARNClusterLike
. 이러한 특성을 혼합하면 많은 설정 문제를 줄일 수 있습니다. 그것이 빛을 발하는 또 다른 곳은 Scalacheck 와 유사한 속성 및 생성기입니다. 물론 속성 기반 테스트가 작동하는 방식과 이것이 유용한 이유를 이해하고 있다고 가정합니다. 그러나 다시 한 번, 분석과 테스트가 정교함 수준에 도달 할 때까지 개인적으로 사용을 보류했습니다.
"Sith만이 절대를 다룬다." -오비완 케노비
물론 둘 중 하나를 선택할 필요는 없습니다. 대부분의 테스트에 대출 패턴 접근 방식을 사용하고 좀 더 엄격한 테스트에 대해서만 스파크 테스트 기반을 사용할 수 있습니다. 선택은 바이너리가 아닙니다. 둘 다 할 수 있습니다.
Spark Streaming과 통합 테스트
마지막으로, 인 메모리 값이있는 SparkStreaming 통합 테스트 설정이 spark-testing-base 없이 어떻게 보일지에 대한 스 니펫을 제시하고 싶습니다 .
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
이것은보기보다 간단합니다. 실제로 데이터 시퀀스를 대기열로 전환하여 DStream
. 대부분은 실제로 Spark API와 함께 작동하는 상용구 설정입니다. 그럼에도 불구하고 이것을 spark-testing-base StreamingSuiteBase
에서 찾은 것과 비교하여 선호하는 것을 결정할 수 있습니다.
이것은 내 게시물 중 가장 긴 게시물 일 수 있으므로 여기에 남겨 두겠습니다. 다른 모든 애플리케이션 개발을 개선 한 것과 동일한 민첩한 소프트웨어 엔지니어링 관행을 통해 분석 품질을 개선하는 데 도움이되는 다른 아이디어가 있기를 바랍니다.
그리고 뻔뻔한 플러그에 대한 사과 로 Apache Spark를 사용한 분석 과정을 확인할 수 있습니다 . 여기에서 이러한 많은 아이디어 등을 다룹니다. 곧 온라인 버전이 나오기를 바랍니다.
FunSuite와 BeforeAndAfterEach를 사용하여 아래와 같이 간단한 테스트를 작성할 수 있습니다.
class Tests extends FunSuite with BeforeAndAfterEach {
var sparkSession : SparkSession = _
override def beforeEach() {
sparkSession = SparkSession.builder().appName("udf testings")
.master("local")
.config("", "")
.getOrCreate()
}
test("your test name here"){
//your unit test assert here like below
assert("True".toLowerCase == "true")
}
override def afterEach() {
sparkSession.stop()
}
}
테스트에서 함수를 만들 필요가 없습니다. 간단히 다음과 같이 작성할 수 있습니다.
test ("test name") {//implementation and assert}
Holden Karau는 정말 멋진 테스트 스파크 테스트베이스를 작성했습니다.
아래는 간단한 예입니다.
class TestSharedSparkContext extends FunSuite with SharedSparkContext {
val expectedResult = List(("a", 3),("b", 2),("c", 4))
test("Word counts should be equal to expected") {
verifyWordCount(Seq("c a a b a c b c c"))
}
def verifyWordCount(seq: Seq[String]): Unit = {
assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
}
}
도움이 되었기를 바랍니다!
나는 SparkSessionTestWrapper
테스트 클래스에 혼합 될 수 있는 특성 을 만드는 것을 좋아합니다 . Shankar의 접근 방식은 작동하지만 여러 파일이있는 테스트 스위트의 경우 엄청나게 느립니다.
import org.apache.spark.sql.SparkSession
trait SparkSessionTestWrapper {
lazy val spark: SparkSession = {
SparkSession.builder().master("local").appName("spark session").getOrCreate()
}
}
특성은 다음과 같이 사용할 수 있습니다.
class DatasetSpec extends FunSpec with SparkSessionTestWrapper {
import spark.implicits._
describe("#count") {
it("returns a count of all the rows in a DataFrame") {
val sourceDF = Seq(
("jets"),
("barcelona")
).toDF("team")
assert(sourceDF.count === 2)
}
}
}
접근 방식 을 사용하는 실제 예제 는 spark-spec 프로젝트를 확인하십시오 SparkSessionTestWrapper
.
최신 정보
스파크 테스트 기반 라이브러리는 자동으로 (경우 예를 들어, 특정 특성은 테스트 클래스에 혼합하는 SparkSession을 추가 DataFrameSuiteBase
로 혼합, 당신은을 통해 SparkSession에 액세스해야합니다 spark
변수).
사용자가 테스트 를 실행할 때 SparkSession을 완전히 제어 할 수 있도록 spark-fast-tests 라는 별도의 테스트 라이브러리를 만들었습니다 . 테스트 도우미 라이브러리가 SparkSession을 설정해야한다고 생각하지 않습니다. 사용자는 적절하다고 판단되는대로 SparkSession을 시작하고 중지 할 수 있어야합니다 (SparkSession 하나를 만들고 테스트 스위트 실행 전체에 걸쳐 사용하고 싶습니다).
다음은 작동중인 spark-fast-tests assertSmallDatasetEquality
메서드 의 예입니다 .
import com.github.mrpowers.spark.fast.tests.DatasetComparer
class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {
import spark.implicits._
it("aliases a DataFrame") {
val sourceDF = Seq(
("jose"),
("li"),
("luisa")
).toDF("name")
val actualDF = sourceDF.select(col("name").alias("student"))
val expectedDF = Seq(
("jose"),
("li"),
("luisa")
).toDF("student")
assertSmallDatasetEquality(actualDF, expectedDF)
}
}
}
Spark 1.6 부터 사용할 수 SharedSparkContext
있거나 SharedSQLContext
Spark가 자체 단위 테스트에 사용합니다.
class YourAppTest extends SharedSQLContext {
var app: YourApp = _
protected override def beforeAll(): Unit = {
super.beforeAll()
app = new YourApp
}
protected override def afterAll(): Unit = {
super.afterAll()
}
test("Your test") {
val df = sqlContext.read.json("examples/src/main/resources/people.json")
app.run(df)
}
Spark 2.3 SharedSparkSession
을 사용할 수 있으므로 :
class YourAppTest extends SharedSparkSession {
var app: YourApp = _
protected override def beforeAll(): Unit = {
super.beforeAll()
app = new YourApp
}
protected override def afterAll(): Unit = {
super.afterAll()
}
test("Your test") {
df = spark.read.json("examples/src/main/resources/people.json")
app.run(df)
}
최신 정보:
Maven 종속성 :
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic</artifactId>
<version>SCALATEST_VERSION</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>SCALATEST_VERSION</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<version>SPARK_VERSION</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql</artifactId>
<version>SPARK_VERSION</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
SBT 종속성 :
"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
또한 다양한 테스트 슈트가 방대한 세트가있는 Spark의 테스트 소스 를 확인할 수 있습니다 .
아래 코드로 문제를 해결할 수 있습니다.
spark-hive 종속성이 프로젝트 pom에 추가되었습니다.
class DataFrameTest extends FunSuite with DataFrameSuiteBase{
test("test dataframe"){
val sparkSession=spark
import sparkSession.implicits._
var df=sparkSession.read.format("csv").load("path/to/csv")
//rest of the operations.
}
}
JUnit을 사용한 단위 테스트의 또 다른 방법
import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, _}
@Test
class SessionSparkTest {
var spark: SparkSession = _
@Before
def beforeFunction(): Unit = {
//spark = SessionSpark.getSparkSession()
spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
System.out.println("Before Function")
}
@After
def afterFunction(): Unit = {
spark.stop()
System.out.println("After Function")
}
@Test
def testRddCount() = {
val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
val count = rdd.count()
assertTrue(3 == count)
}
@Test
def testDfNotEmpty() = {
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
assertFalse(numDf.head(1).isEmpty)
}
@Test
def testDfEmpty() = {
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
assertTrue(emptyDf.head(1).isEmpty)
}
}
case class Num(id: Int)
참조 URL : https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0
'code' 카테고리의 다른 글
NotNull 또는 Nullable 가져 오기 및 Android Studio가 컴파일되지 않음 (0) | 2021.01.06 |
---|---|
mysqli_real_connect () : (HY000 / 2002) : 해당 파일 또는 디렉토리 없음 (0) | 2021.01.06 |
List () 대신 AsQueryable ()을 사용하는 이유는 무엇입니까? (0) | 2021.01.06 |
Notepad ++ 정규식 줄 찾기 및 삭제 (0) | 2021.01.06 |
ExpandableListView에서 프로그래밍 방식으로 그룹 축소 (0) | 2021.01.05 |