You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

401 lines
7.2 KiB

package cluster
import (
"bytes"
"path/filepath"
"git.jpi.io/amery/jpictl/pkg/rings"
)
var (
_ MachineIterator = (*Region)(nil)
_ ZoneIterator = (*Region)(nil)
_ RegionIterator = (*Zone)(nil)
_ RegionIterator = (*Cluster)(nil)
)
// A RegionIterator is a set of Regions we can iterate on
type RegionIterator interface {
ForEachRegion(func(*Region) bool)
}
// Region represents a group of zones geographically related
type Region struct {
m *Cluster
zones []*Zone
Name string
ID rings.RegionID `json:",omitempty" yaml:",omitempty"`
Cluster *string `json:",omitempty" yaml:",omitempty"`
Regions []string `json:",omitempty" yaml:",omitempty"`
}
// IsPrimary indicates the region is primary and corresponds
// to a kubernetes cluster.
func (r *Region) IsPrimary() bool {
return r != nil && r.Cluster != nil
}
// ForEachRegion calls a function for each Region of the cluster
// until instructed to terminate the loop
func (m *Cluster) ForEachRegion(fn func(r *Region) bool) {
for i := range m.Regions {
r := &m.Regions[i]
if fn(r) {
return
}
}
}
// ForEachMachine calls a function for each Machine in the region
// until instructed to terminate the loop
func (r *Region) ForEachMachine(fn func(*Machine) bool) {
r.ForEachZone(func(z *Zone) bool {
var term bool
z.ForEachMachine(func(p *Machine) bool {
if p.IsActive() {
term = fn(p)
}
return term
})
return term
})
}
// ForEachZone calls a function for each Zone in the region
// until instructed to terminate the loop
func (r *Region) ForEachZone(fn func(*Zone) bool) {
for _, p := range r.zones {
if fn(p) {
// terminate
return
}
}
}
func (m *Cluster) initRegions(_ *ScanOptions) error {
regions := make(map[string][]*Zone)
// first regions defined by zones
m.ForEachZone(func(z *Zone) bool {
SortRegions(z.Regions)
for _, region := range z.Regions {
regions[region] = append(regions[region], z)
}
return false
})
// bind first level regions and their zones
for name, zones := range regions {
m.setRegionZones(name, zones...)
}
// and combine zones to produce larger regions
for i := range m.Regions {
r := &m.Regions[i]
m.finishRegion(r)
}
m.sortRegions()
m.scanRegionID()
return nil
}
func (m *Cluster) setRegionZones(name string, zones ...*Zone) {
for i := range m.Regions {
r := &m.Regions[i]
if r.Name == name {
// found
r.m = m
r.zones = zones
return
}
}
// new
m.Regions = append(m.Regions, Region{
m: m,
zones: zones,
Name: name,
})
}
func (m *Cluster) setRegionClusterToken(name string, token string) error {
for i := range m.Regions {
r := &m.Regions[i]
if r.Name == name {
// found
r.Cluster = &token
return nil
}
}
// new
m.Regions = append(m.Regions, Region{
m: m,
Name: name,
Cluster: &token,
})
return nil
}
func (m *Cluster) appendRegionRegions(name string, subs ...string) {
for i := range m.Regions {
r := &m.Regions[i]
if name == r.Name {
// found
r.Regions = append(r.Regions, subs...)
return
}
}
// new
m.Regions = append(m.Regions, Region{
Name: name,
Regions: subs,
})
}
// ForEachRegion calls a function on all regions this zone belongs to.
func (z *Zone) ForEachRegion(fn func(*Region) bool) {
if fn == nil {
return
}
z.zones.ForEachRegion(func(r *Region) bool {
var match bool
r.ForEachZone(func(z2 *Zone) bool {
match = (z == z2)
return match
})
if match && fn(r) {
return true
}
return false
})
}
func (z *Zone) appendRegions(regions ...string) error {
for _, s := range regions {
// TODO: validate
z.debug().
WithField("zone", z.Name).
WithField("region", s).
Print("attached")
z.Regions = append(z.Regions, s)
}
return nil
}
func (m *Cluster) finishRegion(r *Region) {
if r.m != nil {
// ready
return
}
r.m = m
sub := []string{}
for _, name := range r.Regions {
r2, ok := m.getFinishRegion(name)
if !ok {
m.warn(nil).WithField("region", name).Print("unknown region")
continue
}
sub = append(sub, r2.Name)
r.zones = append(r.zones, r2.zones...)
}
r.Regions = sub
}
// revive:disable:cognitive-complexity
func (m *Cluster) scanRegionID() {
// revive:enable:cognitive-complexity
var max rings.RegionID
var missing bool
// check IDs
ids := make(map[rings.RegionID]bool)
fn := func(r *Region) bool {
var term bool
switch {
case !r.IsPrimary():
// secondary, no ID.
r.ID = 0
case !r.ID.Valid():
// primary without ID
missing = true
case ids[r.ID]:
// duplicate
m.error(nil).WithField("region", r.Name).Print("duplicate ID")
missing = true
r.ID = 0
default:
ids[r.ID] = true
if r.ID > max {
max = r.ID
}
}
return term
}
m.ForEachRegion(fn)
if missing {
// assign missing IDs
fn := func(r *Region) bool {
var term bool
switch {
case !r.IsPrimary():
// ignore secondary
case r.ID.Valid():
// already has an ID
default:
r.ID = max + 1
max = r.ID
}
return term
}
m.ForEachRegion(fn)
}
}
func (m *Cluster) getRegion(name string) (*Region, bool) {
for i := range m.Regions {
r := &m.Regions[i]
if name == r.Name {
return r, true
}
}
return nil, false
}
func (m *Cluster) getFinishRegion(name string) (*Region, bool) {
if r, ok := m.getRegion(name); ok {
m.finishRegion(r)
return r, true
}
return nil, false
}
// SyncRegions writes to the file system the regions this [Zone]
// belongs to.
func (z *Zone) SyncRegions() error {
err := z.syncZoneRegions()
if err == nil {
z.ForEachMachine(func(p *Machine) bool {
err = z.syncMachineRegions(p)
return err != nil
})
}
return err
}
func (*Zone) syncMachineRegions(p *Machine) error {
var err error
if p.IsActive() {
err = p.RemoveFile("region")
} else {
err = p.WriteStringFile("none\n", "region")
}
if err == nil {
err = p.RemoveFile(RegionClusterTokenFileName)
}
return err
}
func (z *Zone) syncZoneRegions() error {
name := filepath.Join(z.Name, ZoneRegionsFileName)
if len(z.Regions) > 0 {
var buf bytes.Buffer
for _, s := range z.Regions {
_, _ = buf.WriteString(s)
_, _ = buf.WriteRune('\n')
}
return z.zones.WriteStringFile(buf.String(), name)
}
return z.zones.RemoveFile(name)
}
// SyncRegions writes to the file system the regions covered
// by this meta-region
func (r *Region) SyncRegions() error {
if err := r.syncRegionsFile(); err != nil {
return err
}
return r.syncClusterFile()
}
func (r *Region) mkdir() error {
return r.m.MkdirAll(r.Name)
}
func (r *Region) syncRegionsFile() error {
var err error
name := filepath.Join(r.Name, ZoneRegionsFileName)
if len(r.Regions) == 0 {
err = r.m.RemoveFile(name)
} else if err = r.mkdir(); err == nil {
var buf bytes.Buffer
for _, s := range r.Regions {
_, _ = buf.WriteString(s)
_, _ = buf.WriteRune('\n')
}
err = r.m.WriteStringFile(buf.String(), name)
}
return err
}
func (r *Region) syncClusterFile() error {
var err error
name := filepath.Join(r.Name, RegionClusterTokenFileName)
if r.Cluster == nil {
err = r.m.RemoveFile(name)
} else if err = r.mkdir(); err == nil {
var buf bytes.Buffer
_, _ = buf.WriteString(*r.Cluster)
if buf.Len() > 0 {
_, _ = buf.WriteRune('\n')
}
err = r.m.WriteStringFile(buf.String(), name)
}
return err
}