-
Notifications
You must be signed in to change notification settings - Fork 17
The source code of https://github.com/cloudpilot-ai/priceserver is not publicly available. #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Hi @dm3ch, here is the core code:

package alibabacloud
import (
"context"
"embed"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/url"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"github.com/cloudpilot-ai/priceserver/pkg/apis"
"github.com/cloudpilot-ai/priceserver/pkg/tools"
)
// file embeds every JSON file under builtin-data/. NewAlibabaCloudPriceClient
// reads builtin-data/alibabacloud_price.json from it (when loadPriceData is
// set) and loadGPUModels reads builtin-data/gpu_models.json.
//go:embed builtin-data/*.json
var file embed.FS
// AKSKPair is a single Alibaba Cloud credential: AK is used as the
// AccessKey ID and SK as the AccessKey secret when building ECS clients.
type AKSKPair struct {
	AK, SK string
}
// ExtractAlibabaCloudAKSKPool parses the credential pool from the environment
// variable named by apis.AlibabaCloudAKSKPoolEnv.
//
// The value is a comma-separated list of "AK:SK" entries. Whitespace around
// entries and around the AK/SK parts is trimmed. Entries that are blank are
// ignored. Entries that do not split into exactly two ':'-separated parts, or
// whose AK or SK is empty, are skipped with a warning. The warning identifies
// the entry only by its position, because the entry contains a secret.
//
// It returns nil when the variable is unset, empty, or yields no valid pair.
func ExtractAlibabaCloudAKSKPool() []AKSKPair {
	raw := os.Getenv(apis.AlibabaCloudAKSKPoolEnv)
	if raw == "" {
		return nil
	}

	var pool []AKSKPair
	for idx, entry := range strings.Split(raw, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.Split(entry, ":")
		if len(parts) != 2 {
			klog.Warningf("Skip malformed AK/SK entry #%d: expected the form AK:SK", idx)
			continue
		}
		ak, sk := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
		if ak == "" || sk == "" {
			klog.Warningf("Skip AK/SK entry #%d: AK or SK is empty", idx)
			continue
		}
		pool = append(pool, AKSKPair{AK: ak, SK: sk})
	}
	return pool
}
// AlibabaCloudPriceClient gathers ECS on-demand and spot prices for every
// Alibaba Cloud region and serves them from an in-memory cache.
type AlibabaCloudPriceClient struct {
	// akskPool holds the credentials; createECSClient picks one at random
	// for every client it builds.
	akskPool []AKSKPair
	// regionList is filled by initialRegions.
	regionList []string

	// dataMutex guards priceData, instanceTypes and instanceInfos: the
	// refresh methods write them under Lock and the public readers use RLock.
	dataMutex sync.RWMutex
	// region -> prices of the instance types offered in that region
	priceData map[string]*apis.RegionalInstancePrice
	// instanceTypes is the de-duplicated, sorted list of instance types seen
	// in priceData, rebuilt by refreshInstanceTypeMetadataAndAvailableRegion.
	instanceTypes []*apis.Instance
	// instanceTypeName -> instanceInfo
	instanceInfos map[string]*apis.InstanceInfo
	// GPUArchitecture -> GPUSpec
	gpuInfos map[string]*apis.GPUSpec
}
// NewAlibabaCloudPriceClient builds a price client that uses the given
// credential pool.
//
// It always discovers the region list and loads the embedded GPU models.
// When loadPriceData is true, it also seeds the price cache from the embedded
// alibabacloud_price.json. In that case it optionally refreshes spot prices
// right away (initialSpotUpdate) and then derives the instance-type metadata
// from the cache. Any loading error aborts construction.
func NewAlibabaCloudPriceClient(akskPool []AKSKPair, initialSpotUpdate bool, loadPriceData bool) (*AlibabaCloudPriceClient, error) {
	c := &AlibabaCloudPriceClient{
		akskPool:   akskPool,
		regionList: []string{},
		priceData:  map[string]*apis.RegionalInstancePrice{},
	}

	// Mandatory initialisation steps, run in order.
	for _, step := range []func() error{c.initialRegions, c.loadGPUModels} {
		if err := step(); err != nil {
			return nil, err
		}
	}

	if !loadPriceData {
		return c, nil
	}

	raw, err := file.ReadFile("builtin-data/alibabacloud_price.json")
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(raw, &c.priceData); err != nil {
		return nil, err
	}
	if initialSpotUpdate {
		c.refreshSpotPrice()
	}
	c.refreshInstanceTypeMetadataAndAvailableRegion()
	return c, nil
}
// Run keeps the cache fresh until ctx is cancelled. It refreshes on-demand
// prices and the derived instance-type metadata every 7 days, and spot prices
// every 30 minutes. It blocks the calling goroutine.
func (a *AlibabaCloudPriceClient) Run(ctx context.Context) {
	const (
		onDemandInterval = 7 * 24 * time.Hour
		spotInterval     = 30 * time.Minute
	)

	onDemand := time.NewTicker(onDemandInterval)
	defer onDemand.Stop()
	spot := time.NewTicker(spotInterval)
	defer spot.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-spot.C:
			a.refreshSpotPrice()
		case <-onDemand.C:
			a.RefreshOnDemandPrice()
			a.refreshInstanceTypeMetadataAndAvailableRegion()
		}
	}
}
// loadGPUModels replaces gpuInfos with the GPU specs decoded from the embedded
// builtin-data/gpu_models.json, keyed by GPU architecture name.
func (a *AlibabaCloudPriceClient) loadGPUModels() error {
	raw, readErr := file.ReadFile("builtin-data/gpu_models.json")
	if readErr != nil {
		return readErr
	}
	// Start from a fresh, non-nil map so the decoder fills it in place.
	a.gpuInfos = make(map[string]*apis.GPUSpec)
	return json.Unmarshal(raw, &a.gpuInfos)
}
// getGPUSpec returns a copy of the cached spec for the GPU architecture name,
// adjusted for an instance that carries gpuCount GPUs.
//
// The cache stores per-GPU values, so memory and CUDA cores are multiplied by
// gpuCount when it is above 1. They are then divided by Partition when that
// is positive (presumably a fractional/shared GPU; verify against the data).
// It returns nil for an empty name or an unknown architecture; the latter
// also logs a warning.
func (a *AlibabaCloudPriceClient) getGPUSpec(name string, gpuCount int) *apis.GPUSpec {
	if len(name) == 0 {
		return nil
	}
	base, found := a.gpuInfos[name]
	if !found {
		klog.Warningf("The GPU info not found: %s", name)
		return nil
	}

	spec := base.DeepCopy()
	spec.GPUCount = gpuCount
	if gpuCount > 1 {
		spec.GPUMemory, spec.CUDACores = spec.GPUMemory*gpuCount, spec.CUDACores*gpuCount
	}
	if p := spec.Partition; p > 0 {
		spec.GPUMemory, spec.CUDACores = spec.GPUMemory/p, spec.CUDACores/p
	}
	return spec
}
// refreshInstanceTypeMetadataAndAvailableRegion rebuilds instanceTypes and
// instanceInfos from priceData while holding the write lock.
//
// Each instance type appears once. Its metadata comes from the first region
// in which the map iteration meets it, and it records every region that
// offers it. Each info's Regions list puts "cn"-prefixed regions first, then
// sorts lexically. instanceTypes is sorted by family, vCPU, memory, GPU count
// and finally name.
func (a *AlibabaCloudPriceClient) refreshInstanceTypeMetadataAndAvailableRegion() {
	a.dataMutex.Lock()
	defer a.dataMutex.Unlock()

	a.instanceTypes = []*apis.Instance{}
	a.instanceInfos = map[string]*apis.InstanceInfo{}
	for region, data := range a.priceData {
		for instanceType, priceData := range data.InstanceTypePrices {
			info, ok := a.instanceInfos[instanceType]
			if !ok {
				meta := priceData.InstanceTypeMetadata
				info = &apis.InstanceInfo{
					InstanceTypeMetadata: meta,
					RegionsSet:           sets.Set[string]{},
					GPU:                  a.getGPUSpec(meta.GPUArchitecture, int(meta.GPU)),
				}
				a.instanceInfos[instanceType] = info
				a.instanceTypes = append(a.instanceTypes, &apis.Instance{
					Name:                 instanceType,
					InstanceTypeMetadata: meta,
				})
			}
			info.RegionsSet.Insert(region)
		}
	}

	for _, info := range a.instanceInfos {
		regions := info.RegionsSet.UnsortedList()
		// Regions whose ID starts with "cn" get priority; ties are ordered lexically.
		sort.Slice(regions, func(i, j int) bool {
			cnI := strings.HasPrefix(regions[i], "cn")
			cnJ := strings.HasPrefix(regions[j], "cn")
			if cnI != cnJ {
				return cnI
			}
			return regions[i] < regions[j]
		})
		info.Regions = regions
	}

	// Sort the instanceTypes by 1. Family 2. CPU 3. Memory 4. GPU 5. Name.
	// The family must drop only the size suffix: Alibaba IDs look like
	// "ecs.g7.large", so taking the first '.'-segment would always yield
	// "ecs" and make the family key useless.
	sort.Slice(a.instanceTypes, func(i, j int) bool {
		x, y := a.instanceTypes[i], a.instanceTypes[j]
		if fx, fy := instanceTypeFamily(x.Name), instanceTypeFamily(y.Name); fx != fy {
			return fx < fy
		}
		mx, my := x.InstanceTypeMetadata, y.InstanceTypeMetadata
		if mx.VCPU != my.VCPU {
			return mx.VCPU < my.VCPU
		}
		if mx.Memory != my.Memory {
			return mx.Memory < my.Memory
		}
		if mx.GPU != my.GPU {
			return mx.GPU < my.GPU
		}
		// Final tie-break so the listing order is deterministic.
		return x.Name < y.Name
	})
}

