code

Spark 2.0 이상에서 단위 테스트를 작성하는 방법은 무엇입니까?

codestyles 2021. 1. 6. 08:21
반응형

Spark 2.0 이상에서 단위 테스트를 작성하는 방법은 무엇입니까?


SparkSessionJUnit 테스트 프레임 워크 로 테스트 할 합리적인 방법을 찾으려고 노력하고 있습니다. 에 대한 좋은 예제가있는 것 같지만 SparkContext해당 예제 SparkSessionspark-testing-base 내부적으로 여러 곳에서 사용되지만 해당 예제가에서 작동하는 방법을 알아낼 수 없었 습니다 . 스파크 테스트 기반을 사용하지 않는 솔루션을 시도하는 것이 실제로 올바른 방법이 아니라면 기쁩니다.

간단한 테스트 케이스 ( 전체 MWE 프로젝트 와 함께 build.sbt) :

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

JUnit으로 이것을 실행 한 결과는로드 라인에서 NPE입니다.

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

로드되는 파일이 존재하는지 여부는 중요하지 않습니다. 올바르게 구성된 SparkSession에서 더 현명한 오류가 발생 합니다.


이 뛰어난 질문을 보내 주셔서 감사합니다. 어떤 이유로 Spark에 관해서는 모두가 분석에 너무 몰두하여 지난 15 년 동안 등장한 훌륭한 소프트웨어 엔지니어링 관행을 잊어 버립니다. 이것이 우리가 과정에서 테스트 및 지속적 통합 (DevOps와 같은 다른 것들 중에서)에 대해 논의하는 이유입니다.

용어에 대한 간략한 설명

진정한 단위 테스트는 테스트의 모든 구성 요소를 완벽하게 제어 할 수 있습니다 의미합니다. 데이터베이스, REST 호출, 파일 시스템 또는 시스템 시계와 상호 작용할 수 없습니다. Gerard Mezaros가 xUnit 테스트 패턴 에 넣은 것처럼 모든 것이 "두 배"(예 : mocked, stubbed 등)되어야합니다 . 이것이 의미론처럼 보이지만 정말 중요합니다. 이를 이해하지 못하는 것이 지속적인 통합에서 간헐적 인 테스트 실패를 보는 주요 이유 중 하나입니다.

우리는 여전히 단위 테스트를 할 수 있습니다

따라서 이러한 이해를 고려할 때 단위 테스트 RDD는 불가능합니다. 그러나 분석을 개발할 때 여전히 단위 테스트를위한 장소가 있습니다.

간단한 작업을 고려하십시오.

rdd.map(foo).map(bar)

이곳까지 foobar간단한 함수입니다. 그것들은 정상적인 방법으로 단위 테스트를 할 수 있으며, 가능한 한 많은 코너 케이스와 함께 있어야합니다. 결국, 테스트 픽스처이든 아니면 RDD?

Spark Shell을 잊지 마세요

이것은 그 자체로 테스트가 아니지만 이러한 초기 단계에서는 Spark 셸에서 실험하여 변환 및 특히 접근 방식의 결과를 파악해야합니다. 예를 들어, 물리적 및 논리적 쿼리 계획, 분할 전략 및 보존과 같은 많은 다른 기능을 가진 데이터의 상태를 검사 할 수 있습니다 toDebugString, explain, glom, show, printSchema, 등을. 나는 당신이 그것들을 탐구하게 할 것입니다.

또한 local[2]Spark 셸과 테스트에서 마스터를 설정하여 작업 배포를 시작한 후에 만 ​​발생할 수있는 문제를 식별 할 수 있습니다.

Spark와의 통합 테스트

이제 재미있는 부분입니다.

도우미 함수 및 / 변환 논리 의 품질에 대해 확신 한 후 Spark 통합 테스트 하려면 빌드 도구 및 테스트 프레임 워크에 관계없이 몇 가지 작업을 수행하는 것이 중요합니다.RDDDataFrame

  • JVM 메모리를 늘리십시오.
  • 분기를 활성화하고 병렬 실행을 비활성화합니다.
  • 테스트 프레임 워크를 사용하여 Spark 통합 테스트를 제품군으로 축적하고 SparkContext모든 테스트 전을 초기화하고 모든 테스트 후에 중지합니다.

