cluster: handle regions on filesystem #39
+87
-26
@@ -2,17 +2,25 @@ package cluster
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"path"
|
||||
"sort"
|
||||
|
||||
"darvaza.org/core"
|
||||
)
|
||||
|
||||
const (
|
||||
// ZoneRegionsFileName indicates the file containing
|
||||
// region names as references
|
||||
ZoneRegionsFileName = "regions"
|
||||
)
|
||||
|
||||
func (m *Cluster) scan(opts *ScanOptions) error {
|
||||
for _, fn := range []func(*ScanOptions) error{
|
||||
m.scanDirectory,
|
||||
m.scanMachines,
|
||||
m.scanZoneIDs,
|
||||
m.scanSort,
|
||||
m.initRegions,
|
||||
m.scanGateways,
|
||||
m.scanCephMonitors,
|
||||
} {
|
||||
@@ -24,7 +32,7 @@ func (m *Cluster) scan(opts *ScanOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Cluster) scanDirectory(_ *ScanOptions) error {
|
||||
func (m *Cluster) scanDirectory(opts *ScanOptions) error {
|
||||
// each directory is a zone
|
||||
entries, err := fs.ReadDir(m.dir, ".")
|
||||
if err != nil {
|
||||
@@ -33,16 +41,14 @@ func (m *Cluster) scanDirectory(_ *ScanOptions) error {
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
z, err := m.newZone(e.Name())
|
||||
ok, err := m.scanSubdirectory(opts, e.Name())
|
||||
switch {
|
||||
case err != nil:
|
||||
return core.Wrap(err, e.Name())
|
||||
case z.Machines.Len() == 0:
|
||||
z.warn(nil).
|
||||
WithField("zone", z.Name).
|
||||
case !ok:
|
||||
m.warn(nil).
|
||||
WithField("zone", e.Name()).
|
||||
Print("empty")
|
||||
default:
|
||||
m.Zones = append(m.Zones, z)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -50,6 +56,27 @@ func (m *Cluster) scanDirectory(_ *ScanOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Cluster) scanSubdirectory(_ *ScanOptions, name string) (bool, error) {
|
||||
z, err := m.newZone(name)
|
||||
switch {
|
||||
case err != nil:
|
||||
// somewhere went wrong scanning the subdirectory
|
||||
return false, err
|
||||
case z.Machines.Len() > 0:
|
||||
// zones have machines and the regions they belong
|
||||
m.Zones = append(m.Zones, z)
|
||||
return true, nil
|
||||
case len(z.Regions) > 0:
|
||||
// regions have no machines but can include
|
||||
// other regions
|
||||
m.appendRegionRegions(name, z.Regions...)
|
||||
return true, nil
|
||||
default:
|
||||
// empty
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Cluster) newZone(name string) (*Zone, error) {
|
||||
z := &Zone{
|
||||
zones: m,
|
||||
@@ -154,34 +181,68 @@ func (z *Zone) scan() error {
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
m := &Machine{
|
||||
zone: z,
|
||||
logger: z,
|
||||
Name: e.Name(),
|
||||
}
|
||||
name := e.Name()
|
||||
|
||||
m.debug().
|
||||
WithField("node", m.Name).
|
||||
switch {
|
||||
case name == ZoneRegionsFileName:
|
||||
err = z.loadRegions()
|
||||
case e.IsDir():
|
||||
err = z.scanSubdirectory(name)
|
||||
default:
|
||||
z.warn(nil).
|
||||
WithField("zone", z.Name).
|
||||
Print("found")
|
||||
WithField("filename", name).
|
||||
Print("unknown")
|
||||
}
|
||||
|
||||
if err := m.init(); err != nil {
|
||||
m.error(err).
|
||||
WithField("node", m.Name).
|
||||
WithField("zone", z.Name).
|
||||
Print()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
z.Machines = append(z.Machines, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *Zone) loadRegions() error {
|
||||
filename := path.Join(z.Name, ZoneRegionsFileName)
|
||||
regions, err := z.zones.ReadLines(filename)
|
||||
|
||||
if err == nil {
|
||||
// parsed
|
||||
err = z.appendRegions(regions...)
|
||||
if err != nil {
|
||||
err = core.Wrap(err, filename)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (z *Zone) scanSubdirectory(name string) error {
|
||||
m := &Machine{
|
||||
zone: z,
|
||||
logger: z,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
m.debug().
|
||||
WithField("node", m.Name).
|
||||
WithField("zone", z.Name).
|
||||
Print("found")
|
||||
|
||||
if err := m.init(); err != nil {
|
||||
m.error(err).
|
||||
WithField("node", m.Name).
|
||||
WithField("zone", z.Name).
|
||||
Print()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
z.Machines = append(z.Machines, m)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGateway returns the first gateway found, if none
|
||||
// files will be created to enable the first [Machine] to
|
||||
// be one
|
||||
|
||||
+100
-3
@@ -1,5 +1,10 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
var (
|
||||
_ MachineIterator = (*Region)(nil)
|
||||
_ ZoneIterator = (*Region)(nil)
|
||||
@@ -68,7 +73,7 @@ func (m *Cluster) initRegions(_ *ScanOptions) error {
|
||||
|
||||
// bind first level regions and their zones
|
||||
for name, zones := range regions {
|
||||
m.syncRegions(name, zones...)
|
||||
m.setRegionZones(name, zones...)
|
||||
}
|
||||
|
||||
// and combine zones to produce larger regions
|
||||
@@ -81,8 +86,10 @@ func (m *Cluster) initRegions(_ *ScanOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Cluster) syncRegions(name string, zones ...*Zone) {
|
||||
for _, r := range m.Regions {
|
||||
func (m *Cluster) setRegionZones(name string, zones ...*Zone) {
|
||||
for i := range m.Regions {
|
||||
r := &m.Regions[i]
|
||||
|
||||
if r.Name == name {
|
||||
// found
|
||||
r.m = m
|
||||
@@ -99,6 +106,38 @@ func (m *Cluster) syncRegions(name string, zones ...*Zone) {
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Cluster) appendRegionRegions(name string, subs ...string) {
|
||||
for i := range m.Regions {
|
||||
r := &m.Regions[i]
|
||||
|
||||
if name == r.Name {
|
||||
// found
|
||||
r.Regions = append(r.Regions, subs...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// new
|
||||
m.Regions = append(m.Regions, Region{
|
||||
Name: name,
|
||||
Regions: subs,
|
||||
})
|
||||
}
|
||||
|
||||
func (z *Zone) appendRegions(regions ...string) error {
|
||||
for _, s := range regions {
|
||||
// TODO: validate
|
||||
z.debug().
|
||||
WithField("zone", z.Name).
|
||||
WithField("region", s).
|
||||
Print("attached")
|
||||
|
||||
z.Regions = append(z.Regions, s)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Cluster) finishRegion(r *Region) {
|
||||
if r.m != nil {
|
||||
// ready
|
||||
@@ -132,3 +171,61 @@ func (m *Cluster) getRegion(name string) (*Region, bool) {
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// SyncRegions writes to the file system the regions this [Zone]
|
||||
// belongs to.
|
||||
func (z *Zone) SyncRegions() error {
|
||||
err := z.syncZoneRegions()
|
||||
if err == nil {
|
||||
z.ForEachMachine(func(p *Machine) bool {
|
||||
if p.IsActive() {
|
||||
err = p.RemoveFile("region")
|
||||
} else {
|
||||
err = p.WriteStringFile("none\n", "region")
|
||||
}
|
||||
return err != nil
|
||||
})
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (z *Zone) syncZoneRegions() error {
|
||||
name := filepath.Join(z.Name, "regions")
|
||||
|
||||
if len(z.Regions) > 0 {
|
||||
var buf bytes.Buffer
|
||||
|
||||
for _, s := range z.Regions {
|
||||
_, _ = buf.WriteString(s)
|
||||
_, _ = buf.WriteRune('\n')
|
||||
}
|
||||
|
||||
return z.zones.WriteStringFile(buf.String(), name)
|
||||
}
|
||||
|
||||
return z.zones.RemoveFile(name)
|
||||
}
|
||||
|
||||
// SyncRegions writes to the file system the regions covered
|
||||
// by this meta-region
|
||||
func (r *Region) SyncRegions() error {
|
||||
name := filepath.Join(r.Name, "regions")
|
||||
|
||||
if len(r.Regions) > 0 {
|
||||
var buf bytes.Buffer
|
||||
|
||||
for _, s := range r.Regions {
|
||||
_, _ = buf.WriteString(s)
|
||||
_, _ = buf.WriteRune('\n')
|
||||
}
|
||||
|
||||
if err := r.m.MkdirAll(r.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.m.WriteStringFile(buf.String(), name)
|
||||
}
|
||||
|
||||
return r.m.RemoveFile(name)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ func (m *Cluster) SyncAll() error {
|
||||
m.SyncMkdirAll,
|
||||
m.SyncAllWireguard,
|
||||
m.SyncAllCeph,
|
||||
m.SyncAllRegions,
|
||||
m.WriteHosts,
|
||||
} {
|
||||
if err := fn(); err != nil {
|
||||
@@ -58,3 +59,24 @@ func (m *Cluster) SyncAllCeph() error {
|
||||
|
||||
return m.WriteCephConfig(cfg)
|
||||
}
|
||||
|
||||
// SyncAllRegions rewrites all region data
|
||||
func (m *Cluster) SyncAllRegions() error {
|
||||
var err error
|
||||
|
||||
m.ForEachZone(func(z *Zone) bool {
|
||||
err := z.SyncRegions()
|
||||
return err != nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.ForEachRegion(func(r *Region) bool {
|
||||
err = r.SyncRegions()
|
||||
return err != nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user