
[SPARK-45357][CONNECT][TESTS] Normalize dataframeId when comparing CollectMetrics in SparkConnectProtoSuite #43155

Closed
wants to merge 8 commits

Conversation


@LuciferYang LuciferYang commented Sep 27, 2023

What changes were proposed in this pull request?

This PR adds a new function `normalizeDataframeId` that sets the `dataframeId` of `CollectMetrics` to the constant 0 before comparing `LogicalPlan`s in the test cases of `SparkConnectProtoSuite`.
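Based on the partial diff shown later in the review (`def normalizeDataframeId(plan: LogicalPlan): LogicalPlan = ...`) and a reviewer's suggestion to use `transform`, a minimal sketch of such a normalization could look like the following; the body is an assumption for illustration, not the merged code:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, LogicalPlan}

// Sketch (assumed shape): rewrite every CollectMetrics node so its
// dataframeId becomes the constant 0, making plan comparison insensitive
// to the non-deterministic id assigned at DataFrame creation time.
private def normalizeDataframeId(plan: LogicalPlan): LogicalPlan =
  plan.transform {
    case cm: CollectMetrics => cm.copy(dataframeId = 0)
  }
```

Applying this to both sides before `comparePlans` makes the comparison independent of how many DataFrames earlier suites created.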

Why are the changes needed?

The test scenarios in `SparkConnectProtoSuite` do not need to compare the `dataframeId` in `CollectMetrics`.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Manually check

run

```
build/mvn clean install -pl connector/connect/server -am -DskipTests
build/mvn test -pl connector/connect/server
```

Before

```
- Test observe *** FAILED ***
  == FAIL: Plans do not match ===
  !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
   +- LocalRelation <empty>, [id#0, name#0]                                                                 +- LocalRelation <empty>, [id#0, name#0] (PlanTest.scala:179)
```

After

```
Run completed in 41 seconds, 631 milliseconds.
Total number of tests run: 882
Suites: completed 24, aborted 0
Tests: succeeded 882, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Was this patch authored or co-authored using generative AI tooling?

No

@LuciferYang LuciferYang marked this pull request as draft September 27, 2023 17:39
@LuciferYang LuciferYang changed the title [SPARK-45357][CONNECT][TESTS] Ignore dataframeId when comparing CollectMetrics in SparkConnectProtoSuite. [SPARK-45357][CONNECT][TESTS] Ignore dataframeId when comparing CollectMetrics in SparkConnectProtoSuite. Sep 27, 2023
```
@@ -1068,6 +1068,10 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
// Compares proto plan with LogicalPlan.
private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
val connectAnalyzed = analyzePlan(transform(connectPlan))
comparePlans(connectAnalyzed, sparkPlan, false)
(connectAnalyzed, sparkPlan) match {
```
LuciferYang (author):
Since this is the first such case, this PR only makes a simple fix.

@LuciferYang LuciferYang changed the title [SPARK-45357][CONNECT][TESTS] Ignore dataframeId when comparing CollectMetrics in SparkConnectProtoSuite. [SPARK-45357][CONNECT][TESTS] Ignore dataframeId when comparing CollectMetrics in SparkConnectProtoSuite Sep 27, 2023
@LuciferYang LuciferYang changed the title [SPARK-45357][CONNECT][TESTS] Ignore dataframeId when comparing CollectMetrics in SparkConnectProtoSuite [SPARK-45357][CONNECT][TESTS] Normalize dataframeId when comparing CollectMetrics in SparkConnectProtoSuite Sep 27, 2023
```
@@ -1068,6 +1068,10 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
// Compares proto plan with LogicalPlan.
private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
val connectAnalyzed = analyzePlan(transform(connectPlan))
comparePlans(connectAnalyzed, sparkPlan, false)
(connectAnalyzed, sparkPlan) match {
```
LuciferYang (author):
In the current scenario, `connectAnalyzed` is transformed from `proto.Relation`. When it is `CollectMetrics`, its `dataframeId` is always 0.

But if the `sparkPlan` is a `CollectMetrics`, its `dataframeId` is determined by its corresponding `DataFrame`.

In the sbt tests, `SparkConnectProtoSuite` runs early, so `sparkTestRelation` is the first `DataFrame` created and its id is 0; that is why the GA tests did not trigger the failure described in this PR.

When testing with Maven, `SparkConnectProtoSuite` runs later, so `sparkTestRelation` is not the first `DataFrame` created and its id is not 0, which causes the test to fail.

A reviewer (Contributor):

Thanks for the clarification!

@LuciferYang LuciferYang marked this pull request as ready for review September 27, 2023 18:10
@LuciferYang
cc @cloud-fan @zhengruifeng

@LuciferYang

also cc @amaliujia for double check

```
@@ -1068,6 +1068,10 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
// Compares proto plan with LogicalPlan.
private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
val connectAnalyzed = analyzePlan(transform(connectPlan))
comparePlans(connectAnalyzed, sparkPlan, false)
(connectAnalyzed, sparkPlan) match {
case (l: CollectMetrics, r: CollectMetrics) =>
```
A reviewer (Contributor):
why not just add a small normalize function to reset df id to 0 for all CollectMetrics in the query plan?

```
@@ -1067,7 +1067,11 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {

// Compares proto plan with LogicalPlan.
private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
def normalizeDataframeId(plan: LogicalPlan): LogicalPlan = plan match {
```
LuciferYang (author):
Added a new small function; is it OK?

A reviewer (Contributor):
shall we use transform? why only top-level CollectMetrics?

@amaliujia

LGTM

Another way to fix this would be to manually construct the CollectMetrics to compare with the proto-generated version, but this PR's approach is fine too.

@zhengruifeng

In PlanGenerationTestSuite, the planId was reset before each test

```scala
override protected def beforeEach(): Unit = {
  session.resetPlanIdGenerator()
}
```

IIRC, the dataframeId in CollectMetrics is also the planId, so is it possible to simply reset the planId before problematic test suites?

@LuciferYang

LuciferYang commented Oct 4, 2023

In PlanGenerationTestSuite, the planId was reset before each test

```scala
override protected def beforeEach(): Unit = {
  session.resetPlanIdGenerator()
}
```

IIRC, the dataframeId in CollectMetrics is also the planId, so is it possible to simply reset the planId before problematic test suites?

No, it may not be feasible. The current test case compares the plans generated by `connectTestRelation.observe` and `sparkTestRelation.observe` respectively, so we cannot just clear one side's `dataframeId`. If we followed that approach, we would also need a new function on `Dataset` to reset `Dataset#curId`.

```scala
val curId = new java.util.concurrent.atomic.AtomicLong()

private val id = Dataset.curId.getAndIncrement()

@varargs
def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withTypedPlan {
  CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan, id)
}
```

Personally, I don't think that change is worth it. On the other hand, even if we were willing to make the above changes, we would still need to keep `curId` and `planId` synchronized; otherwise they still would not produce the same values.
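To make the mismatch concrete: `Dataset#curId` and the Connect plan-id generator are independent counters, so resetting one cannot align ids already stamped into plans. A self-contained toy model (the names and the 0-vs-53 numbers are ours, echoing the failure output above, not Spark code):

```scala
import java.util.concurrent.atomic.AtomicLong

object IdDriftDemo extends App {
  val curId  = new AtomicLong() // stands in for Dataset.curId on the Spark side
  val planId = new AtomicLong() // stands in for the Connect plan-id generator

  // Earlier suites in the same JVM create 53 DataFrames, bumping only curId.
  (1 to 53).foreach(_ => curId.getAndIncrement())

  val sparkSideId   = curId.getAndIncrement()  // id stamped into CollectMetrics
  val connectSideId = planId.getAndIncrement() // Connect side still starts at 0

  // The two ids disagree, which is exactly why the plans fail to match.
  assert(sparkSideId == 53 && connectSideId == 0)
  println(s"spark=$sparkSideId connect=$connectSideId")
}
```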

@LuciferYang

The GA failure is unrelated to the current PR:

```
starting mypy annotations test...
annotations failed mypy checks:
/usr/local/lib/python3.9/dist-packages/torch/_dynamo/variables/tensor.py:369: error: INTERNAL ERROR -- Please try using mypy master on GitHub:
https://mypy.readthedocs.io/en/stable/common_issues.html#using-a-development-mypy-build
If this issue continues with mypy master, please report a bug at https://github.com/python/mypy/issues
version: 0.982
/usr/local/lib/python3.9/dist-packages/torch/_dynamo/variables/tensor.py:369: : note: please use --show-traceback to print a traceback when reporting a bug
2
Error: Process completed with exit code 2.
```

@LuciferYang

LuciferYang commented Oct 4, 2023

@zhengruifeng Moreover, this test case is in the connect-server module; the function used by `connectTestRelation.observe` in the test is:

```scala
def observe(name: String, expr: Expression, exprs: Expression*): Relation = {
  Relation
    .newBuilder()
    .setCollectMetrics(
      CollectMetrics
        .newBuilder()
        .setInput(logicalPlan)
        .setName(name)
        .addAllMetrics((expr +: exprs).asJava))
    .build()
}

def observe(observation: Observation, expr: Expression, exprs: Expression*): Relation = {
  Relation
    .newBuilder()
    .setCollectMetrics(
      CollectMetrics
        .newBuilder()
        .setInput(logicalPlan)
        .setName(observation.name)
        .addAllMetrics((expr +: exprs).asJava))
    .build()
}
```

It seems there is no `planId` here, and there is no way to call `session.resetPlanIdGenerator()`.

And the observe function on the client side has not been implemented yet.

```scala
def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = {
  throw new UnsupportedOperationException("observe is not implemented.")
}
```

@LuciferYang

Rebased to fix the Python lint failure.

@zhengruifeng

also cc @hvanhovell

@zhengruifeng

merged to master

@LuciferYang

Thanks @zhengruifeng @cloud-fan @amaliujia ~

LuciferYang added a commit to LuciferYang/spark that referenced this pull request Oct 7, 2023
…`CollectMetrics` in `SparkConnectProtoSuite`

### What changes were proposed in this pull request?
This PR adds a new function `normalizeDataframeId` that sets the `dataframeId` of `CollectMetrics` to the constant 0 before comparing `LogicalPlan`s in the test cases of `SparkConnectProtoSuite`.

### Why are the changes needed?
The test scenarios in `SparkConnectProtoSuite` do not need to compare the `dataframeId` in `CollectMetrics`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Manually check

run

```
build/mvn clean install -pl connector/connect/server -am -DskipTests
build/mvn test -pl connector/connect/server
```

**Before**

```
- Test observe *** FAILED ***
  == FAIL: Plans do not match ===
  !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
   +- LocalRelation <empty>, [id#0, name#0]                                                                 +- LocalRelation <empty>, [id#0, name#0] (PlanTest.scala:179)
```

**After**

```
Run completed in 41 seconds, 631 milliseconds.
Total number of tests run: 882
Suites: completed 24, aborted 0
Tests: succeeded 882, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#43155 from LuciferYang/SPARK-45357.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
LuciferYang added a commit to LuciferYang/spark that referenced this pull request Feb 16, 2024
…`CollectMetrics` in `SparkConnectProtoSuite`