ScalaTest를 사용하면 BeforeAndAfterAll(일반적으로 선호하는) 혼합 하거나 BeforeAndAfterEach@ShankarKoirala가 Spark 아티팩트를 초기화하고 해체 할 수 있습니다. 나는 이것이 예외를 만들기에 합당한 장소라는 것을 알고 있지만, 나는 var당신이 사용해야 하는 변경 가능한들을 정말로 좋아하지 않습니다 .

대출 패턴

또 다른 접근 방식은 대출 패턴 을 사용하는 것 입니다.

예 (ScalaTest 사용) :

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

보시다시피, 대출 패턴은 고차 함수를 사용 SparkContext하여 테스트 에 "대출" 한 다음 완료된 후 폐기합니다.

고통 지향적 프로그래밍 (감사합니다, Nathan)

그것은 전적으로 선호도의 문제이지만, 다른 프레임 워크를 가져 오기 전에 가능한 한 Loan Pattern을 사용하고 직접 연결하는 것을 선호합니다. 경량화를 유지하려는 것 외에도 프레임 워크는 때때로 디버깅 테스트 실패를 추론하기 어렵게 만드는 많은 "마법"을 추가합니다. 그래서 저는 고통 지향 프로그래밍 접근 방식을 취합니다. 여기서 새로운 프레임 워크를 사용하지 않는 데 따른 고통이 너무 많을 때까지 추가하지 않습니다. 그러나 이것은 당신에게 달려 있습니다.

대체 프레임 워크에 대한 최선의 선택은 물론 @ShankarKoirala가 언급했듯이 spark-testing-base 입니다. 이 경우 위의 테스트는 다음과 같습니다.

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

을 처리하기 위해 아무것도 할 필요가 없었습니다 SparkContext. SharedSparkContext그 모든 것을 무료 sc주었습니다 SparkContext. 개인적으로 나는 대출 패턴이 내가 필요한 것을 정확히 수행하기 때문에이 목적을 위해이 의존성을 가져 오지 않을 것입니다. 또한 분산 시스템에서 발생하는 예측 불가능 성이 너무 많아 지속적인 통합에서 문제가 발생할 때 타사 라이브러리의 소스 코드에서 발생하는 마법을 추적해야하는 것은 정말 고통 스러울 수 있습니다.

이제 어디 스파크 테스트 기반이 정말 빛나는 하둡 기반 같은 헬퍼 함께 HDFSClusterLike하고 YARNClusterLike. 이러한 특성을 혼합하면 많은 설정 문제를 줄일 수 있습니다. 그것이 빛을 발하는 또 다른 곳은 Scalacheck 와 유사한 속성 및 생성기입니다. 물론 속성 기반 테스트가 작동하는 방식과 이것이 유용한 이유를 이해하고 있다고 가정합니다. 그러나 다시 한 번, 분석과 테스트가 정교함 수준에 도달 할 때까지 개인적으로 사용을 보류했습니다.

"Sith만이 절대를 다룬다." -오비완 케노비

물론 둘 중 하나를 선택할 필요는 없습니다. 대부분의 테스트에 대출 패턴 접근 방식을 사용하고 좀 더 엄격한 테스트에 대해서만 스파크 테스트 기반을 사용할 수 있습니다. 선택은 바이너리가 아닙니다. 둘 다 할 수 있습니다.

Spark Streaming과 통합 테스트

마지막으로, 인 메모리 값이있는 SparkStreaming 통합 테스트 설정이 spark-testing-base 없이 어떻게 보일지에 대한 스 니펫을 제시하고 싶습니다 .

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

이것은보기보다 간단합니다. 실제로 데이터 시퀀스를 대기열로 전환하여 DStream. 대부분은 실제로 Spark API와 함께 작동하는 상용구 설정입니다. 그럼에도 불구하고 이것을 spark-testing-base StreamingSuiteBase 에서 찾은 것과 비교하여 선호하는 것을 결정할 수 있습니다.

