-
Notifications
You must be signed in to change notification settings - Fork 24
Integrate kafka capacity planner with buffers #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,9 @@ | |
|
||
from service_capacity_modeling.interface import AccessConsistency | ||
from service_capacity_modeling.interface import AccessPattern | ||
from service_capacity_modeling.interface import Buffer | ||
from service_capacity_modeling.interface import BufferComponent | ||
from service_capacity_modeling.interface import Buffers | ||
from service_capacity_modeling.interface import CapacityDesires | ||
from service_capacity_modeling.interface import CapacityPlan | ||
from service_capacity_modeling.interface import CapacityRequirement | ||
|
@@ -33,6 +36,7 @@ | |
from service_capacity_modeling.models.common import compute_stateful_zone | ||
from service_capacity_modeling.models.common import normalize_cores | ||
from service_capacity_modeling.models.common import sqrt_staffed_cores | ||
from service_capacity_modeling.models.common import zonal_requirements_from_current | ||
from service_capacity_modeling.models.org.netflix.iso_date_math import iso_to_seconds | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -90,7 +94,7 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument | |
write_mib_per_second | ||
) | ||
# use the current cluster capacity if available | ||
current_zonal_cluster = ( | ||
current_zonal_capacity = ( | ||
None | ||
if desires.current_clusters is None | ||
else ( | ||
|
@@ -100,39 +104,34 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument | |
) | ||
) | ||
|
||
bw_in = ( | ||
(write_mib_per_second * MIB_IN_BYTES) * copies_per_region | ||
) / MEGABIT_IN_BYTES | ||
bw_out = ( | ||
( | ||
(read_mib_per_second * MIB_IN_BYTES) | ||
+ ((write_mib_per_second * MIB_IN_BYTES) * (copies_per_region - 1)) | ||
) | ||
) / MEGABIT_IN_BYTES | ||
if ( | ||
current_zonal_cluster | ||
and current_zonal_cluster.cluster_instance | ||
current_zonal_capacity | ||
and current_zonal_capacity.cluster_instance | ||
and required_zone_size is not None | ||
and desires.current_clusters is not None | ||
): | ||
# For now, use the highest average CPU utilization seen on the cluster | ||
cpu_utilization = current_zonal_cluster.cpu_utilization.high | ||
# Validate with data if we should instead estimate 99th percentile here to get | ||
# rid of spikes in collected cpu usage data ? | ||
# Use the formula: 99th Percentile ≈ Average + (Z-score * SD). | ||
# https://en.wikipedia.org/wiki/Normal_curve_equivalent#:~:text=The%2099th%20percentile%20in%20a,49/2.3263%20=%2021.06. | ||
# curr_cpu_util = cpu_util.mid + (2.33 * (cpu_util.high - cpu_util.low) / 6) | ||
current_utilized_cores = ( | ||
current_zonal_cluster.cluster_instance.cpu | ||
* required_zone_size | ||
* zones_per_region | ||
* cpu_utilization | ||
) / 100 | ||
|
||
# compute needed core capacity for cluster so avg cpu utilization for the | ||
# cluster stays under threshold for that tier | ||
needed_cores = int( | ||
current_utilized_cores / target_cpu_utilization(desires.service_tier) | ||
capacity_requirement = zonal_requirements_from_current( | ||
current_cluster=desires.current_clusters, | ||
buffers=desires.buffers, | ||
instance=current_zonal_capacity.cluster_instance, | ||
reference_shape=desires.reference_shape, | ||
) | ||
logger.debug("kafka needed cores: %s", needed_cores) | ||
# Normalize those cores to the target shape | ||
reference_shape = current_zonal_cluster.cluster_instance | ||
needed_cores = normalize_cores( | ||
core_count=needed_cores, | ||
target_shape=instance, | ||
reference_shape=reference_shape, | ||
needed_cores = int(capacity_requirement.cpu_cores.mid) | ||
needed_memory = int(capacity_requirement.mem_gib.mid) | ||
needed_disk = int(capacity_requirement.disk_gib.mid) | ||
needed_network_mbps = int(capacity_requirement.network_mbps.mid) | ||
logger.debug( | ||
"kafka normalized needed cores: %s", capacity_requirement.cpu_cores | ||
) | ||
logger.debug("kafka normalized needed cores: %s", needed_cores) | ||
else: | ||
# We have no existing utilization to go from | ||
needed_cores = normalize_cores( | ||
|
@@ -141,29 +140,20 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument | |
reference_shape=desires.reference_shape, | ||
) | ||
|
||
# (Nick): Keep 40% of available bandwidth for node recovery | ||
# (Joey): For kafka BW = BW_write + BW_reads | ||
# let X = input write BW | ||
# BW_in = X * RF | ||
# BW_out = X * (consumers) + X * (RF - 1) | ||
bw_in = ( | ||
(write_mib_per_second * MIB_IN_BYTES) * copies_per_region | ||
) / MEGABIT_IN_BYTES | ||
bw_out = ( | ||
( | ||
(read_mib_per_second * MIB_IN_BYTES) | ||
+ ((write_mib_per_second * MIB_IN_BYTES) * (copies_per_region - 1)) | ||
) | ||
) / MEGABIT_IN_BYTES | ||
# BW = (in + out) because duplex then 40% headroom. | ||
needed_network_mbps = max(bw_in, bw_out) * 1.40 | ||
# (Nick): Keep 40% of available bandwidth for node recovery | ||
# (Joey): For kafka BW = BW_write + BW_reads | ||
# let X = input write BW | ||
# BW_in = X * RF | ||
# BW_out = X * (consumers) + X * (RF - 1) | ||
# BW = (in + out) because duplex then 40% headroom. | ||
needed_network_mbps = max(bw_in, bw_out) * 1.40 | ||
|
||
needed_disk = math.ceil( | ||
desires.data_shape.estimated_state_size_gib.mid * copies_per_region, | ||
) | ||
needed_disk = math.ceil( | ||
desires.data_shape.estimated_state_size_gib.mid * copies_per_region, | ||
) | ||
|
||
# Keep the last N seconds hot in cache | ||
needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024 | ||
# Keep the last N seconds hot in cache | ||
needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024 | ||
|
||
# Now convert to per zone | ||
needed_cores = max(1, needed_cores // zones_per_region) | ||
|
@@ -506,6 +496,27 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]): | |
# write throughput * retention = usage | ||
state_gib = (write_bytes.mid * retention_secs) / GIB_IN_BYTES | ||
|
||
# By supplying these buffers we can deconstruct observed utilization into | ||
# load versus buffer. | ||
buffers = Buffers( | ||
default=Buffer(ratio=1.5), | ||
desired={ | ||
# Amount of compute buffer that we need to reserve in addition to | ||
# cpu_headroom_target that is reserved on a per instance basis | ||
"compute": Buffer(ratio=1.5, components=[BufferComponent.compute]), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use ratio=2.5 here for tier 0 and tier 1 (to target at most 40% CPU utilization on an instance). It can be lower (1.5) for tier 2 and tier 3. |
||
# This makes sure we use only 25% of the available storage | ||
"storage": Buffer(ratio=4.0, components=[BufferComponent.storage]), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can be 2.5x for storage (we are OK with using up to 40% of the disk). |
||
"background": Buffer( | ||
ratio=2.0, | ||
components=[ | ||
BufferComponent.cpu, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. how is |
||
BufferComponent.network, | ||
"background", | ||
], | ||
), | ||
}, | ||
) | ||
|
||
return CapacityDesires( | ||
query_pattern=QueryPattern( | ||
access_pattern=AccessPattern.throughput, | ||
|
@@ -545,6 +556,7 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]): | |
# Connection overhead, kernel, etc ... | ||
reserved_instance_system_mem_gib=3, | ||
), | ||
buffers=buffers, | ||
) | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using just the mid CPU cores for the needed cores did not feel very accurate, which is why I had the logic that you have removed. What is the rationale for using cpu.mid here instead?