[SPARK-45242][SQL] Use DataFrame ID to semantically validate CollectMetrics #43010
Conversation
@cloud-fan trying this idea
python/pyspark/sql/connect/plan.py (outdated)
@@ -1197,6 +1197,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
         plan.collect_metrics.input.CopyFrom(self._child.plan(session))
         plan.collect_metrics.name = self._name
         plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for x in self._exprs])
+        plan.collect_metrics.dataframe_id = self._child._plan_id
Does a Spark Connect DataFrame also have a unique id? cc @HyukjinKwon @zhengruifeng
The `_plan_id` is unique:
spark/python/pyspark/sql/connect/plan.py, lines 57 to 72 in 5299e54:
class LogicalPlan:

    _lock: Lock = Lock()
    _nextPlanId: int = 0

    INDENT = 2

    def __init__(self, child: Optional["LogicalPlan"]) -> None:
        self._child = child

        plan_id: Optional[int] = None
        with LogicalPlan._lock:
            plan_id = LogicalPlan._nextPlanId
            LogicalPlan._nextPlanId += 1

        assert plan_id is not None
        self._plan_id = plan_id
Yes, `plan_id` alone is functionally sufficient.
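The uniqueness guarantee discussed above comes from a class-level counter guarded by a lock. A minimal standalone sketch of the same pattern (simplified from the real `LogicalPlan` in `python/pyspark/sql/connect/plan.py`; names here are illustrative):

```python
from threading import Lock, Thread
from typing import List, Optional

class LogicalPlan:
    # Class-level counter guarded by a lock, so every plan instance,
    # even across threads, receives a distinct monotonically increasing id.
    _lock: Lock = Lock()
    _next_plan_id: int = 0

    def __init__(self, child: Optional["LogicalPlan"] = None) -> None:
        self._child = child
        with LogicalPlan._lock:
            self._plan_id = LogicalPlan._next_plan_id
            LogicalPlan._next_plan_id += 1

# Even when plans are created concurrently, the ids never collide.
plans: List[LogicalPlan] = []

def make_plans() -> None:
    for _ in range(100):
        plans.append(LogicalPlan())

threads = [Thread(target=make_plans) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

ids = [p._plan_id for p in plans]
assert len(set(ids)) == len(ids) == 400  # all 400 ids are distinct
```

Because `observe` is only reachable through the DataFrame API, this per-process counter is enough to identify the to-be-observed DataFrame.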
def check(plan: LogicalPlan): Unit = plan.foreach { node =>
  node match {
    case metrics @ CollectMetrics(name, _, _) =>
      val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
nit: we can remove `simplifyPlanForCollectedMetrics` now
done
   expectedErrorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
   expectedMessageParameters = Map("expr" -> "\"a\"")
 )

 // Unwrapped non-deterministic expression
 assertAnalysisErrorClass(
-  CollectMetrics("event", Rand(10).as("rnd") :: Nil, testRelation),
+  CollectMetrics("event", Rand(10).as("rnd") :: Nil, testRelation, 5),
I think we can always use 0 in this test suite? Unless we have multiple `CollectMetrics` in one plan tree.
I kept distinct dataframe ids where it makes sense; otherwise I changed them back to 0.
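The rule this PR switches to can be sketched as follows. This is a hypothetical Python illustration, not the actual Scala `CheckAnalysis` code, and the error message is illustrative: two `CollectMetrics` nodes may share a name only if they observe the same DataFrame (same id).

```python
from typing import Dict, Iterable, Tuple

def check_collect_metrics(nodes: Iterable[Tuple[str, int]]) -> None:
    """nodes: (name, dataframe_id) pairs collected from a plan tree.

    Two CollectMetrics are semantically the same iff they have the same
    name AND the same DataFrame id; a reused name on a different
    DataFrame is rejected.
    """
    seen: Dict[str, int] = {}
    for name, df_id in nodes:
        if name in seen and seen[name] != df_id:
            raise ValueError(
                f"duplicate observed-metrics name {name!r} "
                f"on different DataFrames ({seen[name]} vs {df_id})")
        seen[name] = df_id

# Same name, same DataFrame id: accepted.
check_collect_metrics([("event", 5), ("event", 5)])

# Same name, different DataFrame ids: rejected.
try:
    check_collect_metrics([("event", 5), ("event", 7)])
except ValueError as e:
    print("rejected:", e)
```

Compared with the previous plan-matching approach, this needs no canonicalization or plan simplification, which is why `simplifyPlanForCollectedMetrics` becomes removable.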
@@ -990,6 +990,9 @@ message CollectMetrics {

   // (Required) The metric sequence.
   repeated Expression metrics = 3;

+  // (Required) A unique DataFrame id.
+  int64 dataframe_id = 4;
Since `dataframe_id` is set to the `plan_id`, why not reuse the `plan_id` in `RelationCommon`?
Good point. done
@cloud-fan @zhengruifeng This PR is ready for another review.
@@ -1192,6 +1192,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None

         plan = proto.Relation()
+        plan.common.plan_id = self._child._plan_id
Maybe we can add a comment here: we treat the id of the to-be-observed plan as the dataframe id for CollectMetrics
I am doing this because `CollectMetrics` does not reuse `LogicalPlan`'s `_create_proto_relation`, which is what sets the `plan_id`. It looks like the majority of the plans set the `plan_id` by default; however, I do not have context on why `CollectMetrics` is implemented the current way.
Your change here is fine; it is equivalent to `_create_proto_relation`. I noticed there are a few places not using `_create_proto_relation`; I will check and modify them in separate work.
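What the change above amounts to can be sketched with plain dataclasses standing in for the real protobuf `Relation`/`RelationCommon` messages (simplified, assumed types; not the actual Spark Connect code):

```python
from dataclasses import dataclass, field

@dataclass
class RelationCommon:
    # Mirrors the plan_id field in the RelationCommon proto message.
    plan_id: int = 0

@dataclass
class Relation:
    common: RelationCommon = field(default_factory=RelationCommon)

def build_collect_metrics_relation(child_plan_id: int) -> Relation:
    """Equivalent in spirit to _create_proto_relation: the id of the
    to-be-observed child plan is carried as the dataframe id for
    CollectMetrics."""
    plan = Relation()
    plan.common.plan_id = child_plan_id
    return plan

rel = build_collect_metrics_relation(7)
assert rel.common.plan_id == 7
```

Reusing `RelationCommon.plan_id` avoids adding a redundant `dataframe_id` field to the `CollectMetrics` message, since the value would always be the same.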
thanks, merging to master/3.5!
[SPARK-45242][SQL] Use DataFrame ID to semantically validate CollectMetrics

### What changes were proposed in this pull request?
In the existing code, plan matching is used to validate whether two CollectMetrics with the same name have different semantics. However, the plan-matching approach is fragile. A better way to tackle this is to utilize the unique DataFrame id: the observe API is only supported by the DataFrame API, and SQL has no such syntax, so two CollectMetrics are semantically the same if and only if they have the same name and the same DataFrame id.

### Why are the changes needed?
This replaces a fragile approach with a more stable one.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43010 from amaliujia/another_approch_for_collect_metrics.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7c3c7c5)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@@ -1969,7 +1969,8 @@ trait SupportsSubquery extends LogicalPlan
 case class CollectMetrics(
     name: String,
     metrics: Seq[NamedExpression],
-    child: LogicalPlan)
+    child: LogicalPlan,
+    dataframeId: Long)
@amaliujia when I execute the following commands:

build/mvn clean install -pl connector/connect/server -am -DskipTests
mvn test -pl connector/connect/server

the following test failure occurs:
- Test observe *** FAILED ***
== FAIL: Plans do not match ===
!CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0 CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
+- LocalRelation <empty>, [id#0, name#0] +- LocalRelation <empty>, [id#0, name#0] (PlanTest.scala:179)
It seems that the failure is due to the difference in `dataframeId` when comparing plans. For this, I've created SPARK-45357.
Lines 958 to 1017 in ab92cae
test("Test observe") {
  val connectPlan0 =
    connectTestRelation.observe(
      "my_metric",
      proto_min("id".protoAttr).as("min_val"),
      proto_max("id".protoAttr).as("max_val"),
      proto_sum("id".protoAttr))
  val sparkPlan0 =
    sparkTestRelation.observe(
      "my_metric",
      min(Column("id")).as("min_val"),
      max(Column("id")).as("max_val"),
      sum(Column("id")))
  comparePlans(connectPlan0, sparkPlan0)

  val connectPlan1 =
    connectTestRelation.observe("my_metric", proto_min("id".protoAttr).as("min_val"))
  val sparkPlan1 =
    sparkTestRelation.observe("my_metric", min(Column("id")).as("min_val"))
  comparePlans(connectPlan1, sparkPlan1)

  checkError(
    exception = intercept[AnalysisException] {
      analyzePlan(
        transform(connectTestRelation.observe("my_metric", "id".protoAttr.cast("string"))))
    },
    errorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
    parameters = Map("expr" -> "\"id AS id\""))

  val connectPlan2 =
    connectTestRelation.observe(
      Observation("my_metric"),
      proto_min("id".protoAttr).as("min_val"),
      proto_max("id".protoAttr).as("max_val"),
      proto_sum("id".protoAttr))
  val sparkPlan2 =
    sparkTestRelation.observe(
      Observation("my_metric"),
      min(Column("id")).as("min_val"),
      max(Column("id")).as("max_val"),
      sum(Column("id")))
  comparePlans(connectPlan2, sparkPlan2)

  val connectPlan3 =
    connectTestRelation.observe(
      Observation("my_metric"),
      proto_min("id".protoAttr).as("min_val"))
  val sparkPlan3 =
    sparkTestRelation.observe(Observation("my_metric"), min(Column("id")).as("min_val"))
  comparePlans(connectPlan3, sparkPlan3)

  checkError(
    exception = intercept[AnalysisException] {
      analyzePlan(
        transform(
          connectTestRelation.observe(Observation("my_metric"), "id".protoAttr.cast("string")))))
    },
    errorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
    parameters = Map("expr" -> "\"id AS id\""))
}
For this test case, should we ignore the comparison of dataframeId?
GA can pass the test now, which seems to be because the test order makes `sparkTestRelation.id` exactly 0. However, Maven's test ordering differs from sbt's, so `sparkTestRelation.id` is not 0 when running under Maven.
omitting df id in the comparison of this test case makes sense to me.
### What changes were proposed in this pull request?
In #43010, a new DataFrame id field was added to `CollectMetrics`. We should also canonicalize the new DataFrame id field to avoid downstream plan comparison failures.

### Why are the changes needed?
Avoid downstream plan comparison failures.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43594 from amaliujia/do_not_canonicalize_dataframe_id.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
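The canonicalization fix can be sketched with a simplified dataclass standing in for the real Catalyst node (illustrative, not the actual Scala code): normalize the per-session dataframe id to a fixed value so two otherwise-identical `CollectMetrics` plans compare equal.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class CollectMetrics:
    # Simplified stand-in: the real node also carries metrics and a child.
    name: str
    dataframe_id: int

def canonicalized(node: CollectMetrics) -> CollectMetrics:
    # Normalize the per-session dataframe id so that plan comparison
    # (e.g. comparePlans in PlanTest) is not affected by the order in
    # which DataFrames were created.
    return replace(node, dataframe_id=0)

a = CollectMetrics("my_metric", dataframe_id=0)
b = CollectMetrics("my_metric", dataframe_id=53)
assert a != b                                # raw plans differ
assert canonicalized(a) == canonicalized(b)  # canonical forms match
```

This is exactly the failure mode from the Maven report above: the connect plan carried dataframe id 0 while the Spark plan carried 53, so the pre-fix comparison failed even though the plans were otherwise identical.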