Browse Source

cluster: rename Zones to Cluster

Signed-off-by: Alejandro Mery <amery@jpi.io>
Alejandro Mery 10 months ago
parent
commit
36f1619d69
  1. 2
      cmd/jpictl/config.go
  2. 8
      pkg/cluster/ceph.go
  3. 2
      pkg/cluster/ceph_scan.go
  4. 72
      pkg/cluster/cluster.go
  5. 10
      pkg/cluster/cluster_file.go
  6. 14
      pkg/cluster/cluster_scan.go
  7. 22
      pkg/cluster/cluster_scan_options.go
  8. 2
      pkg/cluster/env.go
  9. 14
      pkg/cluster/log.go
  10. 6
      pkg/cluster/sync.go
  11. 16
      pkg/cluster/wireguard.go
  12. 70
      pkg/cluster/zones.go

2
cmd/jpictl/config.go

@ -14,7 +14,7 @@ var cfg = &Config{
} }
// LoadZones loads all zones and machines in the config directory // LoadZones loads all zones and machines in the config directory
func (cfg *Config) LoadZones(resolve bool) (*cluster.Zones, error) { func (cfg *Config) LoadZones(resolve bool) (*cluster.Cluster, error) {
return cluster.New(cfg.Base, cfg.Domain, return cluster.New(cfg.Base, cfg.Domain,
cluster.ResolvePublicAddresses(resolve), cluster.ResolvePublicAddresses(resolve),
cluster.WithLogger(log), cluster.WithLogger(log),

8
pkg/cluster/ceph.go

@ -12,7 +12,7 @@ import (
) )
// GetCephFSID returns our Ceph's FSID // GetCephFSID returns our Ceph's FSID
func (m *Zones) GetCephFSID() (uuid.UUID, error) { func (m *Cluster) GetCephFSID() (uuid.UUID, error) {
if core.IsZero(m.CephFSID) { if core.IsZero(m.CephFSID) {
// generate one // generate one
v, err := uuid.NewV4() v, err := uuid.NewV4()
@ -25,7 +25,7 @@ func (m *Zones) GetCephFSID() (uuid.UUID, error) {
} }
// GetCephConfig reads the ceph.conf file // GetCephConfig reads the ceph.conf file
func (m *Zones) GetCephConfig() (*ceph.Config, error) { func (m *Cluster) GetCephConfig() (*ceph.Config, error) {
data, err := m.ReadFile("ceph.conf") data, err := m.ReadFile("ceph.conf")
if err != nil { if err != nil {
return nil, err return nil, err
@ -36,7 +36,7 @@ func (m *Zones) GetCephConfig() (*ceph.Config, error) {
} }
// WriteCephConfig writes the ceph.conf file // WriteCephConfig writes the ceph.conf file
func (m *Zones) WriteCephConfig(cfg *ceph.Config) error { func (m *Cluster) WriteCephConfig(cfg *ceph.Config) error {
f, err := m.CreateTruncFile("ceph.conf") f, err := m.CreateTruncFile("ceph.conf")
if err != nil { if err != nil {
return err return err
@ -48,7 +48,7 @@ func (m *Zones) WriteCephConfig(cfg *ceph.Config) error {
} }
// GenCephConfig prepares a ceph.Config using the cluster information // GenCephConfig prepares a ceph.Config using the cluster information
func (m *Zones) GenCephConfig() (*ceph.Config, error) { func (m *Cluster) GenCephConfig() (*ceph.Config, error) {
fsid, err := m.GetCephFSID() fsid, err := m.GetCephFSID()
if err != nil { if err != nil {
return nil, err return nil, err

2
pkg/cluster/ceph_scan.go

@ -71,7 +71,7 @@ func newCephScanTODO(cfg *ceph.Config) *cephScanTODO {
return todo return todo
} }
func (m *Zones) scanCephMonitors(_ *ScanOptions) error { func (m *Cluster) scanCephMonitors(_ *ScanOptions) error {
cfg, err := m.GetCephConfig() cfg, err := m.GetCephConfig()
switch { switch {
case os.IsNotExist(err): case os.IsNotExist(err):

72
pkg/cluster/cluster.go

@ -1,2 +1,74 @@
// Package cluster contains information about the cluster // Package cluster contains information about the cluster
package cluster package cluster
import (
"io/fs"
"darvaza.org/resolver"
"darvaza.org/slog"
"github.com/gofrs/uuid/v5"
)
var (
_ MachineIterator = (*Cluster)(nil)
_ ZoneIterator = (*Cluster)(nil)
)
// revive:disable:line-length-limit
// Cluster represents all zones in a cluster
type Cluster struct {
dir fs.FS
log slog.Logger
resolver resolver.Resolver
domain string
CephFSID uuid.UUID `json:"ceph_fsid,omitempty" yaml:"ceph_fsid,omitempty"`
Zones []*Zone
}
// revive:enable:line-length-limit
// ForEachMachine calls a function for each Machine in the cluster
// until instructed to terminate the loop
func (m *Cluster) ForEachMachine(fn func(*Machine) bool) {
m.ForEachZone(func(z *Zone) bool {
var term bool
z.ForEachMachine(func(p *Machine) bool {
term = fn(p)
return term
})
return term
})
}
// ForEachZone calls a function for each Zone in the cluster
// until instructed to terminate the loop
func (m *Cluster) ForEachZone(fn func(*Zone) bool) {
for _, p := range m.Zones {
if fn(p) {
// terminate
return
}
}
}
// GetMachineByName looks for a machine with the specified
// name on any zone
func (m *Cluster) GetMachineByName(name string) (*Machine, bool) {
var out *Machine
if name != "" {
m.ForEachMachine(func(p *Machine) bool {
if p.Name == name {
out = p
}
return out != nil
})
}
return out, out != nil
}

10
pkg/cluster/zones_file.go → pkg/cluster/cluster_file.go

@ -9,7 +9,7 @@ import (
) )
// OpenFile opens a file on the cluster's config directory with the specified flags // OpenFile opens a file on the cluster's config directory with the specified flags
func (m *Zones) OpenFile(name string, flags int, args ...any) (fs.File, error) { func (m *Cluster) OpenFile(name string, flags int, args ...any) (fs.File, error) {
if len(args) > 0 { if len(args) > 0 {
name = fmt.Sprintf(name, args...) name = fmt.Sprintf(name, args...)
} }
@ -18,16 +18,16 @@ func (m *Zones) OpenFile(name string, flags int, args ...any) (fs.File, error) {
} }
// CreateTruncFile creates or truncates a file on the cluster's config directory // CreateTruncFile creates or truncates a file on the cluster's config directory
func (m *Zones) CreateTruncFile(name string, args ...any) (io.WriteCloser, error) { func (m *Cluster) CreateTruncFile(name string, args ...any) (io.WriteCloser, error) {
return m.openWriter(name, os.O_CREATE|os.O_TRUNC, args...) return m.openWriter(name, os.O_CREATE|os.O_TRUNC, args...)
} }
// CreateFile creates a file on the cluster's config directory // CreateFile creates a file on the cluster's config directory
func (m *Zones) CreateFile(name string, args ...any) (io.WriteCloser, error) { func (m *Cluster) CreateFile(name string, args ...any) (io.WriteCloser, error) {
return m.openWriter(name, os.O_CREATE, args...) return m.openWriter(name, os.O_CREATE, args...)
} }
func (m *Zones) openWriter(name string, flags int, args ...any) (io.WriteCloser, error) { func (m *Cluster) openWriter(name string, flags int, args ...any) (io.WriteCloser, error) {
f, err := m.OpenFile(name, os.O_WRONLY|flags, args...) f, err := m.OpenFile(name, os.O_WRONLY|flags, args...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -37,7 +37,7 @@ func (m *Zones) openWriter(name string, flags int, args ...any) (io.WriteCloser,
} }
// ReadFile reads a file from the cluster's config directory // ReadFile reads a file from the cluster's config directory
func (m *Zones) ReadFile(name string, args ...any) ([]byte, error) { func (m *Cluster) ReadFile(name string, args ...any) ([]byte, error) {
if len(args) > 0 { if len(args) > 0 {
name = fmt.Sprintf(name, args...) name = fmt.Sprintf(name, args...)
} }

14
pkg/cluster/scan.go → pkg/cluster/cluster_scan.go

@ -7,7 +7,7 @@ import (
"darvaza.org/core" "darvaza.org/core"
) )
func (m *Zones) scan(opts *ScanOptions) error { func (m *Cluster) scan(opts *ScanOptions) error {
for _, fn := range []func(*ScanOptions) error{ for _, fn := range []func(*ScanOptions) error{
m.scanDirectory, m.scanDirectory,
m.scanMachines, m.scanMachines,
@ -24,7 +24,7 @@ func (m *Zones) scan(opts *ScanOptions) error {
return nil return nil
} }
func (m *Zones) scanDirectory(_ *ScanOptions) error { func (m *Cluster) scanDirectory(_ *ScanOptions) error {
// each directory is a zone // each directory is a zone
entries, err := fs.ReadDir(m.dir, ".") entries, err := fs.ReadDir(m.dir, ".")
if err != nil { if err != nil {
@ -50,7 +50,7 @@ func (m *Zones) scanDirectory(_ *ScanOptions) error {
return nil return nil
} }
func (m *Zones) newZone(name string) (*Zone, error) { func (m *Cluster) newZone(name string) (*Zone, error) {
z := &Zone{ z := &Zone{
zones: m, zones: m,
logger: m, logger: m,
@ -67,7 +67,7 @@ func (m *Zones) newZone(name string) (*Zone, error) {
return z, nil return z, nil
} }
func (m *Zones) scanMachines(opts *ScanOptions) error { func (m *Cluster) scanMachines(opts *ScanOptions) error {
var err error var err error
m.ForEachMachine(func(p *Machine) bool { m.ForEachMachine(func(p *Machine) bool {
err = p.scan(opts) err = p.scan(opts)
@ -76,7 +76,7 @@ func (m *Zones) scanMachines(opts *ScanOptions) error {
return err return err
} }
func (m *Zones) scanZoneIDs(_ *ScanOptions) error { func (m *Cluster) scanZoneIDs(_ *ScanOptions) error {
var hasMissing bool var hasMissing bool
var lastZoneID int var lastZoneID int
@ -106,7 +106,7 @@ func (m *Zones) scanZoneIDs(_ *ScanOptions) error {
return nil return nil
} }
func (m *Zones) scanSort(_ *ScanOptions) error { func (m *Cluster) scanSort(_ *ScanOptions) error {
sort.SliceStable(m.Zones, func(i, j int) bool { sort.SliceStable(m.Zones, func(i, j int) bool {
id1 := m.Zones[i].ID id1 := m.Zones[i].ID
id2 := m.Zones[j].ID id2 := m.Zones[j].ID
@ -132,7 +132,7 @@ func (m *Zones) scanSort(_ *ScanOptions) error {
return nil return nil
} }
func (m *Zones) scanGateways(_ *ScanOptions) error { func (m *Cluster) scanGateways(_ *ScanOptions) error {
var err error var err error
m.ForEachZone(func(z *Zone) bool { m.ForEachZone(func(z *Zone) bool {

22
pkg/cluster/options.go → pkg/cluster/cluster_scan_options.go

@ -11,7 +11,7 @@ import (
) )
// A ScanOption pre-configures the Zones before scanning // A ScanOption pre-configures the Zones before scanning
type ScanOption func(*Zones, *ScanOptions) error type ScanOption func(*Cluster, *ScanOptions) error
// ScanOptions contains flags used by the initial scan // ScanOptions contains flags used by the initial scan
type ScanOptions struct { type ScanOptions struct {
@ -29,7 +29,7 @@ type ScanOptions struct {
// the DNS resolver to get PublicAddresses of nodes. // the DNS resolver to get PublicAddresses of nodes.
// Default is true // Default is true
func ResolvePublicAddresses(resolve bool) ScanOption { func ResolvePublicAddresses(resolve bool) ScanOption {
return func(m *Zones, opt *ScanOptions) error { return func(m *Cluster, opt *ScanOptions) error {
opt.DontResolvePublicAddresses = !resolve opt.DontResolvePublicAddresses = !resolve
return nil return nil
} }
@ -38,7 +38,7 @@ func ResolvePublicAddresses(resolve bool) ScanOption {
// WithLookuper specifies what resolver.Lookuper to use to // WithLookuper specifies what resolver.Lookuper to use to
// find public addresses // find public addresses
func WithLookuper(h resolver.Lookuper) ScanOption { func WithLookuper(h resolver.Lookuper) ScanOption {
return func(m *Zones, opt *ScanOptions) error { return func(m *Cluster, opt *ScanOptions) error {
if h == nil { if h == nil {
return fs.ErrInvalid return fs.ErrInvalid
} }
@ -51,7 +51,7 @@ func WithLookuper(h resolver.Lookuper) ScanOption {
// public addresses. if nil is passed, the [net.Resolver] will be used. // public addresses. if nil is passed, the [net.Resolver] will be used.
// The default is using Cloudflare's 1.1.1.1. // The default is using Cloudflare's 1.1.1.1.
func WithResolver(h resolver.Resolver) ScanOption { func WithResolver(h resolver.Resolver) ScanOption {
return func(m *Zones, opt *ScanOptions) error { return func(m *Cluster, opt *ScanOptions) error {
if h == nil { if h == nil {
h = resolver.SystemResolver(true) h = resolver.SystemResolver(true)
} }
@ -63,7 +63,7 @@ func WithResolver(h resolver.Resolver) ScanOption {
// WithLogger specifies what to use for logging // WithLogger specifies what to use for logging
func WithLogger(log slog.Logger) ScanOption { func WithLogger(log slog.Logger) ScanOption {
return func(m *Zones, opt *ScanOptions) error { return func(m *Cluster, opt *ScanOptions) error {
if log == nil { if log == nil {
log = discard.New() log = discard.New()
} }
@ -74,7 +74,7 @@ func WithLogger(log slog.Logger) ScanOption {
} }
} }
func (m *Zones) setDefaults(opt *ScanOptions) error { func (m *Cluster) setDefaults(opt *ScanOptions) error {
if m.resolver == nil { if m.resolver == nil {
h := resolver.NewCloudflareLookuper() h := resolver.NewCloudflareLookuper()
@ -92,11 +92,11 @@ func (m *Zones) setDefaults(opt *ScanOptions) error {
return nil return nil
} }
// NewFS builds a [Zones] tree using the given directory // NewFS builds a [Cluster] tree using the given directory
func NewFS(dir fs.FS, domain string, opts ...ScanOption) (*Zones, error) { func NewFS(dir fs.FS, domain string, opts ...ScanOption) (*Cluster, error) {
var scanOptions ScanOptions var scanOptions ScanOptions
z := &Zones{ z := &Cluster{
dir: dir, dir: dir,
domain: domain, domain: domain,
} }
@ -118,8 +118,8 @@ func NewFS(dir fs.FS, domain string, opts ...ScanOption) (*Zones, error) {
return z, nil return z, nil
} }
// New builds a [Zones] tree using the given directory // New builds a [Cluster] tree using the given directory
func New(dir, domain string, opts ...ScanOption) (*Zones, error) { func New(dir, domain string, opts ...ScanOption) (*Cluster, error) {
dir, err := filepath.Abs(dir) dir, err := filepath.Abs(dir)
if err != nil { if err != nil {
return nil, err return nil, err

2
pkg/cluster/env.go

@ -16,7 +16,7 @@ type Env struct {
} }
// Env returns a shell environment factory // Env returns a shell environment factory
func (m *Zones) Env(export bool) (*Env, error) { func (m *Cluster) Env(export bool) (*Env, error) {
fsid, err := m.GetCephFSID() fsid, err := m.GetCephFSID()
if err != nil { if err != nil {
return nil, err return nil, err

14
pkg/cluster/log.go

@ -13,26 +13,26 @@ type logger interface {
} }
var ( var (
_ logger = (*Zones)(nil) _ logger = (*Cluster)(nil)
) )
func (z *Zones) withDebug() (slog.Logger, bool) { func (z *Cluster) withDebug() (slog.Logger, bool) {
return z.debug().WithEnabled() return z.debug().WithEnabled()
} }
func (z *Zones) withInfo() (slog.Logger, bool) { func (z *Cluster) withInfo() (slog.Logger, bool) {
return z.debug().WithEnabled() return z.debug().WithEnabled()
} }
func (z *Zones) debug() slog.Logger { func (z *Cluster) debug() slog.Logger {
return z.log.Debug() return z.log.Debug()
} }
func (z *Zones) info() slog.Logger { func (z *Cluster) info() slog.Logger {
return z.log.Info() return z.log.Info()
} }
func (z *Zones) warn(err error) slog.Logger { func (z *Cluster) warn(err error) slog.Logger {
l := z.log.Warn() l := z.log.Warn()
if err != nil { if err != nil {
l = l.WithField(slog.ErrorFieldName, err) l = l.WithField(slog.ErrorFieldName, err)
@ -40,7 +40,7 @@ func (z *Zones) warn(err error) slog.Logger {
return l return l
} }
func (z *Zones) error(err error) slog.Logger { func (z *Cluster) error(err error) slog.Logger {
l := z.log.Error() l := z.log.Error()
if err != nil { if err != nil {
l = l.WithField(slog.ErrorFieldName, err) l = l.WithField(slog.ErrorFieldName, err)

6
pkg/cluster/sync.go

@ -1,7 +1,7 @@
package cluster package cluster
// SyncAll updates all config files // SyncAll updates all config files
func (m *Zones) SyncAll() error { func (m *Cluster) SyncAll() error {
for _, fn := range []func() error{ for _, fn := range []func() error{
m.SyncAllWireguard, m.SyncAllWireguard,
m.SyncAllCeph, m.SyncAllCeph,
@ -15,7 +15,7 @@ func (m *Zones) SyncAll() error {
} }
// SyncAllWireguard updates all wireguard config files // SyncAllWireguard updates all wireguard config files
func (m *Zones) SyncAllWireguard() error { func (m *Cluster) SyncAllWireguard() error {
var err error var err error
for ring := 0; ring < RingsCount; ring++ { for ring := 0; ring < RingsCount; ring++ {
@ -34,7 +34,7 @@ func (m *Zones) SyncAllWireguard() error {
} }
// SyncAllCeph updates the ceph.conf file // SyncAllCeph updates the ceph.conf file
func (m *Zones) SyncAllCeph() error { func (m *Cluster) SyncAllCeph() error {
cfg, err := m.GenCephConfig() cfg, err := m.GenCephConfig()
if err != nil { if err != nil {
return err return err

16
pkg/cluster/wireguard.go

@ -6,19 +6,19 @@ import (
) )
var ( var (
_ WireguardConfigPruner = (*Zones)(nil) _ WireguardConfigPruner = (*Cluster)(nil)
_ WireguardConfigPruner = (*Zone)(nil) _ WireguardConfigPruner = (*Zone)(nil)
_ WireguardConfigPruner = (*Machine)(nil) _ WireguardConfigPruner = (*Machine)(nil)
_ WireguardConfigWriter = (*Zones)(nil) _ WireguardConfigWriter = (*Cluster)(nil)
_ WireguardConfigWriter = (*Zone)(nil) _ WireguardConfigWriter = (*Zone)(nil)
_ WireguardConfigWriter = (*Machine)(nil) _ WireguardConfigWriter = (*Machine)(nil)
_ WireguardConfigSyncer = (*Zones)(nil) _ WireguardConfigSyncer = (*Cluster)(nil)
_ WireguardConfigSyncer = (*Zone)(nil) _ WireguardConfigSyncer = (*Zone)(nil)
_ WireguardConfigSyncer = (*Machine)(nil) _ WireguardConfigSyncer = (*Machine)(nil)
_ WireguardKeysWriter = (*Zones)(nil) _ WireguardKeysWriter = (*Cluster)(nil)
_ WireguardKeysWriter = (*Zone)(nil) _ WireguardKeysWriter = (*Zone)(nil)
_ WireguardKeysWriter = (*Machine)(nil) _ WireguardKeysWriter = (*Machine)(nil)
) )
@ -31,7 +31,7 @@ type WireguardConfigPruner interface {
// PruneWireguardConfig removes wgN.conf files of machines with // PruneWireguardConfig removes wgN.conf files of machines with
// the corresponding ring disabled on all zones // the corresponding ring disabled on all zones
func (m *Zones) PruneWireguardConfig(ring int) error { func (m *Cluster) PruneWireguardConfig(ring int) error {
return pruneWireguardConfig(m, ring) return pruneWireguardConfig(m, ring)
} }
@ -76,7 +76,7 @@ type WireguardConfigWriter interface {
// WriteWireguardConfig rewrites all wgN.conf on all machines // WriteWireguardConfig rewrites all wgN.conf on all machines
// attached to that ring // attached to that ring
func (m *Zones) WriteWireguardConfig(ring int) error { func (m *Cluster) WriteWireguardConfig(ring int) error {
switch ring { switch ring {
case 0: case 0:
return writeWireguardConfig(m, m, ring) return writeWireguardConfig(m, m, ring)
@ -154,7 +154,7 @@ type WireguardConfigSyncer interface {
// SyncWireguardConfig updates all wgN.conf files for the specified // SyncWireguardConfig updates all wgN.conf files for the specified
// ring // ring
func (m *Zones) SyncWireguardConfig(ring int) error { func (m *Cluster) SyncWireguardConfig(ring int) error {
switch ring { switch ring {
case 0: case 0:
return syncWireguardConfig(m, m, ring) return syncWireguardConfig(m, m, ring)
@ -214,7 +214,7 @@ type WireguardKeysWriter interface {
} }
// WriteWireguardKeys rewrites all wgN.{key,pub} files // WriteWireguardKeys rewrites all wgN.{key,pub} files
func (m *Zones) WriteWireguardKeys(ring int) error { func (m *Cluster) WriteWireguardKeys(ring int) error {
return writeWireguardKeys(m, ring) return writeWireguardKeys(m, ring)
} }

70
pkg/cluster/zones.go

@ -2,16 +2,10 @@ package cluster
import ( import (
"io/fs" "io/fs"
"darvaza.org/resolver"
"darvaza.org/slog"
"github.com/gofrs/uuid/v5"
) )
var ( var (
_ MachineIterator = (*Zone)(nil) _ MachineIterator = (*Zone)(nil)
_ MachineIterator = (*Zones)(nil)
_ ZoneIterator = (*Zones)(nil)
) )
// A ZoneIterator is a set of Zones we can iterate on // A ZoneIterator is a set of Zones we can iterate on
@ -19,9 +13,10 @@ type ZoneIterator interface {
ForEachZone(func(*Zone) bool) ForEachZone(func(*Zone) bool)
} }
// Zone represents one zone in a cluster // A Zone is a set of machines in close proximity and strong
// affinity.
type Zone struct { type Zone struct {
zones *Zones zones *Cluster
logger `json:"-" yaml:"-"` logger `json:"-" yaml:"-"`
ID int ID int
@ -71,62 +66,3 @@ func (z *Zone) GatewayIDs() ([]int, int) {
return out, len(out) return out, len(out)
} }
// revive:disable:line-length-limit
// Zones represents all zones in a cluster
type Zones struct {
dir fs.FS
log slog.Logger
resolver resolver.Resolver
domain string
CephFSID uuid.UUID `json:"ceph_fsid,omitempty" yaml:"ceph_fsid,omitempty"`
Zones []*Zone
}
// revive:enable:line-length-limit
// ForEachMachine calls a function for each Machine in the cluster
// until instructed to terminate the loop
func (m *Zones) ForEachMachine(fn func(*Machine) bool) {
m.ForEachZone(func(z *Zone) bool {
var term bool
z.ForEachMachine(func(p *Machine) bool {
term = fn(p)
return term
})
return term
})
}
// ForEachZone calls a function for each Zone in the cluster
// until instructed to terminate the loop
func (m *Zones) ForEachZone(fn func(*Zone) bool) {
for _, p := range m.Zones {
if fn(p) {
// terminate
return
}
}
}
// GetMachineByName looks for a machine with the specified
// name on any zone
func (m *Zones) GetMachineByName(name string) (*Machine, bool) {
var out *Machine
if name != "" {
m.ForEachMachine(func(p *Machine) bool {
if p.Name == name {
out = p
}
return out != nil
})
}
return out, out != nil
}

Loading…
Cancel
Save