이것은 내 게시물 중 가장 긴 게시물 일 수 있으므로 여기에 남겨 두겠습니다. 다른 모든 애플리케이션 개발을 개선 한 것과 동일한 민첩한 소프트웨어 엔지니어링 관행을 통해 분석 품질을 개선하는 데 도움이되는 다른 아이디어가 있기를 바랍니다.

그리고 뻔뻔한 플러그에 대한 사과 로 Apache Spark를 사용한 분석 과정을 확인할 수 있습니다 . 여기에서 이러한 많은 아이디어 등을 다룹니다. 곧 온라인 버전이 나오기를 바랍니다.


FunSuite와 BeforeAndAfterEach를 사용하여 아래와 같이 간단한 테스트를 작성할 수 있습니다.

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

테스트에서 함수를 만들 필요가 없습니다. 간단히 다음과 같이 작성할 수 있습니다.

test ("test name") {//implementation and assert}

Holden Karau는 정말 멋진 테스트 스파크 테스트베이스를 작성했습니다.

아래는 간단한 예입니다.

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

도움이 되었기를 바랍니다!


나는 SparkSessionTestWrapper테스트 클래스에 혼합 될 수 있는 특성 을 만드는 것을 좋아합니다 . Shankar의 접근 방식은 작동하지만 여러 파일이있는 테스트 스위트의 경우 엄청나게 느립니다.

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

특성은 다음과 같이 사용할 수 있습니다.

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

접근 방식 을 사용하는 실제 예제 spark-spec 프로젝트를 확인하십시오 SparkSessionTestWrapper.

최신 정보

스파크 테스트 기반 라이브러리는 자동으로 (경우 예를 들어, 특정 특성은 테스트 클래스에 혼합하는 SparkSession을 추가 DataFrameSuiteBase로 혼합, 당신은을 통해 SparkSession에 액세스해야합니다 spark변수).

사용자가 테스트 를 실행할 때 SparkSession을 완전히 제어 할 수 있도록 spark-fast-tests 라는 별도의 테스트 라이브러리를 만들었습니다 . 테스트 도우미 라이브러리가 SparkSession을 설정해야한다고 생각하지 않습니다. 사용자는 적절하다고 판단되는대로 SparkSession을 시작하고 중지 할 수 있어야합니다 (SparkSession 하나를 만들고 테스트 스위트 실행 전체에 걸쳐 사용하고 싶습니다).

다음은 작동중인 spark-fast-tests assertSmallDatasetEquality메서드 의 예입니다 .

import com.github.mrpowers.spark.fast.tests.DatasetComparer

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

    it("aliases a DataFrame") {

      val sourceDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("name")

      val actualDF = sourceDF.select(col("name").alias("student"))

      val expectedDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("student")

      assertSmallDatasetEquality(actualDF, expectedDF)

    }

  }

}

Spark 1.6 부터 사용할 수 SharedSparkContext있거나 SharedSQLContextSpark가 자체 단위 테스트에 사용합니다.

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Spark 2.3 SharedSparkSession 을 사용할 수 있으므로 :

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

최신 정보:

Maven 종속성 :

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

SBT 종속성 :

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

또한 다양한 테스트 슈트가 방대한 세트가있는 Spark의 테스트 소스확인할 수 있습니다 .


아래 코드로 문제를 해결할 수 있습니다.

spark-hive 종속성이 프로젝트 pom에 추가되었습니다.

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }

JUnit을 사용한 단위 테스트의 또 다른 방법

import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, _}

@Test
class SessionSparkTest {
  var spark: SparkSession = _

  @Before
  def beforeFunction(): Unit = {
    //spark = SessionSpark.getSparkSession()
    spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
    System.out.println("Before Function")
  }

  @After
  def afterFunction(): Unit = {
    spark.stop()
    System.out.println("After Function")
  }

  @Test
  def testRddCount() = {
    val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
    val count = rdd.count()
    assertTrue(3 == count)
  }

  @Test
  def testDfNotEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
    assertFalse(numDf.head(1).isEmpty)
  }

  @Test
  def testDfEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
    assertTrue(emptyDf.head(1).isEmpty)
  }
}

case class Num(id: Int)

참조 URL : https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0

반응형