You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
431 lines
7.7 KiB
431 lines
7.7 KiB
package cluster |
|
|
|
import ( |
|
"bytes" |
|
"path/filepath" |
|
|
|
"git.jpi.io/amery/jpictl/pkg/rings" |
|
) |
|
|
|
var ( |
|
_ MachineIterator = (*Region)(nil) |
|
_ ZoneIterator = (*Region)(nil) |
|
|
|
_ RegionIterator = (*Zone)(nil) |
|
_ RegionIterator = (*Cluster)(nil) |
|
) |
|
|
|
// A RegionIterator is a set of Regions we can iterate on |
|
type RegionIterator interface { |
|
ForEachRegion(func(*Region) bool) |
|
} |
|
|
|
// Region represents a group of zones geographically related |
|
type Region struct { |
|
m *Cluster |
|
zones []*Zone |
|
|
|
Name string |
|
ID rings.RegionID `json:",omitempty" yaml:",omitempty"` |
|
Cluster *string `json:",omitempty" yaml:",omitempty"` |
|
Regions []string `json:",omitempty" yaml:",omitempty"` |
|
} |
|
|
|
// IsPrimary indicates the region is primary and corresponds |
|
// to a kubernetes cluster. |
|
func (r *Region) IsPrimary() bool { |
|
return r != nil && r.Cluster != nil |
|
} |
|
|
|
// ForEachRegion calls a function for each Region of the cluster |
|
// until instructed to terminate the loop |
|
func (m *Cluster) ForEachRegion(fn func(r *Region) bool) { |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
if fn(r) { |
|
return |
|
} |
|
} |
|
} |
|
|
|
// ForEachMachine calls a function for each Machine in the region |
|
// until instructed to terminate the loop |
|
func (r *Region) ForEachMachine(fn func(*Machine) bool) { |
|
r.ForEachZone(func(z *Zone) bool { |
|
var term bool |
|
|
|
z.ForEachMachine(func(p *Machine) bool { |
|
if p.IsActive() { |
|
term = fn(p) |
|
} |
|
return term |
|
}) |
|
|
|
return term |
|
}) |
|
} |
|
|
|
// ForEachZone calls a function for each Zone in the region |
|
// until instructed to terminate the loop |
|
func (r *Region) ForEachZone(fn func(*Zone) bool) { |
|
for _, p := range r.zones { |
|
if fn(p) { |
|
// terminate |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (m *Cluster) initRegions(_ *ScanOptions) error { |
|
regions := make(map[string][]*Zone) |
|
|
|
// first regions defined by zones |
|
m.ForEachZone(func(z *Zone) bool { |
|
SortRegions(z.Regions) |
|
for _, region := range z.Regions { |
|
regions[region] = append(regions[region], z) |
|
} |
|
|
|
return false |
|
}) |
|
|
|
// bind first level regions and their zones |
|
for name, zones := range regions { |
|
m.setRegionZones(name, zones...) |
|
} |
|
|
|
// and combine zones to produce larger regions |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
m.finishRegion(r) |
|
} |
|
|
|
m.sortRegions() |
|
m.scanRegionID() |
|
m.computeZonesRegion() |
|
return nil |
|
} |
|
|
|
func (m *Cluster) setRegionZones(name string, zones ...*Zone) { |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
|
|
if r.Name == name { |
|
// found |
|
r.m = m |
|
r.zones = zones |
|
return |
|
} |
|
} |
|
|
|
// new |
|
m.Regions = append(m.Regions, Region{ |
|
m: m, |
|
zones: zones, |
|
Name: name, |
|
}) |
|
} |
|
|
|
func (m *Cluster) setRegionClusterToken(name string, token string) error { |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
|
|
if r.Name == name { |
|
// found |
|
r.Cluster = &token |
|
return nil |
|
} |
|
} |
|
|
|
// new |
|
m.Regions = append(m.Regions, Region{ |
|
m: m, |
|
Name: name, |
|
Cluster: &token, |
|
}) |
|
return nil |
|
} |
|
|
|
func (m *Cluster) appendRegionRegions(name string, subs ...string) { |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
|
|
if name == r.Name { |
|
// found |
|
r.Regions = append(r.Regions, subs...) |
|
return |
|
} |
|
} |
|
|
|
// new |
|
m.Regions = append(m.Regions, Region{ |
|
Name: name, |
|
Regions: subs, |
|
}) |
|
} |
|
|
|
// ForEachRegion calls a function on all regions this zone belongs to. |
|
func (z *Zone) ForEachRegion(fn func(*Region) bool) { |
|
if fn == nil { |
|
return |
|
} |
|
|
|
z.zones.ForEachRegion(func(r *Region) bool { |
|
var match bool |
|
|
|
r.ForEachZone(func(z2 *Zone) bool { |
|
match = (z == z2) |
|
return match |
|
}) |
|
|
|
if match && fn(r) { |
|
return true |
|
} |
|
|
|
return false |
|
}) |
|
} |
|
|
|
func (z *Zone) appendRegions(regions ...string) error { |
|
for _, s := range regions { |
|
// TODO: validate |
|
z.debug(). |
|
WithField("zone", z.Name). |
|
WithField("region", s). |
|
Print("attached") |
|
|
|
z.Regions = append(z.Regions, s) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (m *Cluster) finishRegion(r *Region) { |
|
if r.m != nil { |
|
// ready |
|
return |
|
} |
|
|
|
r.m = m |
|
sub := []string{} |
|
for _, name := range r.Regions { |
|
r2, ok := m.getFinishRegion(name) |
|
if !ok { |
|
m.warn(nil).WithField("region", name).Print("unknown region") |
|
continue |
|
} |
|
|
|
sub = append(sub, r2.Name) |
|
r.zones = append(r.zones, r2.zones...) |
|
} |
|
r.Regions = sub |
|
} |
|
|
|
// revive:disable:cognitive-complexity |
|
func (m *Cluster) scanRegionID() { |
|
// revive:enable:cognitive-complexity |
|
var max rings.RegionID |
|
var missing bool |
|
|
|
// check IDs |
|
ids := make(map[rings.RegionID]bool) |
|
fn := func(r *Region) bool { |
|
var term bool |
|
|
|
switch { |
|
case !r.IsPrimary(): |
|
// secondary, no ID. |
|
r.ID = 0 |
|
case !r.ID.Valid(): |
|
// primary without ID |
|
missing = true |
|
case ids[r.ID]: |
|
// duplicate |
|
m.error(nil).WithField("region", r.Name).Print("duplicate ID") |
|
missing = true |
|
r.ID = 0 |
|
default: |
|
ids[r.ID] = true |
|
|
|
if r.ID > max { |
|
max = r.ID |
|
} |
|
} |
|
|
|
return term |
|
} |
|
m.ForEachRegion(fn) |
|
|
|
if missing { |
|
// assign missing IDs |
|
fn := func(r *Region) bool { |
|
var term bool |
|
|
|
switch { |
|
case !r.IsPrimary(): |
|
// ignore secondary |
|
case r.ID.Valid(): |
|
// already has an ID |
|
default: |
|
r.ID = max + 1 |
|
max = r.ID |
|
} |
|
|
|
return term |
|
} |
|
|
|
m.ForEachRegion(fn) |
|
} |
|
} |
|
|
|
func (m *Cluster) computeZonesRegion() { |
|
fn := func(r *Region, z *Zone) { |
|
if z.region != nil { |
|
m.error(nil). |
|
WithField("zone", z.Name). |
|
WithField("region", []string{ |
|
z.region.Name, |
|
r.Name, |
|
}). |
|
Print("zone in two regions") |
|
} else { |
|
z.region = r |
|
} |
|
} |
|
|
|
m.ForEachRegion(func(r *Region) bool { |
|
var term bool |
|
|
|
if r.IsPrimary() { |
|
r.ForEachZone(func(z *Zone) bool { |
|
fn(r, z) |
|
return term |
|
}) |
|
} |
|
|
|
return term |
|
}) |
|
} |
|
|
|
func (m *Cluster) getRegion(name string) (*Region, bool) { |
|
for i := range m.Regions { |
|
r := &m.Regions[i] |
|
if name == r.Name { |
|
return r, true |
|
} |
|
} |
|
|
|
return nil, false |
|
} |
|
|
|
func (m *Cluster) getFinishRegion(name string) (*Region, bool) { |
|
if r, ok := m.getRegion(name); ok { |
|
m.finishRegion(r) |
|
return r, true |
|
} |
|
|
|
return nil, false |
|
} |
|
|
|
// SyncRegions writes to the file system the regions this [Zone] |
|
// belongs to. |
|
func (z *Zone) SyncRegions() error { |
|
err := z.syncZoneRegions() |
|
if err == nil { |
|
z.ForEachMachine(func(p *Machine) bool { |
|
err = z.syncMachineRegions(p) |
|
return err != nil |
|
}) |
|
} |
|
|
|
return err |
|
} |
|
|
|
func (*Zone) syncMachineRegions(p *Machine) error { |
|
var err error |
|
|
|
if p.IsActive() { |
|
err = p.RemoveFile("region") |
|
} else { |
|
err = p.WriteStringFile("none\n", "region") |
|
} |
|
|
|
if err == nil { |
|
err = p.RemoveFile(RegionClusterTokenFileName) |
|
} |
|
|
|
return err |
|
} |
|
|
|
func (z *Zone) syncZoneRegions() error { |
|
name := filepath.Join(z.Name, ZoneRegionsFileName) |
|
|
|
if len(z.Regions) > 0 { |
|
var buf bytes.Buffer |
|
|
|
for _, s := range z.Regions { |
|
_, _ = buf.WriteString(s) |
|
_, _ = buf.WriteRune('\n') |
|
} |
|
|
|
return z.zones.WriteStringFile(buf.String(), name) |
|
} |
|
|
|
return z.zones.RemoveFile(name) |
|
} |
|
|
|
// SyncRegions writes to the file system the regions covered |
|
// by this meta-region |
|
func (r *Region) SyncRegions() error { |
|
if err := r.syncRegionsFile(); err != nil { |
|
return err |
|
} |
|
|
|
return r.syncClusterFile() |
|
} |
|
|
|
func (r *Region) mkdir() error { |
|
return r.m.MkdirAll(r.Name) |
|
} |
|
|
|
func (r *Region) syncRegionsFile() error { |
|
var err error |
|
|
|
name := filepath.Join(r.Name, ZoneRegionsFileName) |
|
|
|
if len(r.Regions) == 0 { |
|
err = r.m.RemoveFile(name) |
|
} else if err = r.mkdir(); err == nil { |
|
var buf bytes.Buffer |
|
|
|
for _, s := range r.Regions { |
|
_, _ = buf.WriteString(s) |
|
_, _ = buf.WriteRune('\n') |
|
} |
|
|
|
err = r.m.WriteStringFile(buf.String(), name) |
|
} |
|
|
|
return err |
|
} |
|
|
|
func (r *Region) syncClusterFile() error { |
|
var err error |
|
|
|
name := filepath.Join(r.Name, RegionClusterTokenFileName) |
|
|
|
if r.Cluster == nil { |
|
err = r.m.RemoveFile(name) |
|
} else if err = r.mkdir(); err == nil { |
|
var buf bytes.Buffer |
|
|
|
_, _ = buf.WriteString(*r.Cluster) |
|
if buf.Len() > 0 { |
|
_, _ = buf.WriteRune('\n') |
|
} |
|
|
|
err = r.m.WriteStringFile(buf.String(), name) |
|
} |
|
|
|
return err |
|
}
|
|
|