Skip to content

Storable scheduler #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 56 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
481277b
Adding base function.
itouri Nov 17, 2017
782f0b7
Adding Testing function
itouri Nov 17, 2017
e62e3a7
Change parameter of lxc_huge.json
itouri Nov 17, 2017
e2c78dd
Fix import cycle
itouri Nov 23, 2017
28d8a54
Adding instance assertion
itouri Nov 23, 2017
2bb1dc4
Adding temporary code.
itouri Nov 23, 2017
33d095a
Fix spell miss.
itouri Nov 23, 2017
a1fb0fc
test
itouri Nov 23, 2017
e5bc540
Adding switch function
itouri Nov 24, 2017
ed5ec6b
fix to Null and Esxi type return true
itouri Nov 24, 2017
ad4281c
Change order of Create function
itouri Nov 24, 2017
b27be60
Delete switch-case function from store.go that modified to interface …
itouri Dec 2, 2017
70a344e
Adding function of schedule.
itouri Dec 15, 2017
0cce17f
Add structure of VDC
itouri Dec 15, 2017
e173261
Implemented base of scheduler
itouri Dec 22, 2017
6be3e08
Pass the compile
itouri Dec 25, 2017
5ba5de2
Delete esxi folders to avoid marge confrict
itouri Dec 25, 2017
1250c29
Modified esxi folder file to avoid marge confrict
itouri Dec 25, 2017
b5c529f
Modify test argument to match the modifing of NewAPIServer()
itouri Dec 25, 2017
77c253e
Delete mesos_test.go
itouri Dec 25, 2017
2104ca3
Modify parameter of api.NewAPIServer
itouri Dec 25, 2017
a08f213
Delete extra test line
itouri Dec 25, 2017
7eb9d11
Modify function name miss
itouri Dec 25, 2017
79245ba
Refactoring ScheduleInstance function
itouri Dec 25, 2017
020383e
Add sample mesos offer
itouri Dec 25, 2017
ba1c0e8
Modify sample mesos offer struct
itouri Dec 25, 2017
c69ffb3
Modify testing function that convert mesos offer to vdc offer
itouri Jan 10, 2018
f9979ea
Rename the struct of VDCOffer
itouri Jan 19, 2018
06eb058
Add flog to Assign()
itouri Jan 19, 2018
c00f6c2
Merge branch 'master' into storable_scheduler
itouri Jan 19, 2018
61b1a97
Marge confrict
itouri Jan 19, 2018
6ddb895
Refactor names of VDCOffer struct
itouri Jan 19, 2018
0d462a8
Comment out Mutex for test
itouri Jan 24, 2018
db6a53b
Out Comment out
itouri Jan 24, 2018
1ff0eb9
Added newSchedule() and Modified VDCScheduler struct
itouri Feb 11, 2018
8a2127d
Modified schedule to global var
itouri Feb 11, 2018
7469907
Modified RegisterInstanceScheduleHandler name
itouri Feb 11, 2018
59efbff
Added nil checking function of instanceSchedulerHandlers
itouri Feb 11, 2018
28b841a
Delete extra flog
itouri Feb 11, 2018
8a910ad
Modified null scheduler
itouri Feb 11, 2018
ad9f2f8
Modified logic of schedule instance
itouri Feb 11, 2018
4433f58
Added waiting offer process for Assign
itouri Feb 11, 2018
044fd09
Added initialize of waitOfferSec
itouri Feb 11, 2018
65a14ac
Modified LxcHugeTemplate test to expect fail
itouri Feb 11, 2018
7810a39
Added logger to test
itouri Feb 12, 2018
9a3ae0e
Change to test
itouri Feb 12, 2018
fa549e2
Added to test
itouri Feb 12, 2018
4b1a232
Modified storable_scheduler_test contents
itouri Feb 12, 2018
baee10c
Added vm/esxi scheduler
itouri Feb 13, 2018
e3493b2
Changed model.VDCOffer to pointer
itouri Feb 15, 2018
f957a4d
Modified mesos test
itouri Feb 15, 2018
b9f5bba
Added new mutex to newSchedule
itouri Feb 15, 2018
e2cbabf
Deleted InstanceResourceHandler
itouri Feb 23, 2018
bf2d40f
Modified mesos_test to using proto library for convert to pointer
itouri Feb 23, 2018
48de403
Modified mutex from sync.Mutex to sync.RWMutex
itouri Feb 23, 2018
1a37579
Change sync mutex to non pointer
itouri Feb 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/instance_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,21 @@ func (s *InstanceAPI) Start(ctx context.Context, in *StartRequest) (*StartReply,
"instance_id": in.GetInstanceId(),
"state": lastState.String(),
})

