cluster: handle regions on filesystem #39

Merged
amery merged 3 commits from pr-amery-regions into main 1 year ago
  1. 109
      pkg/cluster/cluster_scan.go
  2. 103
      pkg/cluster/regions.go
  3. 22
      pkg/cluster/sync.go

109
pkg/cluster/cluster_scan.go

@ -2,17 +2,25 @@ package cluster
import (
"io/fs"
"path"
"sort"
"darvaza.org/core"
)
const (
// ZoneRegionsFileName indicates the file containing
// region names as references
ZoneRegionsFileName = "regions"
)
func (m *Cluster) scan(opts *ScanOptions) error {
for _, fn := range []func(*ScanOptions) error{
m.scanDirectory,
m.scanMachines,
m.scanZoneIDs,
m.scanSort,
m.initRegions,
m.scanGateways,
m.scanCephMonitors,
} {
@ -24,7 +32,7 @@ func (m *Cluster) scan(opts *ScanOptions) error {
return nil
}
func (m *Cluster) scanDirectory(_ *ScanOptions) error {
func (m *Cluster) scanDirectory(opts *ScanOptions) error {
// each directory is a zone
entries, err := fs.ReadDir(m.dir, ".")
if err != nil {
@ -33,16 +41,14 @@ func (m *Cluster) scanDirectory(_ *ScanOptions) error {
for _, e := range entries {
if e.IsDir() {
z, err := m.newZone(e.Name())
ok, err := m.scanSubdirectory(opts, e.Name())
switch {
case err != nil:
return core.Wrap(err, e.Name())
case z.Machines.Len() == 0:
z.warn(nil).
WithField("zone", z.Name).
case !ok:
m.warn(nil).
WithField("zone", e.Name()).
Print("empty")
default:
m.Zones = append(m.Zones, z)
}
}
}
@ -50,6 +56,27 @@ func (m *Cluster) scanDirectory(_ *ScanOptions) error {
return nil
}
func (m *Cluster) scanSubdirectory(_ *ScanOptions, name string) (bool, error) {
z, err := m.newZone(name)
switch {
case err != nil:
// somewhere went wrong scanning the subdirectory
return false, err
case z.Machines.Len() > 0:
// zones have machines and the regions they belong
m.Zones = append(m.Zones, z)
return true, nil
case len(z.Regions) > 0:
// regions have no machines but can include
// other regions
m.appendRegionRegions(name, z.Regions...)
return true, nil
default:
// empty
return false, nil
}
}
func (m *Cluster) newZone(name string) (*Zone, error) {
z := &Zone{
zones: m,
@ -154,31 +181,65 @@ func (z *Zone) scan() error {
}
for _, e := range entries {
if e.IsDir() {
m := &Machine{
zone: z,
logger: z,
Name: e.Name(),
}
name := e.Name()
m.debug().
WithField("node", m.Name).
switch {
case name == ZoneRegionsFileName:
err = z.loadRegions()
case e.IsDir():
err = z.scanSubdirectory(name)
default:
z.warn(nil).
WithField("zone", z.Name).
Print("found")
WithField("filename", name).
Print("unknown")
}
if err := m.init(); err != nil {
m.error(err).
WithField("node", m.Name).
WithField("zone", z.Name).
Print()
if err != nil {
return err
}
}
return err
}
return nil
}
z.Machines = append(z.Machines, m)
func (z *Zone) loadRegions() error {
filename := path.Join(z.Name, ZoneRegionsFileName)
regions, err := z.zones.ReadLines(filename)
if err == nil {
// parsed
err = z.appendRegions(regions...)
if err != nil {
err = core.Wrap(err, filename)
}
}
return err
}
func (z *Zone) scanSubdirectory(name string) error {
m := &Machine{
zone: z,
logger: z,
Name: name,
}
m.debug().
WithField("node", m.Name).
WithField("zone", z.Name).
Print("found")
if err := m.init(); err != nil {
m.error(err).
WithField("node", m.Name).
WithField("zone", z.Name).
Print()
return err
}
z.Machines = append(z.Machines, m)
return nil
}

103
pkg/cluster/regions.go

@ -1,5 +1,10 @@
package cluster
import (
"bytes"
"path/filepath"
)
var (
_ MachineIterator = (*Region)(nil)
_ ZoneIterator = (*Region)(nil)
@ -68,7 +73,7 @@ func (m *Cluster) initRegions(_ *ScanOptions) error {
// bind first level regions and their zones
for name, zones := range regions {
m.syncRegions(name, zones...)
m.setRegionZones(name, zones...)
}
// and combine zones to produce larger regions
@ -81,8 +86,10 @@ func (m *Cluster) initRegions(_ *ScanOptions) error {
return nil
}
func (m *Cluster) syncRegions(name string, zones ...*Zone) {
for _, r := range m.Regions {
func (m *Cluster) setRegionZones(name string, zones ...*Zone) {
for i := range m.Regions {
r := &m.Regions[i]
if r.Name == name {
// found
r.m = m
@ -99,6 +106,38 @@ func (m *Cluster) syncRegions(name string, zones ...*Zone) {
})
}
func (m *Cluster) appendRegionRegions(name string, subs ...string) {
for i := range m.Regions {
r := &m.Regions[i]
if name == r.Name {
// found
r.Regions = append(r.Regions, subs...)
return
}
}
// new
m.Regions = append(m.Regions, Region{
Name: name,
Regions: subs,
})
}
func (z *Zone) appendRegions(regions ...string) error {
for _, s := range regions {
// TODO: validate
z.debug().
WithField("zone", z.Name).
WithField("region", s).
Print("attached")
z.Regions = append(z.Regions, s)
}
return nil
}
func (m *Cluster) finishRegion(r *Region) {
if r.m != nil {
// ready
@ -132,3 +171,61 @@ func (m *Cluster) getRegion(name string) (*Region, bool) {
return nil, false
}
// SyncRegions writes to the file system the regions this [Zone]
// belongs to.
func (z *Zone) SyncRegions() error {
err := z.syncZoneRegions()
if err == nil {
z.ForEachMachine(func(p *Machine) bool {
if p.IsActive() {
err = p.RemoveFile("region")
} else {
err = p.WriteStringFile("none\n", "region")
}
return err != nil
})
}
return err
}
func (z *Zone) syncZoneRegions() error {
name := filepath.Join(z.Name, "regions")
if len(z.Regions) > 0 {
var buf bytes.Buffer
for _, s := range z.Regions {
_, _ = buf.WriteString(s)
_, _ = buf.WriteRune('\n')
}
return z.zones.WriteStringFile(buf.String(), name)
}
return z.zones.RemoveFile(name)
}
// SyncRegions writes to the file system the regions covered
// by this meta-region
func (r *Region) SyncRegions() error {
name := filepath.Join(r.Name, "regions")
if len(r.Regions) > 0 {
var buf bytes.Buffer
for _, s := range r.Regions {
_, _ = buf.WriteString(s)
_, _ = buf.WriteRune('\n')
}
if err := r.m.MkdirAll(r.Name); err != nil {
return err
}
return r.m.WriteStringFile(buf.String(), name)
}
return r.m.RemoveFile(name)
}

22
pkg/cluster/sync.go

@ -6,6 +6,7 @@ func (m *Cluster) SyncAll() error {
m.SyncMkdirAll,
m.SyncAllWireguard,
m.SyncAllCeph,
m.SyncAllRegions,
m.WriteHosts,
} {
if err := fn(); err != nil {
@ -58,3 +59,24 @@ func (m *Cluster) SyncAllCeph() error {
return m.WriteCephConfig(cfg)
}
// SyncAllRegions rewrites all region data
func (m *Cluster) SyncAllRegions() error {
var err error
m.ForEachZone(func(z *Zone) bool {
err := z.SyncRegions()
return err != nil
})
if err != nil {
return err
}
m.ForEachRegion(func(r *Region) bool {
err = r.SyncRegions()
return err != nil
})
return err
}

Loading…
Cancel
Save