Skip to content

Commit db3e06b

Browse files
authored
YQ-4188 Fix StartingMessageTs for federated topic (#17600)
1 parent d573ec3 commit db3e06b

File tree

3 files changed

+134
-1
lines changed

3 files changed

+134
-1
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ void TDqPqRdReadActor::InitChild() {
574574
NextOffsetFromRD[partitionKey.PartitionId] = offset;
575575
}
576576
}
577+
StartingMessageTimestamp = Parent->StartingMessageTimestamp;
577578
SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local RD (" << LocalRowDispatcherActorId << ")");
578579
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
579580
Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) {
120120
ingressBytes += stateProto.GetIngressBytes();
121121
}
122122
TStringStream str;
123-
str << "SessionId: " << GetSessionId() << " Restoring offset: ";
123+
str << "SessionId: " << GetSessionId() << " StartingMessageTs " << minStartingMessageTs << " Restoring offset: ";
124124
for (const auto& [key, value] : PartitionToOffset) {
125125
str << "{" << key << "," << value << "},";
126126
}

ydb/tests/fq/yds/test_row_dispatcher.py

+132
Original file line numberDiff line numberDiff line change
@@ -1001,3 +1001,135 @@ def test_sensors(self, kikimr, client):
10011001
filtered_bytes = stat['Graph=0']['IngressFilteredBytes']['sum']
10021002
filtered_rows = stat['Graph=0']['IngressFilteredRows']['sum']
10031003
assert filtered_bytes > 1 and filtered_rows > 0
1004+
1005+
@yq_v1
1006+
@pytest.mark.skip(reason="Is not implemented")
1007+
def test_group_by_hop_restart_query(self, kikimr, client):
1008+
client.create_yds_connection(
1009+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
1010+
)
1011+
self.init_topics("test_group_by_hop_restart")
1012+
1013+
sql1 = Rf'''
1014+
$data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`
1015+
WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL));
1016+
$hop_data = SELECT
1017+
CAST(HOP_END() as String) as time,
1018+
COUNT(*) as count
1019+
FROM $data
1020+
GROUP BY
1021+
HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S");
1022+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
1023+
SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;'''
1024+
1025+
query_id = start_yds_query(kikimr, client, sql1)
1026+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
1027+
1028+
data = [
1029+
'{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}',
1030+
'{"time": "2025-04-23T09:00:01.000000Z", "project": "project1"}',
1031+
'{"time": "2025-04-23T09:00:02.000000Z", "project": "project1"}',
1032+
'{"time": "2025-04-23T09:00:03.000000Z", "project": "project1"}',
1033+
'{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}',
1034+
'{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}',
1035+
'{"time": "2025-04-23T09:00:16.000000Z", "project": "project1"}',
1036+
]
1037+
self.write_stream(data)
1038+
expected = ['{"count":5,"time":"2025-04-23T09:00:10Z"}']
1039+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1040+
1041+
kikimr.compute_plane.wait_completed_checkpoints(
1042+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
1043+
)
1044+
stop_yds_query(client, query_id)
1045+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
1046+
1047+
data = [
1048+
'{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}',
1049+
'{"time": "2025-04-23T09:00:18.000000Z", "project": "project1"}',
1050+
'{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}',
1051+
'{"time": "2025-04-23T09:00:25.000000Z", "project": "project1"}',
1052+
'{"time": "2025-04-23T09:00:31.000000Z", "project": "project1"}',
1053+
]
1054+
self.write_stream(data)
1055+
1056+
client.modify_query(
1057+
query_id,
1058+
"continue",
1059+
sql1,
1060+
type=fq.QueryContent.QueryType.STREAMING,
1061+
state_load_mode=fq.StateLoadMode.FROM_LAST_CHECKPOINT,
1062+
streaming_disposition=StreamingDisposition.from_last_checkpoint(),
1063+
)
1064+
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
1065+
1066+
expected = [
1067+
'{"count":4,"time":"2025-04-23T09:00:20Z"}',
1068+
'{"count":2,"time":"2025-04-23T09:00:30Z"}']
1069+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1070+
1071+
stop_yds_query(client, query_id)
1072+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
1073+
1074+
@yq_v1
1075+
def test_group_by_hop_restart_node(self, kikimr, client):
1076+
client.create_yds_connection(
1077+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
1078+
)
1079+
self.init_topics("test_group_by_hop_restart")
1080+
1081+
sql1 = Rf'''
1082+
$data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`
1083+
WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL));
1084+
$hop_data = SELECT
1085+
project,
1086+
CAST(HOP_END() as String) as time,
1087+
COUNT(*) as count
1088+
FROM $data
1089+
GROUP BY
1090+
HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S"), project;
1091+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
1092+
SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;'''
1093+
1094+
query_id = start_yds_query(kikimr, client, sql1)
1095+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
1096+
1097+
data = [
1098+
'{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}',
1099+
'{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}',
1100+
'{"time": "2025-04-23T09:00:05.000000Z", "project": "project2"}',
1101+
'{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}',
1102+
'{"time": "2025-04-23T09:00:16.000000Z", "project": "project2"}',
1103+
]
1104+
self.write_stream(data)
1105+
expected = [
1106+
'{"count":2,"project":"project1","time":"2025-04-23T09:00:10Z"}',
1107+
'{"count":1,"project":"project2","time":"2025-04-23T09:00:10Z"}']
1108+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1109+
1110+
kikimr.compute_plane.wait_completed_checkpoints(
1111+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
1112+
)
1113+
1114+
node_index = 1
1115+
logging.debug("Restart compute node {}".format(node_index))
1116+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
1117+
1118+
data = [
1119+
'{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}',
1120+
'{"time": "2025-04-23T09:00:18.000000Z", "project": "project2"}',
1121+
'{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}',
1122+
'{"time": "2025-04-23T09:00:25.000000Z", "project": "project2"}'
1123+
]
1124+
self.write_stream(data)
1125+
1126+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
1127+
kikimr.compute_plane.wait_bootstrap(node_index)
1128+
1129+
expected = [
1130+
'{"count":2,"project":"project1","time":"2025-04-23T09:00:20Z"}',
1131+
'{"count":2,"project":"project2","time":"2025-04-23T09:00:20Z"}']
1132+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1133+
1134+
stop_yds_query(client, query_id)
1135+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

0 commit comments

Comments
 (0)