feat(scheduler): support optional cpu constraints and make sort by guest count optional #50

Open · wants to merge 1 commit into base: main
20 changes: 20 additions & 0 deletions api/v1alpha1/proxmoxcluster_types.go
@@ -78,6 +78,15 @@ type SchedulerHints struct {
// By default 100% of a node's memory will be used for allocation.
// +optional
MemoryAdjustment *uint64 `json:"memoryAdjustment,omitempty"`

// Like MemoryAdjustment, but for CPU resources.
Review comment (Member):
Please write a meaningful description here.
Re-explain the adjustment term here as well.
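A possible wording addressing both points (a suggestion only, not text from this PR):

// CPUAdjustment allows to adjust a node's CPU capacity (number of vCores)
// available for scheduling, by a given percentage. For example, setting it
// to 300 allows allocating up to 300% of a node's cores, while 95 keeps a
// 5% safety buffer. Defaults to 0, which disables the CPU constraint
// entirely, as CPU is a compressible resource.
// +optional
CPUAdjustment *uint64 `json:"cpuAdjustment,omitempty"`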

// Defaults to 0 (disabled), as CPU is a compressible resource.
// +optional
CPUAdjustment *uint64 `json:"cpuAdjustment,omitempty"`

// +optional
Review comment (Member):
Here we also need a description and why this is needed.
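A description along these lines could work (again only a suggestion, not part of the PR):

// PreferLowerGuestCount makes the scheduler prefer the allowed node that
// currently hosts the fewest guests, as long as that node can also satisfy
// the requested resources. When set to false, the node with the most free
// resources is always chosen. Defaults to true, which keeps the previous
// round-robin-like distribution behaviour.
// +optional
// +kubebuilder:default=true
PreferLowerGuestCount bool `json:"preferLowerGuestCount,omitempty"`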

// +kubebuilder:default=true
PreferLowerGuestCount bool `json:"preferLowerGuestCount,omitempty"`
}

// GetMemoryAdjustment returns the memory adjustment percentage to use within the scheduler.
@@ -91,6 +100,17 @@ func (sh *SchedulerHints) GetMemoryAdjustment() uint64 {
return memoryAdjustment
}

// GetCPUAdjustment returns the cpu adjustment percentage to use within the scheduler.
func (sh *SchedulerHints) GetCPUAdjustment() uint64 {
cpuAdjustment := uint64(0)

if sh != nil {
cpuAdjustment = ptr.Deref(sh.CPUAdjustment, 0)
}

return cpuAdjustment
}

// ProxmoxClusterStatus defines the observed state of ProxmoxCluster.
type ProxmoxClusterStatus struct {
// Ready indicates that the cluster is ready.
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -137,6 +137,11 @@ spec:
to a node's resources, to allow for overprovisioning or to ensure
a node will always have a safety buffer.
properties:
cpuAdjustment:
description: Like MemoryAdjustment, but for CPU resources. Defaults
to 0 (disabled), as CPU is a compressible resource.
format: int64
type: integer
memoryAdjustment:
description: MemoryAdjustment allows to adjust a node's memory
by a given percentage. For example, setting it to 300 allows
@@ -146,6 +151,9 @@
default 100% of a node's memory will be used for allocation.
format: int64
type: integer
preferLowerGuestCount:
default: true
type: boolean
type: object
required:
- dnsServers
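The percentage semantics described for memoryAdjustment and cpuAdjustment above boil down to scaling a node's physical capacity. A minimal sketch, assuming the adjustment is applied as capacity * adjustment / 100 (the actual computation lives in GetReservableResources, which is not part of this diff); adjustedCapacity and the values below are purely illustrative:

package main

import "fmt"

// adjustedCapacity scales a node's physical capacity by the configured
// percentage; an adjustment of 0 means the constraint is disabled entirely.
func adjustedCapacity(physical, adjustment uint64) uint64 {
	return physical * adjustment / 100
}

func main() {
	nodeMemory := uint64(64) << 30 // a 64 GiB node, in bytes

	fmt.Println(adjustedCapacity(nodeMemory, 100)) // 100%: 64 GiB schedulable
	fmt.Println(adjustedCapacity(nodeMemory, 300)) // 300%: 192 GiB schedulable (overprovisioning)
	fmt.Println(adjustedCapacity(nodeMemory, 95))  // 95%: roughly 60.8 GiB (safety buffer)
}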
103 changes: 65 additions & 38 deletions internal/service/scheduler/vmscheduler.go
@@ -29,17 +29,16 @@ import (
"sigs.k8s.io/cluster-api/util"
)

// InsufficientMemoryError is used when the scheduler cannot assign a VM to a node because it would
// exceed the node's memory limit.
type InsufficientMemoryError struct {
node string
available uint64
requested uint64
// InsufficientResourcesError is used when the scheduler cannot assign a VM to a node because no node
// would be able to provide the requested resources.
type InsufficientResourcesError struct {
Review comment (Member):
I guess it's better to separate the errors by CPU and memory.
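One way that split could look (a sketch only, in case the suggestion is adopted; the type and field names are hypothetical):

// InsufficientMemoryError is returned when no allowed node can provide the
// requested amount of memory.
type InsufficientMemoryError struct {
	requestedMemory uint64
}

func (err InsufficientMemoryError) Error() string {
	return fmt.Sprintf("cannot reserve %dB of memory in cluster", err.requestedMemory)
}

// InsufficientCPUError is returned when no allowed node can provide the
// requested number of vCores.
type InsufficientCPUError struct {
	requestedCores uint64
}

func (err InsufficientCPUError) Error() string {
	return fmt.Sprintf("cannot reserve %d vCores in cluster", err.requestedCores)
}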

requestedMemory uint64
requestedCores uint64
}

func (err InsufficientMemoryError) Error() string {
return fmt.Sprintf("cannot reserve %dB of memory on node %s: %dB available memory left",
err.requested, err.node, err.available)
func (err InsufficientResourcesError) Error() string {
return fmt.Sprintf("cannot reserve %dB of memory and/or %d vCores in cluster",
err.requestedMemory, err.requestedCores)
}

// ScheduleVM decides which node a ProxmoxMachine should be scheduled on.
@@ -64,69 +63,92 @@ func selectNode(
allowedNodes []string,
schedulerHints *infrav1.SchedulerHints,
) (string, error) {
byMemory := make(sortByAvailableMemory, len(allowedNodes))
for i, nodeName := range allowedNodes {
mem, err := client.GetReservableMemoryBytes(ctx, nodeName, schedulerHints.GetMemoryAdjustment())
var nodes []nodeInfo

requestedMemory := uint64(machine.Spec.MemoryMiB) * 1024 * 1024 // convert to bytes
requestedCores := uint64(machine.Spec.NumCores)

for _, nodeName := range allowedNodes {
mem, cpu, err := client.GetReservableResources(
ctx,
nodeName,
schedulerHints.GetMemoryAdjustment(),
schedulerHints.GetCPUAdjustment(),
)
if err != nil {
return "", err
}
byMemory[i] = nodeInfo{Name: nodeName, AvailableMemory: mem}
}

sort.Sort(byMemory)
// if MemoryAdjustment is explicitly set to 0 (zero), pretend we have enough mem for the guest
if schedulerHints.GetMemoryAdjustment() == 0 {
mem = requestedMemory
}
// if CPUAdjustment is explicitly set to 0 (zero), pretend we have enough cpu for the guest
if schedulerHints.GetCPUAdjustment() == 0 {
cpu = requestedCores
}

requestedMemory := uint64(machine.Spec.MemoryMiB) * 1024 * 1024 // convert to bytes
if requestedMemory > byMemory[0].AvailableMemory {
// no more space on the node with the highest amount of available memory
return "", InsufficientMemoryError{
node: byMemory[0].Name,
available: byMemory[0].AvailableMemory,
requested: requestedMemory,
node := nodeInfo{Name: nodeName, AvailableMemory: mem, AvailableCPU: cpu}
if node.AvailableMemory >= requestedMemory && node.AvailableCPU >= requestedCores {
nodes = append(nodes, node)
}
}

if len(nodes) == 0 {
return "", InsufficientResourcesError{requestedMemory, requestedCores}
}

// Sort nodes by free memory and then free CPU in descending order
byResources := make(sortByResources, len(nodes))
copy(byResources, nodes)
sort.Sort(byResources)

decision := byResources[0].Name

// count the existing vms per node
nodeCounter := make(map[string]int)
for _, nl := range locations {
nodeCounter[nl.Node]++
}

for i, info := range byMemory {
for i, info := range byResources {
info.ScheduledVMs = nodeCounter[info.Name]
byMemory[i] = info
byResources[i] = info
}

byReplicas := make(sortByReplicas, len(byMemory))
copy(byReplicas, byMemory)
byReplicas := make(sortByReplicas, len(byResources))
copy(byReplicas, byResources)

sort.Sort(byReplicas)

decision := byMemory[0].Name
if requestedMemory < byReplicas[0].AvailableMemory {
// distribute round-robin when memory allows it
// when enabled, pick the node with the lowest guest count (all remaining candidates already fit the request)
if schedulerHints.PreferLowerGuestCount {
decision = byReplicas[0].Name
}

if logger := logr.FromContextOrDiscard(ctx); logger.V(4).Enabled() {
// only construct values when message should actually be logged
logger.Info("Scheduler decision",
"byReplicas", byReplicas.String(),
"byMemory", byMemory.String(),
"byResources", byResources.String(),
"requestedMemory", requestedMemory,
"requestedCores", requestedCores,
"resultNode", decision,
"schedulerHints", schedulerHints,
)
}

return decision, nil
}

type resourceClient interface {
GetReservableMemoryBytes(context.Context, string, uint64) (uint64, error)
GetReservableResources(context.Context, string, uint64, uint64) (uint64, uint64, error)
}

type nodeInfo struct {
Name string `json:"node"`
AvailableMemory uint64 `json:"mem"`
AvailableCPU uint64 `json:"cpu"`
ScheduledVMs int `json:"vms"`
}

@@ -143,16 +165,21 @@ func (a sortByReplicas) String() string {
return string(o)
}

type sortByAvailableMemory []nodeInfo
type sortByResources []nodeInfo

func (a sortByResources) Len() int { return len(a) }
func (a sortByResources) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortByResources) Less(i, j int) bool {
// Compare by free memory and free CPU in descending order
if a[i].AvailableMemory != a[j].AvailableMemory {
return a[i].AvailableMemory > a[j].AvailableMemory
}

func (a sortByAvailableMemory) Len() int { return len(a) }
func (a sortByAvailableMemory) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortByAvailableMemory) Less(i, j int) bool {
// more available memory = lower index
return a[i].AvailableMemory > a[j].AvailableMemory
// If free memory is equal, prefer more free CPU; on a CPU tie, prefer fewer scheduled VMs
return a[i].AvailableCPU > a[j].AvailableCPU || (a[i].AvailableCPU == a[j].AvailableCPU && a[i].ScheduledVMs < a[j].ScheduledVMs)
}

func (a sortByAvailableMemory) String() string {
func (a sortByResources) String() string {
o, _ := json.Marshal(a)
return string(o)
}
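To illustrate the ordering produced by the comparator above, here is a hypothetical example test for the same package (it reuses the nodeInfo and sortByResources types and the fmt/sort imports from this file; it is not part of the PR):

func Example_sortByResources() {
	nodes := sortByResources{
		{Name: "pve1", AvailableMemory: 32 << 30, AvailableCPU: 8},
		{Name: "pve2", AvailableMemory: 32 << 30, AvailableCPU: 16},
		{Name: "pve3", AvailableMemory: 64 << 30, AvailableCPU: 4},
	}
	sort.Sort(nodes)

	// Most free memory wins; on a memory tie, more free CPU wins.
	fmt.Println(nodes[0].Name, nodes[1].Name, nodes[2].Name)
	// Output: pve3 pve2 pve1
}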
55 changes: 35 additions & 20 deletions internal/service/scheduler/vmscheduler_test.go
@@ -25,10 +25,10 @@ import (
"github.com/stretchr/testify/require"
)

type fakeResourceClient map[string]uint64
type fakeResourceClient map[string]nodeInfo

func (c fakeResourceClient) GetReservableMemoryBytes(_ context.Context, nodeName string, _ uint64) (uint64, error) {
return c[nodeName], nil
func (c fakeResourceClient) GetReservableResources(_ context.Context, nodeName string, _ uint64, _ uint64) (uint64, uint64, error) {
return c[nodeName].AvailableMemory, c[nodeName].AvailableCPU, nil
}

func miBytes(in uint64) uint64 {
@@ -39,10 +39,18 @@ func TestSelectNode(t *testing.T) {
allowedNodes := []string{"pve1", "pve2", "pve3"}
var locations []infrav1.NodeLocation
const requestMiB = 8
availableMem := map[string]uint64{
"pve1": miBytes(20),
"pve2": miBytes(30),
"pve3": miBytes(15),
const requestCores = 2
cpuAdjustment := uint64(100)

schedulerHints := &infrav1.SchedulerHints{
// This defaults to true in our CRD
PreferLowerGuestCount: true,
CPUAdjustment: &cpuAdjustment,
}
availableResources := map[string]nodeInfo{
"pve1": {AvailableMemory: miBytes(20), AvailableCPU: uint64(16)},
"pve2": {AvailableMemory: miBytes(30), AvailableCPU: uint64(16)},
"pve3": {AvailableMemory: miBytes(15), AvailableCPU: uint64(16)},
}

expectedNodes := []string{
@@ -57,40 +65,47 @@
proxmoxMachine := &infrav1.ProxmoxMachine{
Spec: infrav1.ProxmoxMachineSpec{
MemoryMiB: requestMiB,
NumCores: requestCores,
},
}

client := fakeResourceClient(availableMem)
client := fakeResourceClient(availableResources)

node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, &infrav1.SchedulerHints{})
node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, schedulerHints)
require.NoError(t, err)
require.Equal(t, expectedNode, node)

require.Greater(t, availableMem[node], miBytes(requestMiB))
availableMem[node] -= miBytes(requestMiB)
require.Greater(t, availableResources[node].AvailableMemory, miBytes(requestMiB))
if entry, ok := availableResources[node]; ok {
entry.AvailableMemory -= miBytes(requestMiB)
entry.AvailableCPU -= requestCores
availableResources[node] = entry
}

locations = append(locations, infrav1.NodeLocation{Node: node})
})
}

t.Run("out of memory", func(t *testing.T) {
t.Run("out of resources", func(t *testing.T) {
proxmoxMachine := &infrav1.ProxmoxMachine{
Spec: infrav1.ProxmoxMachineSpec{
MemoryMiB: requestMiB,
NumCores: requestCores,
},
}

client := fakeResourceClient(availableMem)
client := fakeResourceClient(availableResources)

node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, &infrav1.SchedulerHints{})
require.ErrorAs(t, err, &InsufficientMemoryError{})
node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, schedulerHints)
require.ErrorAs(t, err, &InsufficientResourcesError{})
require.Empty(t, node)

expectMem := map[string]uint64{
"pve1": miBytes(4), // 20 - 8 x 2
"pve2": miBytes(6), // 30 - 8 x 3
"pve3": miBytes(7), // 15 - 8 x 1
expectResources := map[string]nodeInfo{
"pve1": {AvailableMemory: miBytes(4), AvailableCPU: uint64(12)}, // 20 - 8 x 2
"pve2": {AvailableMemory: miBytes(6), AvailableCPU: uint64(10)}, // 30 - 8 x 3
"pve3": {AvailableMemory: miBytes(7), AvailableCPU: uint64(14)}, // 15 - 8 x 1
}
require.Equal(t, expectMem, availableMem)

require.Equal(t, expectResources, availableResources)
})
}
2 changes: 1 addition & 1 deletion internal/service/vmservice/vm.go
@@ -314,7 +314,7 @@ func createVM(ctx context.Context, scope *scope.MachineScope) (proxmox.VMCloneRe
var err error
options.Target, err = selectNextNode(ctx, scope)
if err != nil {
if errors.As(err, &scheduler.InsufficientMemoryError{}) {
if errors.As(err, &scheduler.InsufficientResourcesError{}) {
scope.SetFailureMessage(err)
scope.SetFailureReason(capierrors.InsufficientResourcesMachineError)
}
2 changes: 1 addition & 1 deletion internal/service/vmservice/vm_test.go
@@ -105,7 +105,7 @@ func TestEnsureVirtualMachine_CreateVM_SelectNode_InsufficientMemory(t *testing.
machineScope.InfraCluster.ProxmoxCluster.Spec.AllowedNodes = []string{"node1"}

selectNextNode = func(context.Context, *scope.MachineScope) (string, error) {
return "", fmt.Errorf("error: %w", scheduler.InsufficientMemoryError{})
return "", fmt.Errorf("error: %w", scheduler.InsufficientResourcesError{})
}
t.Cleanup(func() { selectNextNode = scheduler.ScheduleVM })

2 changes: 1 addition & 1 deletion pkg/proxmox/client.go
@@ -37,7 +37,7 @@ type Client interface {

GetTask(ctx context.Context, upID string) (*proxmox.Task, error)

GetReservableMemoryBytes(ctx context.Context, nodeName string, nodeMemoryAdjustment uint64) (uint64, error)
GetReservableResources(ctx context.Context, nodeName string, nodeMemoryAdjustment uint64, nodeCPUAdjustment uint64) (uint64, uint64, error)

ResizeDisk(ctx context.Context, vm *proxmox.VirtualMachine, disk, size string) error