// instanceTypeFamily returns everything before the last '.' of an instance
// type name ("ecs.g7.large" -> "ecs.g7"). A name without '.' is returned
// unchanged.
func instanceTypeFamily(name string) string {
	if i := strings.LastIndex(name, "."); i >= 0 {
		return name[:i]
	}
	return name
}
// getTargetFormatDate returns the start of the previous UTC hour as an
// RFC 3339 timestamp (e.g. "2024-03-05T13:00:00Z" at 14:00:01 UTC). Going one
// hour back ensures the spot-price history query has at least one entry.
func getTargetFormatDate() string {
	return formatTargetDate(time.Now())
}

// formatTargetDate returns the start of the UTC hour preceding now, formatted
// as RFC 3339. It subtracts the hour with time arithmetic, so midnight rolls
// back into the previous day, month or year. The old "Hour()-1" formatting
// produced an invalid "T-1:00:00Z" between 00:00 and 00:59 UTC.
func formatTargetDate(now time.Time) string {
	return now.UTC().Add(-time.Hour).Truncate(time.Hour).Format(time.RFC3339)
}
// getSpotPrice queries the VPC spot-price history of instanceType in region,
// starting at the previous whole UTC hour, and returns zone ID -> hourly price.
//
// When several history entries exist for a zone, the last one in the response
// wins (presumably the newest; NOTE(review): confirm the API's ordering).
// Entries without a zone ID or price are skipped. It returns (nil, nil) when
// the API reports no spot price, and the API error when the call fails.
func getSpotPrice(client *ecsclient.Client, region, instanceType string) (map[string]float64, error) {
	req := &ecsclient.DescribeSpotPriceHistoryRequest{
		RegionId:     tea.String(region),
		InstanceType: tea.String(instanceType),
		NetworkType:  tea.String("vpc"),
		StartTime:    tea.String(getTargetFormatDate()),
	}
	priceResp, err := client.DescribeSpotPriceHistoryWithOptions(req, &util.RuntimeOptions{})
	if err != nil {
		klog.Errorf("Failed to get price of instance %s in region %s:%v", instanceType, region, err)
		return nil, err
	}
	// Guard every level of the response: a missing body must not panic.
	if priceResp == nil || priceResp.Body == nil || priceResp.Body.SpotPrices == nil ||
		len(priceResp.Body.SpotPrices.SpotPriceType) == 0 {
		klog.Warningf("No spot price available for instance %s in region %s", instanceType, region)
		return nil, nil
	}

	entries := priceResp.Body.SpotPrices.SpotPriceType
	ret := make(map[string]float64, len(entries))
	for _, spotPrice := range entries {
		// A missing price would otherwise be recorded as 0, i.e. "free".
		if spotPrice.ZoneId == nil || spotPrice.SpotPrice == nil {
			continue
		}
		ret[*spotPrice.ZoneId] = float64(*spotPrice.SpotPrice)
	}
	return ret, nil
}
type (
	// RegionalSpotInstancePrice holds the instance types listed for one
	// region while refreshSpotPrice collects spot prices.
	RegionalSpotInstancePrice struct {
		Region        string                               `json:"region"`
		InstanceTypes *map[string]*apis.InstanceTypePrice `json:"instanceTypes"`
	}

	// RegionalSpotInstancePriceSplit is a single (region, instance type) work
	// item; refreshSpotPrice fetches the spot price of each one in parallel.
	RegionalSpotInstancePriceSplit struct {
		Region       string                  `json:"region"`
		InstanceType string                  `json:"instanceType"`
		Info         *apis.InstanceTypePrice `json:"info"`
	}
)
// refreshSpotPrice re-lists the instance types of every known region (7
// regions in parallel) and then fetches their per-zone spot prices (50
// requests in parallel). It merges the results into priceData under the write
// lock:
//   - listed types replace their cached entries, which refreshes metadata and
//     zones;
//   - the cached on-demand price is carried over;
//   - when fetching a type's spot price fails, its previously cached spot
//     price is kept rather than wiped out.
//
// A region whose listing fails is left untouched.
func (a *AlibabaCloudPriceClient) refreshSpotPrice() {
	regional := make([]*RegionalSpotInstancePrice, len(a.regionList))
	workqueue.ParallelizeUntil(context.Background(), 7, len(a.regionList), func(i int) {
		region := a.regionList[i]
		instanceTypes, err := a.listInstanceTypes(region)
		if err != nil {
			klog.Errorf("Failed to list instance types in region %s, err: %v", region, err)
			return
		}
		regional[i] = &RegionalSpotInstancePrice{
			Region:        region,
			InstanceTypes: &instanceTypes,
		}
	})

	// Flatten into one work item per (region, instance type).
	n := 0
	for _, r := range regional {
		if r != nil {
			n += len(*r.InstanceTypes)
		}
	}
	splits := make([]RegionalSpotInstancePriceSplit, 0, n)
	for _, r := range regional {
		if r == nil {
			continue
		}
		for instanceType, info := range *r.InstanceTypes {
			splits = append(splits, RegionalSpotInstancePriceSplit{
				Region:       r.Region,
				InstanceType: instanceType,
				Info:         info,
			})
		}
	}

	// failed[i] records that the spot price of splits[i] could not be fetched.
	// Each worker writes only its own index, so no locking is needed.
	failed := make([]bool, len(splits))
	workqueue.ParallelizeUntil(context.Background(), 50, len(splits), func(i int) {
		s := &splits[i]
		client, err := a.createECSClient(s.Region)
		if err != nil {
			klog.Errorf("Failed to create ECS client in region %s:%v", s.Region, err)
			failed[i] = true
			return
		}
		spotPrice, err := getSpotPrice(client, s.Region, s.InstanceType)
		if err != nil {
			klog.Errorf("Failed to get spot price in region %s:%v", s.Region, err)
			failed[i] = true
			return
		}
		s.Info.SpotPricePerHour = spotPrice
	})

	a.dataMutex.Lock()
	defer a.dataMutex.Unlock()
	for i := range splits {
		s := &splits[i]
		regionData := a.priceData[s.Region]
		if regionData == nil {
			regionData = &apis.RegionalInstancePrice{}
			a.priceData[s.Region] = regionData
		}
		if regionData.InstanceTypePrices == nil {
			regionData.InstanceTypePrices = map[string]*apis.InstanceTypePrice{}
		}
		if prev := regionData.InstanceTypePrices[s.InstanceType]; prev != nil {
			s.Info.OnDemandPricePerHour = prev.OnDemandPricePerHour
			if failed[i] {
				s.Info.SpotPricePerHour = prev.SpotPricePerHour
			}
		}
		regionData.InstanceTypePrices[s.InstanceType] = s.Info
	}
	klog.Infof("All spot prices are refreshed for AlibabaCloud")
}
type (
	// ECSPrice is the top-level shape of the published instancePrice.json.
	// PricingInfo is keyed by a "::"-separated descriptor string.
	ECSPrice struct {
		PricingInfo map[string]ECSPriceDetail `json:"pricingInfo"`
	}

	// ECSPriceDetail lists the hourly price entries of one descriptor.
	ECSPriceDetail struct {
		Hours []ECSHoursPrice `json:"hours"`
	}

	// ECSHoursPrice is a single hourly price, encoded as a decimal string.
	ECSHoursPrice struct {
		Price string `json:"price"`
	}
)
// getECSPrice scrapes the public ECS pricing page for the URL of the
// versioned price bundle, downloads its instancePrice.json, and returns
// region -> instance type -> Linux hourly on-demand price.
//
// Each HTTP request is bounded by a one-minute timeout, and non-2xx responses
// are treated as errors. Price keys that do not have at least four
// "::"-separated parts, and Linux entries with no hourly price, are skipped
// with a warning instead of panicking. An unparsable price aborts with an
// error.
func getECSPrice() (map[string]map[string]float64, error) {
	client := &http.Client{Timeout: time.Minute}

	baseURL := "https://www.aliyun.com/price/ecs/ecs-pricing/zh"
	page, err := fetchECSPriceURL(client, baseURL)
	if err != nil {
		klog.Errorf("Get ecs price failed: %v", err)
		return nil, err
	}

	// This is referring https://www.aliyun.com/price/ecs/ecs-pricing/zh#/?_k=zmi2qe
	// The page references a versioned static bundle that hosts the price data.
	re := regexp.MustCompile(`https://g\.alicdn\.com/aliyun/ecs-price-info/[0-9.]+`)
	priceURL := re.FindString(string(page))
	if priceURL == "" {
		klog.Errorf("Could not find price url")
		return nil, fmt.Errorf("could not find price url")
	}
	reqURL, err := url.JoinPath(priceURL, "/price/download/instancePrice.json")
	if err != nil {
		klog.Errorf("Failed to get price request url: %v", err)
		return nil, err
	}

	data, err := fetchECSPriceURL(client, reqURL)
	if err != nil {
		klog.Errorf("Get ecs price failed: %v", err)
		return nil, err
	}
	var ecsPrice ECSPrice
	if err := json.Unmarshal(data, &ecsPrice); err != nil {
		klog.Errorf("Get ecs price failed: %v", err)
		return nil, err
	}

	ret := map[string]map[string]float64{}
	for key, detail := range ecsPrice.PricingInfo {
		// Only parts 0 (region), 1 (instance type) and 3 (OS) are used.
		parts := strings.Split(key, "::")
		if len(parts) < 4 {
			klog.Warningf("Skip malformed ecs price key: %s", key)
			continue
		}
		region, instanceType, osType := parts[0], parts[1], parts[3]
		if osType != "linux" {
			continue
		}
		if len(detail.Hours) == 0 {
			klog.Warningf("Skip ecs price key without hourly price: %s", key)
			continue
		}
		price, err := strconv.ParseFloat(detail.Hours[0].Price, 64)
		if err != nil {
			klog.Errorf("Get ecs price failed: %v", err)
			return nil, err
		}
		if ret[region] == nil {
			ret[region] = map[string]float64{}
		}
		ret[region][instanceType] = price
	}
	return ret, nil
}

