Commit 73ac91d

Authored and committed by ramsrivatsa
Integrate kafka capacity planner with buffers
1 parent d1f228f commit 73ac91d

File tree

2 files changed: +67 −57 lines


service_capacity_modeling/models/common.py

+4 −4
@@ -826,14 +826,14 @@ def get_disk_from_current_capacity(
     current_disk_utilization = current_capacity.disk_utilization_gib.mid

     if current_capacity.cluster_instance is None:
-        cluster_instance = shapes.instance(current_capacity.cluster_instance_name)
+        current_cluster_instance = shapes.instance(current_capacity.cluster_instance_name)
     else:
-        cluster_instance = current_capacity.cluster_instance
+        current_cluster_instance = current_capacity.cluster_instance

-    assert cluster_instance.drive is not None, "Drive should not be None"
+    assert current_cluster_instance.drive is not None, "Drive should not be None"

     zonal_disk_allocated = (
-        cluster_instance.drive.max_size_gib
+        current_cluster_instance.drive.max_size_gib
         * current_capacity.cluster_instance_count.mid
     )
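For readers following the arithmetic: the renamed variable still feeds the same zonal allocation formula, allocated disk = drive max size × instances per zone. A tiny illustration with made-up numbers (not taken from the model's defaults):

# Illustration of the zonal disk allocation computed above; the numbers are
# hypothetical, not defaults from the model.
drive_max_size_gib = 2000          # max drive size of the current instance shape
cluster_instance_count_mid = 3     # observed brokers per zone (mid estimate)

zonal_disk_allocated = drive_max_size_gib * cluster_instance_count_mid
print(zonal_disk_allocated)        # 6000 GiB allocated across the zone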

service_capacity_modeling/models/org/netflix/kafka.py

+63 −53
@@ -9,7 +9,9 @@
 from pydantic import BaseModel
 from pydantic import Field

-from service_capacity_modeling.interface import AccessConsistency
+from service_capacity_modeling.interface import AccessConsistency, Buffers
+from service_capacity_modeling.interface import Buffer
+from service_capacity_modeling.interface import BufferComponent
 from service_capacity_modeling.interface import AccessPattern
 from service_capacity_modeling.interface import CapacityDesires
 from service_capacity_modeling.interface import CapacityPlan
@@ -30,7 +32,8 @@
 from service_capacity_modeling.interface import RegionContext
 from service_capacity_modeling.interface import Requirements
 from service_capacity_modeling.models import CapacityModel
-from service_capacity_modeling.models.common import compute_stateful_zone
+from service_capacity_modeling.models.common import compute_stateful_zone, \
+    zonal_requirements_from_current
 from service_capacity_modeling.models.common import normalize_cores
 from service_capacity_modeling.models.common import sqrt_staffed_cores
 from service_capacity_modeling.models.org.netflix.iso_date_math import iso_to_seconds
@@ -90,7 +93,7 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
         write_mib_per_second
     )
     # use the current cluster capacity if available
-    current_zonal_cluster = (
+    current_zonal_capacity = (
         None
         if desires.current_clusters is None
         else (
@@ -100,39 +103,33 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
         )
     )

+    bw_in = (
+        (write_mib_per_second * MIB_IN_BYTES) * copies_per_region
+    ) / MEGABIT_IN_BYTES
+    bw_out = (
+        (
+            (read_mib_per_second * MIB_IN_BYTES)
+            + ((write_mib_per_second * MIB_IN_BYTES) * (
+                copies_per_region - 1))
+        )
+    ) / MEGABIT_IN_BYTES
     if (
-        current_zonal_cluster
-        and current_zonal_cluster.cluster_instance
+        current_zonal_capacity
+        and current_zonal_capacity.cluster_instance
         and required_zone_size is not None
+        and desires.current_clusters is not None
     ):
-        # For now, use the highest average CPU utilization seen on the cluster
-        cpu_utilization = current_zonal_cluster.cpu_utilization.high
-        # Validate with data if we should instead estimate 99th percentile here to get
-        # rid of spikes in collected cpu usage data ?
-        # Use the formula: 99th Percentile ≈ Average + (Z-score * SD).
-        # https://en.wikipedia.org/wiki/Normal_curve_equivalent#:~:text=The%2099th%20percentile%20in%20a,49/2.3263%20=%2021.06.
-        # curr_cpu_util = cpu_util.mid + (2.33 * (cpu_util.high - cpu_util.low) / 6)
-        current_utilized_cores = (
-            current_zonal_cluster.cluster_instance.cpu
-            * required_zone_size
-            * zones_per_region
-            * cpu_utilization
-        ) / 100
-
-        # compute needed core capacity for cluster so avg cpu utilization for the
-        # cluster stays under threshold for that tier
-        needed_cores = int(
-            current_utilized_cores / target_cpu_utilization(desires.service_tier)
+        capacity_requirement = zonal_requirements_from_current(
+            current_cluster=desires.current_clusters,
+            buffers=desires.buffers,
+            instance=current_zonal_capacity.cluster_instance,
+            reference_shape=desires.reference_shape
         )
-        logger.debug("kafka needed cores: %s", needed_cores)
-        # Normalize those cores to the target shape
-        reference_shape = current_zonal_cluster.cluster_instance
-        needed_cores = normalize_cores(
-            core_count=needed_cores,
-            target_shape=instance,
-            reference_shape=reference_shape,
-        )
-        logger.debug("kafka normalized needed cores: %s", needed_cores)
+        needed_cores = int(capacity_requirement.cpu_cores.mid)
+        needed_memory = int(capacity_requirement.mem_gib.mid)
+        needed_disk = int(capacity_requirement.disk_gib.mid)
+        needed_network_mbps = int(capacity_requirement.network_mbps.mid)
+        logger.debug("kafka normalized needed cores: %s", capacity_requirement.cpu_cores)
     else:
         # We have no existing utilization to go from
         needed_cores = normalize_cores(
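The buffer-driven branch replaces the hand-rolled CPU estimate with one call whose result already carries cpu, memory, disk, and network distributions. Roughly, the idea is to strip the observed utilization down to load and re-apply the desired buffer; a hypothetical sketch of that reasoning (not the actual body of zonal_requirements_from_current):

# Hypothetical sketch of "deconstruct observed utilization into load versus
# buffer" for the CPU dimension; all numbers are made up.
instance_vcpus = 16                 # vCPUs of the current broker shape
zone_instance_count = 3             # brokers per zone
observed_cpu_utilization = 0.40     # observed average CPU utilization
compute_buffer_ratio = 1.5          # desired compute buffer (see Buffers below)

utilized_cores = instance_vcpus * zone_instance_count * observed_cpu_utilization
needed_cores = utilized_cores * compute_buffer_ratio   # 19.2 -> 28.8 cores
# The real helper does this per component and normalizes cores against the
# reference shape before returning distributions with .mid values.
print(round(needed_cores, 1))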
@@ -141,29 +138,20 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
             reference_shape=desires.reference_shape,
         )

-    # (Nick): Keep 40% of available bandwidth for node recovery
-    # (Joey): For kafka BW = BW_write + BW_reads
-    # let X = input write BW
-    # BW_in = X * RF
-    # BW_out = X * (consumers) + X * (RF - 1)
-    bw_in = (
-        (write_mib_per_second * MIB_IN_BYTES) * copies_per_region
-    ) / MEGABIT_IN_BYTES
-    bw_out = (
-        (
-            (read_mib_per_second * MIB_IN_BYTES)
-            + ((write_mib_per_second * MIB_IN_BYTES) * (copies_per_region - 1))
-        )
-    ) / MEGABIT_IN_BYTES
-    # BW = (in + out) because duplex then 40% headroom.
-    needed_network_mbps = max(bw_in, bw_out) * 1.40
+        # (Nick): Keep 40% of available bandwidth for node recovery
+        # (Joey): For kafka BW = BW_write + BW_reads
+        # let X = input write BW
+        # BW_in = X * RF
+        # BW_out = X * (consumers) + X * (RF - 1)
+        # BW = (in + out) because duplex then 40% headroom.
+        needed_network_mbps = max(bw_in, bw_out) * 1.40

-    needed_disk = math.ceil(
-        desires.data_shape.estimated_state_size_gib.mid * copies_per_region,
-    )
+        needed_disk = math.ceil(
+            desires.data_shape.estimated_state_size_gib.mid * copies_per_region,
+        )

-    # Keep the last N seconds hot in cache
-    needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024
+        # Keep the last N seconds hot in cache
+        needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024

     # Now convert to per zone
     needed_cores = max(1, needed_cores // zones_per_region)
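A quick worked example of the bandwidth comments above, with hypothetical traffic numbers and the usual byte constants (both constants are defined in the interface module; values shown here for self-containment):

# Worked example of the BW formula above with made-up traffic numbers.
MIB_IN_BYTES = 1024 * 1024
MEGABIT_IN_BYTES = 125_000   # 10**6 bits / 8 bits per byte

write_mib_per_second = 100   # X: producer input
read_mib_per_second = 100    # consumer fan-out
copies_per_region = 3        # RF

bw_in = (write_mib_per_second * MIB_IN_BYTES) * copies_per_region / MEGABIT_IN_BYTES
bw_out = (
    read_mib_per_second * MIB_IN_BYTES
    + write_mib_per_second * MIB_IN_BYTES * (copies_per_region - 1)
) / MEGABIT_IN_BYTES

# ~2516 Mbps in and out; the 1.40 factor keeps 40% headroom for broker recovery.
needed_network_mbps = max(bw_in, bw_out) * 1.40   # ~3523 Mbps
print(round(bw_in), round(bw_out), round(needed_network_mbps))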
@@ -506,6 +494,27 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
     # write throughput * retention = usage
     state_gib = (write_bytes.mid * retention_secs) / GIB_IN_BYTES

+    # By supplying these buffers we can deconstruct observed utilization into
+    # load versus buffer.
+    buffers = Buffers(
+        default=Buffer(ratio=1.5),
+        desired={
+            # Amount of compute buffer that we need to reserve in addition to
+            # cpu_headroom_target that is reserved on a per instance basis
+            "compute": Buffer(ratio=1.5, components=[BufferComponent.compute]),
+            # This makes sure we use only 25% of the available storage
+            "storage": Buffer(ratio=4.0, components=[BufferComponent.storage]),
+            "background": Buffer(
+                ratio=2.0,
+                components=[
+                    BufferComponent.cpu,
+                    BufferComponent.network,
+                    "background",
+                ],
+            ),
+        },
+    )
+
     return CapacityDesires(
         query_pattern=QueryPattern(
             access_pattern=AccessPattern.throughput,
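The ratios read as capacity-to-load multipliers, so the implied utilization ceiling is 1/ratio; a short sketch under that assumption (the authoritative semantics live in the Buffer/Buffers interface types):

# Interpreting the buffer ratios above as capacity / load, so the implied
# utilization target is 1 / ratio. Assumption for illustration only.
buffer_ratios = {"default": 1.5, "compute": 1.5, "storage": 4.0, "background": 2.0}

for name, ratio in buffer_ratios.items():
    target_utilization = 1 / ratio
    print(f"{name}: keep ~{target_utilization:.0%} utilized, {1 - target_utilization:.0%} buffer")
# storage: ~25% utilized, matching the "use only 25% of the available storage" comment.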
@@ -545,6 +554,7 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
             # Connection overhead, kernel, etc ...
             reserved_instance_system_mem_gib=3,
         ),
+        buffers=buffers
     )
