Skip to content

Integrate kafka capacity planner with buffers #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions service_capacity_modeling/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,11 @@ def get_cores_from_current_capacity(
cpu_success_buffer = (1 - cpu_headroom_target(instance, buffers)) * 100
current_cpu_utilization = current_capacity.cpu_utilization.mid

if current_capacity.cluster_instance is None:
cluster_instance = shapes.instance(current_capacity.cluster_instance_name)
else:
cluster_instance = current_capacity.cluster_instance
cluster_instance = (
current_capacity.cluster_instance
if current_capacity.cluster_instance is not None
else shapes.instance(current_capacity.cluster_instance_name)
)

current_cores = cluster_instance.cpu * current_capacity.cluster_instance_count.mid

Expand Down Expand Up @@ -826,14 +827,16 @@ def get_disk_from_current_capacity(
current_disk_utilization = current_capacity.disk_utilization_gib.mid

if current_capacity.cluster_instance is None:
cluster_instance = shapes.instance(current_capacity.cluster_instance_name)
current_cluster_instance = shapes.instance(
current_capacity.cluster_instance_name
)
else:
cluster_instance = current_capacity.cluster_instance
current_cluster_instance = current_capacity.cluster_instance

assert cluster_instance.drive is not None, "Drive should not be None"
assert current_cluster_instance.drive is not None, "Drive should not be None"

zonal_disk_allocated = (
cluster_instance.drive.max_size_gib
current_cluster_instance.drive.max_size_gib
* current_capacity.cluster_instance_count.mid
)

Expand Down
112 changes: 62 additions & 50 deletions service_capacity_modeling/models/org/netflix/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

from service_capacity_modeling.interface import AccessConsistency
from service_capacity_modeling.interface import AccessPattern
from service_capacity_modeling.interface import Buffer
from service_capacity_modeling.interface import BufferComponent
from service_capacity_modeling.interface import Buffers
from service_capacity_modeling.interface import CapacityDesires
from service_capacity_modeling.interface import CapacityPlan
from service_capacity_modeling.interface import CapacityRequirement
Expand All @@ -33,6 +36,7 @@
from service_capacity_modeling.models.common import compute_stateful_zone
from service_capacity_modeling.models.common import normalize_cores
from service_capacity_modeling.models.common import sqrt_staffed_cores
from service_capacity_modeling.models.common import zonal_requirements_from_current
from service_capacity_modeling.models.org.netflix.iso_date_math import iso_to_seconds

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +94,7 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
write_mib_per_second
)
# use the current cluster capacity if available
current_zonal_cluster = (
current_zonal_capacity = (
None
if desires.current_clusters is None
else (
Expand All @@ -100,39 +104,34 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
)
)

bw_in = (
(write_mib_per_second * MIB_IN_BYTES) * copies_per_region
) / MEGABIT_IN_BYTES
bw_out = (
(
(read_mib_per_second * MIB_IN_BYTES)
+ ((write_mib_per_second * MIB_IN_BYTES) * (copies_per_region - 1))
)
) / MEGABIT_IN_BYTES
if (
current_zonal_cluster
and current_zonal_cluster.cluster_instance
current_zonal_capacity
and current_zonal_capacity.cluster_instance
and required_zone_size is not None
and desires.current_clusters is not None
):
# For now, use the highest average CPU utilization seen on the cluster
cpu_utilization = current_zonal_cluster.cpu_utilization.high
# Validate with data if we should instead estimate 99th percentile here to get
# rid of spikes in collected cpu usage data ?
# Use the formula: 99th Percentile ≈ Average + (Z-score * SD).
# https://en.wikipedia.org/wiki/Normal_curve_equivalent#:~:text=The%2099th%20percentile%20in%20a,49/2.3263%20=%2021.06.
# curr_cpu_util = cpu_util.mid + (2.33 * (cpu_util.high - cpu_util.low) / 6)
current_utilized_cores = (
current_zonal_cluster.cluster_instance.cpu
* required_zone_size
* zones_per_region
* cpu_utilization
) / 100

# compute needed core capacity for cluster so avg cpu utilization for the
# cluster stays under threshold for that tier
needed_cores = int(
current_utilized_cores / target_cpu_utilization(desires.service_tier)
capacity_requirement = zonal_requirements_from_current(
current_cluster=desires.current_clusters,
buffers=desires.buffers,
instance=current_zonal_capacity.cluster_instance,
reference_shape=desires.reference_shape,
)
logger.debug("kafka needed cores: %s", needed_cores)
# Normalize those cores to the target shape
reference_shape = current_zonal_cluster.cluster_instance
needed_cores = normalize_cores(
core_count=needed_cores,
target_shape=instance,
reference_shape=reference_shape,
needed_cores = int(capacity_requirement.cpu_cores.mid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just using the mid CPU cores for needed cores felt not very accurate, hence I had the logic that you have removed. What is the rationale for using cpu.mid here instead?

needed_memory = int(capacity_requirement.mem_gib.mid)
needed_disk = int(capacity_requirement.disk_gib.mid)
needed_network_mbps = int(capacity_requirement.network_mbps.mid)
logger.debug(
"kafka normalized needed cores: %s", capacity_requirement.cpu_cores
)
logger.debug("kafka normalized needed cores: %s", needed_cores)
else:
# We have no existing utilization to go from
needed_cores = normalize_cores(
Expand All @@ -141,29 +140,20 @@ def _estimate_kafka_requirement( # pylint: disable=too-many-positional-argument
reference_shape=desires.reference_shape,
)

# (Nick): Keep 40% of available bandwidth for node recovery
# (Joey): For kafka BW = BW_write + BW_reads
# let X = input write BW
# BW_in = X * RF
# BW_out = X * (consumers) + X * (RF - 1)
bw_in = (
(write_mib_per_second * MIB_IN_BYTES) * copies_per_region
) / MEGABIT_IN_BYTES
bw_out = (
(
(read_mib_per_second * MIB_IN_BYTES)
+ ((write_mib_per_second * MIB_IN_BYTES) * (copies_per_region - 1))
)
) / MEGABIT_IN_BYTES
# BW = (in + out) because duplex then 40% headroom.
needed_network_mbps = max(bw_in, bw_out) * 1.40
# (Nick): Keep 40% of available bandwidth for node recovery
# (Joey): For kafka BW = BW_write + BW_reads
# let X = input write BW
# BW_in = X * RF
# BW_out = X * (consumers) + X * (RF - 1)
# BW = (in + out) because duplex then 40% headroom.
needed_network_mbps = max(bw_in, bw_out) * 1.40

needed_disk = math.ceil(
desires.data_shape.estimated_state_size_gib.mid * copies_per_region,
)
needed_disk = math.ceil(
desires.data_shape.estimated_state_size_gib.mid * copies_per_region,
)

# Keep the last N seconds hot in cache
needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024
# Keep the last N seconds hot in cache
needed_memory = (write_mib_per_second * hot_retention_seconds) // 1024

# Now convert to per zone
needed_cores = max(1, needed_cores // zones_per_region)
Expand Down Expand Up @@ -506,6 +496,27 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
# write throughput * retention = usage
state_gib = (write_bytes.mid * retention_secs) / GIB_IN_BYTES

# By supplying these buffers we can deconstruct observed utilization into
# load versus buffer.
buffers = Buffers(
default=Buffer(ratio=1.5),
desired={
# Amount of compute buffer that we need to reserve in addition to
# cpu_headroom_target that is reserved on a per instance basis
"compute": Buffer(ratio=1.5, components=[BufferComponent.compute]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ratio=2.5 here for tier 0 and tier 1 (to target a maximum of 40% CPU utilization per instance). It can be lower (1.5) for tier 2 and tier 3.

# This makes sure we use only 25% of the available storage
"storage": Buffer(ratio=4.0, components=[BufferComponent.storage]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be 2.5x for storage (we are OK with using up to 40% of the disk).

"background": Buffer(
ratio=2.0,
components=[
BufferComponent.cpu,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the background CPU buffer different from the compute buffer above?

BufferComponent.network,
"background",
],
),
},
)

return CapacityDesires(
query_pattern=QueryPattern(
access_pattern=AccessPattern.throughput,
Expand Down Expand Up @@ -545,6 +556,7 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
# Connection overhead, kernel, etc ...
reserved_instance_system_mem_gib=3,
),
buffers=buffers,
)


Expand Down
Loading