// fetchECSPriceURL GETs rawURL with client and returns the whole response
// body. A non-2xx status is returned as an error.
func fetchECSPriceURL(client *http.Client, rawURL string) ([]byte, error) {
	resp, err := client.Get(rawURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("unexpected HTTP status %d from %s", resp.StatusCode, rawURL)
	}
	return io.ReadAll(resp.Body)
}
// RefreshOnDemandPrice downloads the published on-demand prices and rebuilds
// each region's entry in priceData. It re-lists the region's instance types,
// sets their Linux on-demand price (0 when none is published) and keeps the
// spot price already cached for the same type.
//
// If the price download fails, nothing changes. A region whose instance
// listing fails keeps its old entry.
func (a *AlibabaCloudPriceClient) RefreshOnDemandPrice() {
	priceInfo, err := getECSPrice()
	if err != nil {
		return
	}
	handleFunc := func(paras ...interface{}) {
		region := paras[0].(string)
		instanceTypes, err := a.listInstanceTypes(region)
		if err != nil {
			return
		}

		// Handlers for other regions write a.priceData under the lock
		// (presumably concurrently, via tools.NewParallelTask), so reading it
		// here must hold at least the read lock.
		a.dataMutex.RLock()
		var cached map[string]*apis.InstanceTypePrice
		if regionData := a.priceData[region]; regionData != nil {
			cached = regionData.InstanceTypePrices
		}
		for instanceType, price := range instanceTypes {
			price.OnDemandPricePerHour = priceInfo[region][instanceType]
			// Update the SpotPricePerHour if we can find it on the priceData.
			if prev := cached[instanceType]; prev != nil {
				price.SpotPricePerHour = prev.SpotPricePerHour
			}
		}
		a.dataMutex.RUnlock()

		a.dataMutex.Lock()
		a.priceData[region] = &apis.RegionalInstancePrice{InstanceTypePrices: instanceTypes}
		a.dataMutex.Unlock()
	}

	priceTask := tools.NewParallelTask(handleFunc)
	for _, region := range a.regionList {
		klog.Infof("Start to handle region %s for on-demand", region)
		priceTask.Add([]interface{}{region})
	}
	priceTask.Process()
	klog.Infof("All on-demand prices are refreshed for AlibabaCloud")
}
// listInstanceTypes returns, keyed by instance type ID, the instance types of
// region that DescribeAvailableResource reports as available for pay-as-you-go
// SpotAsPriceGo instances.
//
// Each entry carries the hardware metadata from DescribeInstanceTypes and the
// sorted list of zones offering it. Prices are left zero for the caller to
// fill in. It returns an error when either API call fails or returns no data.
func (a *AlibabaCloudPriceClient) listInstanceTypes(region string) (map[string]*apis.InstanceTypePrice, error) {
	client, err := a.createECSClient(region)
	if err != nil {
		return nil, err
	}
	typesResp, err := client.DescribeInstanceTypes(&ecsclient.DescribeInstanceTypesRequest{})
	if err != nil {
		klog.Errorf("Failed to list instance types in region %s, err: %v", region, err)
		return nil, err
	}
	if typesResp == nil || typesResp.Body == nil || typesResp.Body.InstanceTypes == nil {
		klog.Errorf("Failed to get instance types data in region %s", region)
		return nil, fmt.Errorf("failed to get instance types data in region %s", region)
	}

	availableTypesResp, err := client.DescribeAvailableResource(
		&ecsclient.DescribeAvailableResourceRequest{
			RegionId:            tea.String(region),
			DestinationResource: tea.String("InstanceType"),
			InstanceChargeType:  tea.String("PostPaid"),
			SpotStrategy:        tea.String("SpotAsPriceGo"),
		})
	if err != nil {
		klog.Errorf("Failed to list available instance types in region %s, err: %v", region, err)
		return nil, err
	}
	if availableTypesResp == nil || availableTypesResp.Body == nil ||
		availableTypesResp.Body.AvailableZones == nil ||
		len(availableTypesResp.Body.AvailableZones.AvailableZone) == 0 {
		klog.Errorf("Failed to get available instance types data in region %s", region)
		return nil, fmt.Errorf("failed to get available instance types data in region %s", region)
	}

	// instance type ID -> zones / regions where it is offered
	availableTypesZone := map[string]sets.Set[string]{}
	availableTypesRegion := map[string]sets.Set[string]{}
	for _, az := range availableTypesResp.Body.AvailableZones.AvailableZone {
		if az.AvailableResources == nil {
			continue
		}
		azZone := tea.StringValue(az.ZoneId)
		azRegion := tea.StringValue(az.RegionId)
		for _, ar := range az.AvailableResources.AvailableResource {
			if ar.SupportedResources == nil {
				continue
			}
			for _, sr := range ar.SupportedResources.SupportedResource {
				instanceType := tea.StringValue(sr.Value)
				if _, ok := availableTypesZone[instanceType]; !ok {
					availableTypesZone[instanceType] = sets.Set[string]{}
					availableTypesRegion[instanceType] = sets.Set[string]{}
				}
				availableTypesZone[instanceType].Insert(azZone)
				availableTypesRegion[instanceType].Insert(azRegion)
			}
		}
	}

	ret := map[string]*apis.InstanceTypePrice{}
	for _, item := range typesResp.Body.InstanceTypes.InstanceType {
		typeID := tea.StringValue(item.InstanceTypeId)
		if regions, ok := availableTypesRegion[typeID]; !ok || !regions.Has(region) {
			continue
		}
		gpuSpec := tea.StringValue(item.GPUSpec)
		// AlibabaCloud reports "no GPU" as "0" or "NULL".
		if gpuSpec == "0" || gpuSpec == "NULL" {
			gpuSpec = ""
		}
		zones := availableTypesZone[typeID].UnsortedList()
		sort.Strings(zones)
		ret[typeID] = &apis.InstanceTypePrice{
			InstanceTypeMetadata: apis.InstanceTypeMetadata{
				Arch:              extractECSArch(tea.StringValue(item.CpuArchitecture)),
				PhysicalProcessor: tea.StringValue(item.PhysicalProcessorModel),
				ClockSpeed:        extractClockSpeed(item.CpuSpeedFrequency, item.CpuTurboFrequency),
				VCPU:              float64(tea.Int32Value(item.CpuCoreCount)),
				Memory:            float64(tea.Float32Value(item.MemorySize)),
				GPU:               float64(tea.Int32Value(item.GPUAmount)),
				GPUArchitecture:   gpuSpec,
			},
			Zones: zones,
		}
	}
	return ret, nil
}
// extractECSArch maps an ECS CPU architecture string to a Go-style
// architecture name: "X86" -> "amd64", "ARM" -> "arm64". Any other value logs
// a warning and falls back to "amd64".
func extractECSArch(unFormatedArch string) string {
	switch unFormatedArch {
	case "X86":
		return "amd64"
	case "ARM":
		return "arm64"
	default:
		klog.Warningf("Failed to parse the arch %s, defaulting to amd64", unFormatedArch)
		return "amd64"
	}
}
// extractClockSpeed returns the larger of the base and turbo CPU frequencies,
// rounded to two decimal places. Nil inputs are read through
// tea.Float32Value (presumably as 0).
func extractClockSpeed(frequency, turboFrequency *float32) float64 {
	peak := max(tea.Float32Value(frequency), tea.Float32Value(turboFrequency))
	scaled := float64(peak * 100)
	return math.Round(scaled) / 100
}
// ignoreRegions holds region IDs that initialRegions leaves out of the region
// list.
var ignoreRegions = map[string]struct{}{
	"ap-southeast-2": {}, // Sydney; the region has been shut down
}
// initialRegions appends every region returned by DescribeRegions to
// regionList, except those in ignoreRegions. It returns an error when the
// ECS client cannot be created or the call fails or returns no region data.
func (a *AlibabaCloudPriceClient) initialRegions() error {
	// We use cn-hangzhou as the default region to list regions
	client, err := a.createECSClient("cn-hangzhou")
	if err != nil {
		return err
	}
	resp, err := client.DescribeRegionsWithOptions(&ecsclient.DescribeRegionsRequest{}, &util.RuntimeOptions{})
	if err != nil {
		klog.Errorf("Failed to list regions:%v", err)
		return err
	}
	// Guard against an empty response instead of panicking on a nil body.
	if resp == nil || resp.Body == nil || resp.Body.Regions == nil {
		klog.Errorf("Failed to list regions: empty response")
		return fmt.Errorf("failed to list regions: empty response")
	}
	for _, regionData := range resp.Body.Regions.Region {
		regionID := tea.StringValue(regionData.RegionId)
		if _, ignored := ignoreRegions[regionID]; ignored {
			continue
		}
		a.regionList = append(a.regionList, regionID)
	}
	return nil
}
// createECSClient builds an ECS API client for region. It uses an AK/SK pair
// picked at random from the pool, which spreads requests across credentials
// to avoid per-key rate limits.
//
// It returns an error when the pool is empty, instead of letting
// rand.Intn(0) panic, or when the SDK rejects the configuration.
func (a *AlibabaCloudPriceClient) createECSClient(region string) (*ecsclient.Client, error) {
	if len(a.akskPool) == 0 {
		klog.Errorf("Failed to create ecs client: no AK/SK pair configured")
		return nil, fmt.Errorf("failed to create ecs client: no AK/SK pair configured")
	}
	pick := a.akskPool[rand.Intn(len(a.akskPool))]
	config := &openapi.Config{
		AccessKeyId:     tea.String(pick.AK),
		AccessKeySecret: tea.String(pick.SK),
		RegionId:        tea.String(region),
	}
	client, err := ecsclient.NewClient(config)
	if err != nil {
		klog.Errorf("Failed to create ecs client:%v", err)
		return nil, err
	}
	return client, nil
}
// ListRegionsInstancesPrice returns a deep copy of the whole price cache,
// keyed by region, taken under the read lock. Callers may modify the result
// freely.
func (a *AlibabaCloudPriceClient) ListRegionsInstancesPrice() map[string]*apis.RegionalInstancePrice {
	a.dataMutex.RLock()
	defer a.dataMutex.RUnlock()

	snapshot := make(map[string]*apis.RegionalInstancePrice, len(a.priceData))
	for region, prices := range a.priceData {
		snapshot[region] = prices.DeepCopy()
	}
	return snapshot
}
// ListInstancesPrice returns a one-entry map {region: deep copy of that
// region's prices}, or nil when the region is not cached.
func (a *AlibabaCloudPriceClient) ListInstancesPrice(region string) *map[string]apis.RegionalInstancePrice {
	a.dataMutex.RLock()
	defer a.dataMutex.RUnlock()

	if regionData, found := a.priceData[region]; found {
		result := map[string]apis.RegionalInstancePrice{region: *regionData.DeepCopy()}
		return &result
	}
	return nil
}
// GetInstancePrice returns the cached price entry for instanceType in region,
// or nil when either is unknown.
//
// NOTE(review): the returned pointer is the cached object itself, not a copy;
// callers must treat it as read-only.
func (a *AlibabaCloudPriceClient) GetInstancePrice(region, instanceType string) *apis.InstanceTypePrice {
	a.dataMutex.RLock()
	defer a.dataMutex.RUnlock()

	if regionData, found := a.priceData[region]; found {
		if price, known := regionData.InstanceTypePrices[instanceType]; known {
			return price
		}
	}
	return nil
}
// ListInstanceTypes returns the sorted instance-type list built by the last
// metadata refresh. The slice is shared with the cache and must not be
// modified.
func (a *AlibabaCloudPriceClient) ListInstanceTypes() []*apis.Instance {
	a.dataMutex.RLock()
	instances := a.instanceTypes
	a.dataMutex.RUnlock()
	return instances
}
func (a *AlibabaCloudPriceClient) GetInstanceInfo(instanceType string) *apis.InstanceInfo {
a.dataMutex.RLock()
defer a.dataMutex.RUnlock()
return a.instanceInfos[instanceType]
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The comment at https://github.com/cloudpilot-ai/karpenter-provider-alibabacloud/blob/e92dee8aa5e8de7c9b2e1268238b8b163a1eb310/pkg/providers/pricing/pricing.go#L92C52-L92C96 says the price server is available to everyone, but its source code is not.
The text was updated successfully, but these errors were encountered: