Skip to content

Commit e6f829f

Browse files
Add ProcessGuarantee to WindowConfig
1 parent 3c92372 commit e6f829f

File tree

9 files changed

+88
-12
lines changed

9 files changed

+88
-12
lines changed

.ci/helm.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,22 @@ function ci::verify_crypto_function() {
337337
function ci::send_test_data() {
338338
inputtopic=$1
339339
inputmessage=$2
340-
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}"
340+
count=$3
341+
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}"
341342
return 0
342343
}
343344

345+
function ci::verify_backlog() {
346+
topic=$1
347+
sub=$2
348+
expected=$3
349+
BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-amdin topic stats $topic | grep msgBacklog)
350+
if [[ "$BACKLOG" == *"\"msgBacklog\" : $expecte"* ]]; then
351+
return 0
352+
fi
353+
return 1
354+
}
355+
344356
function ci::verify_exclamation_function() {
345357
inputtopic=$1
346358
outputtopic=$2

.ci/tests/integration/cases/logging-window-function/manifests.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ spec:
3636
windowConfig:
3737
windowLengthCount: 10
3838
slidingIntervalCount: 5
39+
processingGuarantee: atleast_once
40+
# the processingGuarantee should be manual for window function
41+
# see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319
42+
processingGuarantee: manual
3943
subscriptionPosition: earliest
4044
---
4145
apiVersion: v1

.ci/tests/integration/cases/logging-window-function/verify.sh

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,44 @@ if [ $? -ne 0 ]; then
4343
exit 1
4444
fi
4545

46-
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1)
46+
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1)
4747
if [ $? -ne 0 ]; then
4848
echo "$verify_java_result"
4949
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
5050
exit 1
5151
fi
5252

53+
sleep 3
54+
55+
# the 3 messages will not be processed, so backlog should be 3
56+
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 3 2>&1)
57+
if [ $? -ne 0 ]; then
58+
echo "$verify_backlog_result"
59+
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
60+
exit 1
61+
fi
62+
63+
# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
64+
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1)
65+
if [ $? -ne 0 ]; then
66+
echo "$verify_java_result"
67+
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
68+
exit 1
69+
fi
70+
71+
sleep 3
72+
73+
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 0 2>&1)
74+
if [ $? -ne 0 ]; then
75+
echo "$verify_backlog_result"
76+
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
77+
exit 1
78+
fi
79+
5380
verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l)
5481
if [ $verify_log_result -ne 0 ]; then
5582
sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;)
56-
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
83+
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
5784
if [ $verify_log_topic_result -ne 0 ]; then
5885
echo "e2e-test: ok" | yq eval -
5986
else

api/compute/v1alpha1/common.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -533,15 +533,16 @@ type LogConfig struct {
533533
}
534534

535535
type WindowConfig struct {
536-
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
537-
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
538-
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
539-
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
540-
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
541-
LateDataTopic string `json:"lateDataTopic,omitempty"`
542-
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
543-
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
544-
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
536+
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
537+
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
538+
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
539+
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
540+
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
541+
LateDataTopic string `json:"lateDataTopic,omitempty"`
542+
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
543+
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
544+
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
545+
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
545546
}
546547

547548
type VPASpec struct {

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3786,6 +3786,13 @@ spec:
37863786
maxLagMs:
37873787
format: int64
37883788
type: integer
3789+
processingGuarantee:
3790+
enum:
3791+
- atleast_once
3792+
- atmost_once
3793+
- effectively_once
3794+
- manual
3795+
type: string
37893796
slidingIntervalCount:
37903797
format: int32
37913798
type: integer

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3805,6 +3805,13 @@ spec:
38053805
maxLagMs:
38063806
format: int64
38073807
type: integer
3808+
processingGuarantee:
3809+
enum:
3810+
- atleast_once
3811+
- atmost_once
3812+
- effectively_once
3813+
- manual
3814+
type: string
38083815
slidingIntervalCount:
38093816
format: int32
38103817
type: integer

config/crd/bases/compute.functionmesh.io_functionmeshes.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3786,6 +3786,13 @@ spec:
37863786
maxLagMs:
37873787
format: int64
37883788
type: integer
3789+
processingGuarantee:
3790+
enum:
3791+
- atleast_once
3792+
- atmost_once
3793+
- effectively_once
3794+
- manual
3795+
type: string
37893796
slidingIntervalCount:
37903797
format: int32
37913798
type: integer

config/crd/bases/compute.functionmesh.io_functions.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3783,6 +3783,13 @@ spec:
37833783
maxLagMs:
37843784
format: int64
37853785
type: integer
3786+
processingGuarantee:
3787+
enum:
3788+
- atleast_once
3789+
- atmost_once
3790+
- effectively_once
3791+
- manual
3792+
type: string
37863793
slidingIntervalCount:
37873794
format: int32
37883795
type: integer

pkg/webhook/validate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,10 @@ func validateWindowConfigs(windowConfig *v1alpha1.WindowConfig) *field.Error {
389389
"Watermark interval must be positive")
390390
}
391391
}
392+
if windowConfig.ProcessingGuarantee == v1alpha1.Manual || windowConfig.ProcessingGuarantee == v1alpha1.EffectivelyOnce {
393+
return field.Invalid(field.NewPath("spec").Child("windowConfig"), windowConfig.ProcessingGuarantee,
394+
"Window function only supports atleast_once and atmost_once")
395+
}
392396
}
393397
return nil
394398
}

0 commit comments

Comments
 (0)