
[SPARK-45242][SQL] Use DataFrame ID to semantically validate CollectMetrics #43010

Closed

Conversation

Contributor

@amaliujia commented Sep 20, 2023

What changes were proposed in this pull request?

In the existing code, plan matching is used to validate whether two CollectMetrics nodes have the same name but different semantics. However, the plan-matching approach is fragile. A better way to tackle this is to simply use the unique DataFrame id, because the observe API is only available through the DataFrame API; SQL has no such syntax.

So two CollectMetrics nodes are semantically the same if and only if they have the same name and the same DataFrame id.
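A minimal, self-contained sketch of this rule (illustrative names only, not the actual Spark internals):

import scala.collection.mutable

// Illustrative model, not Spark source: a metric name may be observed more
// than once only if every occurrence carries the same DataFrame id.
object ObservedMetricsCheck {
  final case class ObservedMetric(name: String, dataframeId: Long)

  def validate(metrics: Seq[ObservedMetric]): Unit = {
    val seen = mutable.Map.empty[String, Long]
    metrics.foreach { m =>
      seen.get(m.name) match {
        case Some(id) if id != m.dataframeId =>
          // Same name, different DataFrame id: two distinct observations are
          // reusing one name, which the analyzer must reject.
          throw new IllegalArgumentException(
            s"Multiple definitions of observed metrics named '${m.name}'")
        case _ => seen(m.name) = m.dataframeId
      }
    }
  }
}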

Why are the changes needed?

This replaces a fragile plan-matching approach with a more stable one.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

UT

Was this patch authored or co-authored using generative AI tooling?

NO

@amaliujia amaliujia changed the title [WIP] [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics Sep 20, 2023
Contributor Author

@cloud-fan trying this idea

@@ -1197,6 +1197,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
         plan.collect_metrics.input.CopyFrom(self._child.plan(session))
         plan.collect_metrics.name = self._name
         plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for x in self._exprs])
+        plan.collect_metrics.dataframe_id = self._child._plan_id
Contributor

does Spark Connect DataFrame also have the unique id? cc @HyukjinKwon @zhengruifeng

Contributor

the _plan_id is unique:

class LogicalPlan:

    _lock: Lock = Lock()
    _nextPlanId: int = 0

    INDENT = 2

    def __init__(self, child: Optional["LogicalPlan"]) -> None:
        self._child = child

        plan_id: Optional[int] = None
        with LogicalPlan._lock:
            plan_id = LogicalPlan._nextPlanId
            LogicalPlan._nextPlanId += 1

        assert plan_id is not None
        self._plan_id = plan_id
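For illustration only, the same allocation pattern in Scala (a hypothetical analogue, not Spark source): a process-wide counter hands out a fresh id per plan instance.

import java.util.concurrent.atomic.AtomicLong

// Hypothetical analogue of the Python _plan_id allocation above: an atomic
// counter guarantees a unique id per constructed plan, with no explicit lock.
object PlanIds {
  private val next = new AtomicLong(0L)
  def allocate(): Long = next.getAndIncrement()
}

class ExamplePlan {
  val planId: Long = PlanIds.allocate()
}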

Contributor Author

Yes, plan_id is functionally enough.

def check(plan: LogicalPlan): Unit = plan.foreach { node =>
  node match {
    case metrics @ CollectMetrics(name, _, _) =>
      val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
Contributor

nit: we can remove simplifyPlanForCollectedMetrics now

Contributor Author

done

   expectedErrorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
   expectedMessageParameters = Map("expr" -> "\"a\"")
 )

 // Unwrapped non-deterministic expression
 assertAnalysisErrorClass(
-  CollectMetrics("event", Rand(10).as("rnd") :: Nil, testRelation),
+  CollectMetrics("event", Rand(10).as("rnd") :: Nil, testRelation, 5),
Contributor

@cloud-fan commented Sep 21, 2023

I think we can always use 0 in this test suite? Unless we have multiple CollectMetrics in one plan tree

Contributor Author

I kept a distinct dataframe id where it matters, and changed the rest back to 0.

@@ -990,6 +990,9 @@ message CollectMetrics {

   // (Required) The metric sequence.
   repeated Expression metrics = 3;
+
+  // (Required) A unique DataFrame id.
+  int64 dataframe_id = 4;
Contributor

Since dataframe_id is set to the plan_id, why not reuse the plan_id in RelationCommon?

Contributor Author

Good point. Done.

@amaliujia force-pushed the another_approch_for_collect_metrics branch from b907596 to ed9b814 on September 21, 2023 07:15
Contributor Author

@cloud-fan @zhengruifeng This PR is ready for another review.

@@ -1192,6 +1192,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None

         plan = proto.Relation()
+        plan.common.plan_id = self._child._plan_id
Contributor

Maybe we can add a comment here: we treat the id of the to-be-observed plan as the dataframe id for CollectMetrics

Contributor Author

@amaliujia commented Sep 22, 2023

I am doing this because CollectMetrics does not reuse LogicalPlan's _create_proto_relation, which helps set the plan_id. It looks like the majority of the plans set the plan_id by default; however, I do not have context on why CollectMetrics is implemented the way it currently is.

cc @zhengruifeng

Contributor

Your change here is fine; it is equivalent to _create_proto_relation.

I noticed there are a few places not using _create_proto_relation; I will check and modify them in separate work.

Contributor

@cloud-fan commented Sep 22, 2023

thanks, merging to master/3.5!

@cloud-fan cloud-fan closed this in 7c3c7c5 Sep 22, 2023
cloud-fan pushed a commit that referenced this pull request Sep 22, 2023
[SPARK-45242][SQL] Use DataFrame ID to semantically validate CollectMetrics

Closes #43010 from amaliujia/another_approch_for_collect_metrics.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7c3c7c5)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@amaliujia amaliujia changed the title [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics [SPARK-45242][SQL] Use DataFrame ID to semantically validate CollectMetrics Sep 22, 2023
@@ -1969,7 +1969,8 @@ trait SupportsSubquery extends LogicalPlan
 case class CollectMetrics(
     name: String,
     metrics: Seq[NamedExpression],
-    child: LogicalPlan)
+    child: LogicalPlan,
+    dataframeId: Long)
Contributor

@amaliujia when I execute the following commands:

build/mvn clean install -pl connector/connect/server -am -DskipTests
mvn test -pl connector/connect/server

the following test failures occur:

- Test observe *** FAILED ***
  == FAIL: Plans do not match ===
  !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
   +- LocalRelation <empty>, [id#0, name#0]                                                                 +- LocalRelation <empty>, [id#0, name#0] (PlanTest.scala:179) 

It seems that the failure is due to the differences in dataframeId when comparing plans.

For this, I've created SPARK-45357

Contributor

test("Test observe") {
val connectPlan0 =
connectTestRelation.observe(
"my_metric",
proto_min("id".protoAttr).as("min_val"),
proto_max("id".protoAttr).as("max_val"),
proto_sum("id".protoAttr))
val sparkPlan0 =
sparkTestRelation.observe(
"my_metric",
min(Column("id")).as("min_val"),
max(Column("id")).as("max_val"),
sum(Column("id")))
comparePlans(connectPlan0, sparkPlan0)
val connectPlan1 =
connectTestRelation.observe("my_metric", proto_min("id".protoAttr).as("min_val"))
val sparkPlan1 =
sparkTestRelation.observe("my_metric", min(Column("id")).as("min_val"))
comparePlans(connectPlan1, sparkPlan1)
checkError(
exception = intercept[AnalysisException] {
analyzePlan(
transform(connectTestRelation.observe("my_metric", "id".protoAttr.cast("string"))))
},
errorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
parameters = Map("expr" -> "\"id AS id\""))
val connectPlan2 =
connectTestRelation.observe(
Observation("my_metric"),
proto_min("id".protoAttr).as("min_val"),
proto_max("id".protoAttr).as("max_val"),
proto_sum("id".protoAttr))
val sparkPlan2 =
sparkTestRelation.observe(
Observation("my_metric"),
min(Column("id")).as("min_val"),
max(Column("id")).as("max_val"),
sum(Column("id")))
comparePlans(connectPlan2, sparkPlan2)
val connectPlan3 =
connectTestRelation.observe(
Observation("my_metric"),
proto_min("id".protoAttr).as("min_val"))
val sparkPlan3 =
sparkTestRelation.observe(Observation("my_metric"), min(Column("id")).as("min_val"))
comparePlans(connectPlan3, sparkPlan3)
checkError(
exception = intercept[AnalysisException] {
analyzePlan(
transform(
connectTestRelation.observe(Observation("my_metric"), "id".protoAttr.cast("string"))))
},
errorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
parameters = Map("expr" -> "\"id AS id\""))
}

For this test case, should we ignore the comparison of dataframeId?

Contributor

GA passes the test now, which seems to be because the test ordering makes sparkTestRelation.id exactly 0. However, Maven's test ordering differs from sbt's, so sparkTestRelation.id is not 0 when running under Maven.

Contributor

Omitting the df id in the comparison for this test case makes sense to me.

gengliangwang pushed a commit that referenced this pull request Oct 31, 2023
### What changes were proposed in this pull request?

In #43010, a new dataframeId field was added to `CollectMetrics`. We should also canonicalize the new DataFrame id field to avoid downstream plan comparison failures.

### Why are the changes needed?

avoid downstream plan comparison failures.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #43594 from amaliujia/do_not_canonicalize_dataframe_id.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
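A minimal, self-contained sketch of the canonicalization idea (illustrative names only; the real fix lives in Spark's plan canonicalization machinery): the dataframe id is normalized to a fixed value before comparison, so it never causes a spurious mismatch.

// Illustrative model, not Spark source: zero out dataframeId during
// canonicalization so plans differing only in that field compare as equal.
object CanonicalizeExample {
  final case class CollectMetricsNode(name: String, dataframeId: Long) {
    def canonicalized: CollectMetricsNode = copy(dataframeId = 0L)
  }

  def sameResult(a: CollectMetricsNode, b: CollectMetricsNode): Boolean =
    a.canonicalized == b.canonicalized

  // sameResult(CollectMetricsNode("my_metric", 0), CollectMetricsNode("my_metric", 53))
  // returns true, which is exactly the Maven-vs-sbt mismatch reported above.
}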