switch lastState.GetState() {
case model.InstanceState_REGISTERED:
if err := lastState.ValidateGoalState(model.InstanceState_QUEUED); err != nil {
flog.Error(err)
// TODO: Investigate gRPC error response
return nil, err
}

err := s.api.instanceScheduler.Assign(inst)
if err != nil {
flog.Error(err)
return nil, err
}

if err := model.Instances(ctx).UpdateState(in.GetInstanceId(), model.InstanceState_QUEUED); err != nil {
flog.Error(err)
return nil, err
Expand Down
21 changes: 12 additions & 9 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/model/backend"
"github.com/axsh/openvdc/scheduler"
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
Expand All @@ -18,11 +19,12 @@ import (
//go:generate protoc -I../proto -I${GOPATH}/src --go_out=plugins=grpc:${GOPATH}/src ../proto/v1.proto

type APIServer struct {
server *grpc.Server
modelStoreAddr backend.ConnectionAddress
scheduler sched.SchedulerDriver
mesosMasterAddr *mesos.Address
mMesosMasterAddr sync.Mutex
server *grpc.Server
modelStoreAddr backend.ConnectionAddress
scheduler sched.SchedulerDriver
mesosMasterAddr *mesos.Address
mMesosMasterAddr sync.Mutex
instanceScheduler scheduler.Schedule
}

type serverStreamWithContext struct {
Expand All @@ -34,7 +36,7 @@ func (ss *serverStreamWithContext) Context() context.Context {
return ss.ctx
}

func NewAPIServer(modelAddr backend.ConnectionAddress, driver sched.SchedulerDriver, ctx context.Context) *APIServer {
func NewAPIServer(modelAddr backend.ConnectionAddress, driver sched.SchedulerDriver, instanceScheduler scheduler.Schedule, ctx context.Context) *APIServer {
// Assert the ctx has "cluster.backend" key
model.GetClusterBackendCtx(ctx)

Expand Down Expand Up @@ -66,9 +68,10 @@ func NewAPIServer(modelAddr backend.ConnectionAddress, driver sched.SchedulerDri
grpc.StreamInterceptor(insertFullMethodStream),
}
s := &APIServer{
server: grpc.NewServer(sopts...),
modelStoreAddr: modelAddr,
scheduler: driver,
server: grpc.NewServer(sopts...),
modelStoreAddr: modelAddr,
scheduler: driver,
instanceScheduler: instanceScheduler,
}

RegisterInstanceServer(s.server, &InstanceAPI{api: s})
Expand Down
11 changes: 9 additions & 2 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (
"github.com/axsh/openvdc/internal/unittest"
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/model/backend"
"github.com/axsh/openvdc/scheduler"
"golang.org/x/net/context"
)

func TestNewAPIServer(t *testing.T) {
ze := &backend.ZkEndpoint{}
ze.Set(unittest.TestZkServer)

sched := scheduler.Schedule{}

// TODO: Set mock SchedulerDriver
s := NewAPIServer(ze, nil, model.WithMockClusterBackendCtx(context.Background()))
s := NewAPIServer(ze, nil, sched, model.WithMockClusterBackendCtx(context.Background()))
if s == nil {
t.Error("NewAPIServer() returned nil")
}
Expand All @@ -28,8 +32,11 @@ func TestAPIServerRun(t *testing.T) {
}
ze := &backend.ZkEndpoint{}
ze.Set(unittest.TestZkServer)

sched := scheduler.Schedule{}

// TODO: Set mock SchedulerDriver
s := NewAPIServer(ze, nil, model.WithMockClusterBackendCtx(context.Background()))
s := NewAPIServer(ze, nil, sched, model.WithMockClusterBackendCtx(context.Background()))
go func() {
time.Sleep(2 * time.Second)
s.Stop()
Expand Down
Binary file not shown.
14 changes: 14 additions & 0 deletions ci/citest/acceptance-test/tests/storable_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// +build acceptance

package tests

import (
"testing"
)

func TestStorable_scheduler_LxcHugeTemplate(t *testing.T) {
_, stderr := RunCmdAndExpectFail(t, "openvdc", "run", "centos/7/lxc_huge")
if stderr == nil {
t.Error("There is no machine can satisfy resource requirement but work")
}
}
5 changes: 4 additions & 1 deletion cmd/openvdc-scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func execute(cmd *cobra.Command, args []string) {
log.Fatalln("Faild to bind address for gRPC API: ", viper.GetString("api.endpoint"))
}
log.Info("Listening gRPC API on: ", viper.GetString("api.endpoint"))
grpcServer := api.NewAPIServer(zkAddr, mesosDriver, ctx)

instanceScheduler := scheduler.Schedule{}

grpcServer := api.NewAPIServer(zkAddr, mesosDriver, instanceScheduler, ctx)

if err := detector.Detect(grpcServer); err != nil {
log.WithError(err).Fatal("Failed to start mesos detector")
Expand Down
7 changes: 5 additions & 2 deletions handlers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"reflect"
"strings"

"github.com/axsh/openvdc/model"
)
Expand Down Expand Up @@ -64,3 +63,7 @@ func FindByType(name string) (p ResourceHandler, ok bool) {
p, ok = resourceHandlers[name]
return
}

type InstanceScheduleHandler interface {
ScheduleInstance(model.InstanceResource, *model.VDCOffer) (bool, error) // compare with offer and resrouce request.
}
30 changes: 30 additions & 0 deletions handlers/vm/esxi/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package esxi

import (
"github.com/axsh/openvdc/handlers/vm"
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/scheduler"
)

func init() {
scheduler.RegisterInstanceScheduleHandler("vm/esxi", &EsxiScheduler{})
}

type EsxiScheduler struct {
}

func NewScheduler() *EsxiScheduler {
return new(EsxiScheduler)
}

func (*EsxiScheduler) ScheduleInstance(ir model.InstanceResource, offer *model.VDCOffer) (bool, error) {
cpus := ir.GetVcpu()
mem := ir.GetMemoryGb()

offerCpus := int32(vm.GetOfferScalar(offer, "cpus"))
offerMem := int32(vm.GetOfferScalar(offer, "mem") / 1000)
if cpus < offerCpus && mem < offerMem {
return true, nil
}
return false, nil
}
1 change: 0 additions & 1 deletion handlers/vm/lxc/lxc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func TestResourceName(t *testing.T) {

func TestTypes(t *testing.T) {
assert := assert.New(t)
assert.Implements((*handlers.ResourceHandler)(nil), &LxcHandler{})
assert.Implements((*handlers.CLIHandler)(nil), &LxcHandler{})
}

Expand Down
30 changes: 30 additions & 0 deletions handlers/vm/lxc/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package lxc

import (
"github.com/axsh/openvdc/handlers/vm"
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/scheduler"
)

func init() {
scheduler.RegisterInstanceScheduleHandler("vm/lxc", &LxcScheduler{})
}

type LxcScheduler struct {
}

func NewScheduler() *LxcScheduler {
return new(LxcScheduler)
}

func (*LxcScheduler) ScheduleInstance(ir model.InstanceResource, offer *model.VDCOffer) (bool, error) {
cpus := ir.GetVcpu()
mem := ir.GetMemoryGb()

offerCpus := int32(vm.GetOfferScalar(offer, "cpus"))
offerMem := int32(vm.GetOfferScalar(offer, "mem") / 1000)
if cpus < offerCpus && mem < offerMem {
return true, nil
}
return false, nil
}
1 change: 0 additions & 1 deletion handlers/vm/null/null_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,5 @@ func TestResourceName(t *testing.T) {

func TestTypes(t *testing.T) {
assert := assert.New(t)
assert.Implements((*handlers.ResourceHandler)(nil), &NullHandler{})
assert.Implements((*handlers.CLIHandler)(nil), &NullHandler{})
}
21 changes: 21 additions & 0 deletions handlers/vm/null/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package null

import (
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/scheduler"
)

func init() {
scheduler.RegisterInstanceScheduleHandler("vm/null", &NullScheduler{})
}

type NullScheduler struct {
}

func NewScheduler() *NullScheduler {
return new(NullScheduler)
}

func (*NullScheduler) ScheduleInstance(ir model.InstanceResource, offer *model.VDCOffer) (bool, error) {
return true, nil
}
1 change: 0 additions & 1 deletion handlers/vm/qemu/qemu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func TestResourceName(t *testing.T) {

func TestTypes(t *testing.T) {
assert := assert.New(t)
assert.Implements((*handlers.ResourceHandler)(nil), &QemuHandler{})
assert.Implements((*handlers.CLIHandler)(nil), &QemuHandler{})
}

Expand Down
30 changes: 30 additions & 0 deletions handlers/vm/qemu/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package qemu

import (
"github.com/axsh/openvdc/handlers/vm"
"github.com/axsh/openvdc/model"
"github.com/axsh/openvdc/scheduler"
)

func init() {
scheduler.RegisterInstanceScheduleHandler("vm/qemu", &QemuScheduler{})
}

type QemuScheduler struct {
}

func NewScheduler() *QemuScheduler {
return new(QemuScheduler)
}

func (*QemuScheduler) ScheduleInstance(ir model.InstanceResource, offer *model.VDCOffer) (bool, error) {
cpus := ir.GetVcpu()
mem := ir.GetMemoryGb()

offerCpus := int32(vm.GetOfferScalar(offer, "cpus"))
offerMem := int32(vm.GetOfferScalar(offer, "mem") / 1000)
if cpus < offerCpus && mem < offerMem {
return true, nil
}
return false, nil
}
26 changes: 26 additions & 0 deletions handlers/vm/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package vm

import "github.com/axsh/openvdc/model"

// Generic scheduling util functions
// TODO change to dont allow non-nullable function
func GetOfferScalar(offer *model.VDCOffer, name string) float64 {
resources := filterResources(offer.Resources, func(res model.VDCOfferResource) bool {
return res.Name == name
})

value := 0.0
for _, res := range resources {
value += res.Scalar
}
return value
}

func filterResources(resources []model.VDCOfferResource, filter func(model.VDCOfferResource) bool) (result []model.VDCOfferResource) {
for _, resource := range resources {
if filter(resource) {
result = append(result, resource)
}
}
return result
}
30 changes: 30 additions & 0 deletions model/resource_templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (*NullTemplate) ResourceName() string { return "vm/null" }

// InstanceResource is a marker interface for instance template structs.
type InstanceResource interface {
ResourceTemplate
isInstanceResourceKind()
// protobuf message belongs to InstanceResource should have fields below:
// int32 vcpu = xx;
Expand Down Expand Up @@ -72,3 +73,32 @@ func IsMatchingNodeGroups(res InstanceResource, offered []string) bool {
}
return true
}

// define openvdc's offer
type VDCOffer struct {
SlaveID string
Resources []VDCOfferResource
}

type VDCOfferResource struct {
Name string
Type VDCOfferValueType
Scalar float64
Ranges []VDCOfferValueRange
Set []string
// Disk
}

type VDCOfferValueType int32

const (
VDCOfferValueScalar VDCOfferValueType = 0
VDCOfferValueRanges VDCOfferValueType = 1
VDCOfferValueSet VDCOfferValueType = 2
VDCOfferValueText VDCOfferValueType = 3
)

type VDCOfferValueRange struct {
Begin uint64
End uint64
}
Loading