// Package broadcast maintains the global broadcast state that is pushed to
// websocket clients, and runs the background loops that keep it fresh.
package broadcast

import (
	"encoding/json"
	"fmt"
	"goseg/auth"
	"goseg/config"
	"goseg/docker"
	"goseg/startram"
	"goseg/structs"
	"goseg/system"
	"math"
	"os"
	"reflect"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

var (
	clients          = make(map[*websocket.Conn]bool)
	clientsMu        sync.Mutex      // synchronize access to clients
	hostInfoInterval = 3 * time.Second // how often we refresh system info
	shipInfoInterval = 3 * time.Second // how often we refresh ship info
	broadcastState   structs.AuthBroadcast
	unauthState      structs.UnauthBroadcast
	mu               sync.RWMutex // synchronize access to broadcastState
)

func init() {
	// initialize broadcastState global var
	//
	// bootstrapBroadcastState fills broadcastState in place (through
	// UpdateBroadcastState) and returns a copy of it, so the returned value
	// does not need to be assigned back. Assigning it here without the lock
	// would race with the refresh goroutines that bootstrap starts.
	if _, err := bootstrapBroadcastState(config.Conf()); err != nil {
		errmsg := fmt.Sprintf("Unable to initialize broadcast: %v", err)
		panic(errmsg)
	}
}

// RegisterClient adds a websocket client and immediately sends it the current
// broadcast state. Marshal or write failures are logged and otherwise ignored;
// the client stays registered either way.
func RegisterClient(conn *websocket.Conn) {
	clientsMu.Lock()
	clients[conn] = true
	clientsMu.Unlock()

	broadcastJson, err := GetStateJson()
	if err != nil {
		// GetStateJson has already logged the failure
		return
	}
	// when a new ws client registers, send them the current broadcast
	if err := conn.WriteMessage(websocket.TextMessage, broadcastJson); err != nil {
		config.Logger.Error(fmt.Sprintf("Error writing response: %v", err))
	}
}

// UnregisterClient removes a websocket client. Removing a connection that was
// never registered is a no-op.
func UnregisterClient(conn *websocket.Conn) {
	clientsMu.Lock()
	defer clientsMu.Unlock()
	delete(clients, conn)
}

// bootstrapBroadcastState takes the system config and additional runtime info
// (pier status, StarTram regions, host usage) to populate the global broadcast
// state, starts the periodic refresh loops, and returns a copy of the
// resulting state.
//
// Only a failure to resolve or store the pier info is fatal; StarTram region
// and system-info failures are logged and bootstrapping continues.
func bootstrapBroadcastState(conf structs.SysConfig) (structs.AuthBroadcast, error) {
	config.Logger.Info("Bootstrapping state")
	var res structs.AuthBroadcast

	// get a list of piers from config
	piers := conf.Piers

	// this returns a map of ship:running status
	config.Logger.Info("Resolving pier status")
	updates, err := constructPierInfo(piers)
	if err != nil {
		return res, err
	}

	// update broadcastState
	err = UpdateBroadcastState(map[string]interface{}{
		"Urbits": updates,
	})
	if err != nil {
		errmsg := fmt.Sprintf("Unable to update broadcast state: %v", err)
		config.Logger.Error(errmsg)
		return res, err
	}

	// wgRegistered := config.WgRegistered
	// wgOn := config.WgOn

	// get startram regions
	config.Logger.Info("Retrieving StarTram region info")
	regions, err := startram.GetRegions()
	if err != nil {
		config.Logger.Warn("Couldn't get StarTram regions")
	} else {
		updates := map[string]interface{}{
			"Profile": map[string]interface{}{
				"Startram": map[string]interface{}{
					"Info": map[string]interface{}{
						"Regions": regions,
					},
				},
			},
		}
		if err := UpdateBroadcastState(updates); err != nil {
			errmsg := fmt.Sprintf("Error updating broadcast state: %v", err)
			config.Logger.Error(errmsg)
		}
	}

	// update with system state
	sysInfo := constructSystemInfo()
	if err := UpdateBroadcastState(sysInfo); err != nil {
		errmsg := fmt.Sprintf("Error updating broadcast state: %v", err)
		config.Logger.Error(errmsg)
	}

	// start looping info refreshes
	go hostStatusLoop()
	go shipStatusLoop()

	// return the bootstrapped result
	res = GetState()
	return res, nil
}

// constructPierInfo builds the broadcast objects describing piers. For every
// pier reported by docker.GetShipStatus it merges the pier's config, container
// stats and network into a structs.Urbit, starting from the entry already in
// the broadcast state when one exists.
//
// Piers whose config or stats cannot be loaded are logged and left out of the
// result. An error is returned only when the ship status lookup itself fails.
func constructPierInfo(piers []string) (map[string]structs.Urbit, error) {
	updates := make(map[string]structs.Urbit)

	// load fresh broadcast state
	currentState := GetState()

	// get the networks containers are attached to
	shipNetworks := GetContainerNetworks(piers)

	// find out whether they're running
	pierStatus, err := docker.GetShipStatus(piers)
	if err != nil {
		errmsg := fmt.Sprintf("Unable to bootstrap urbit states: %v", err)
		config.Logger.Error(errmsg)
		return updates, err
	}

	hostName, err := os.Hostname()
	if err != nil {
		errmsg := fmt.Sprintf("Error getting hostname, defaulting to `nativeplanet`: %v", err)
		config.Logger.Warn(errmsg)
		hostName = "nativeplanet"
	}

	// convert the running status into bools
	for pier, status := range pierStatus {
		// pull urbit info from json
		if err := config.LoadUrbitConfig(pier); err != nil {
			errmsg := fmt.Sprintf("Unable to load %s config: %v", pier, err)
			config.Logger.Error(errmsg)
			continue
		}
		dockerConfig := config.UrbitConf(pier)

		// get container stats from docker
		dockerStats, err := docker.GetContainerStats(pier)
		if err != nil {
			errmsg := fmt.Sprintf("Unable to load %s stats: %v", pier, err)
			config.Logger.Error(errmsg)
			continue
		}

		// If the ship already exists in broadcastState, use its current state
		// as the base so fields not set below are preserved; otherwise this is
		// the zero value.
		urbit := currentState.Urbits[pier]

		isRunning := status == "Up" || strings.HasPrefix(status, "Up ")
		bootStatus := dockerConfig.BootStatus != "ignore"
		setRemote := dockerConfig.Network == "wireguard"

		// collate all the info from our sources into the struct
		urbit.Info.Running = isRunning
		urbit.Info.Network = shipNetworks[pier]
		urbit.Info.URL = fmt.Sprintf("http://%s.local:%d", hostName, dockerConfig.HTTPPort)
		// 2^LoomSize divided by 1024^2
		urbit.Info.LoomSize = int(math.Pow(2, float64(dockerConfig.LoomSize)) / math.Pow(1024, 2))
		urbit.Info.DiskUsage = dockerStats.DiskUsage
		urbit.Info.MemUsage = dockerStats.MemoryUsage
		urbit.Info.DevMode = dockerConfig.DevMode
		urbit.Info.Vere = dockerConfig.UrbitVersion
		urbit.Info.DetectBootStatus = bootStatus
		urbit.Info.Remote = setRemote

		// and insert the struct into the map we will use as input for the broadcast struct
		updates[pier] = urbit
	}
	return updates, nil
}

// constructSystemInfo puts together the System.Usage subobject (RAM, CPU, CPU
// temperature, disk and swap) in the nested-map form UpdateBroadcastState
// expects.
func constructSystemInfo() map[string]interface{} {
	usedRam, totalRam := system.GetMemory()
	usedDisk, freeDisk := system.GetDisk()
	return map[string]interface{}{
		"System": map[string]interface{}{
			"Usage": map[string]interface{}{
				"RAM":      []uint64{usedRam, totalRam},
				"CPU":      system.GetCPU(),
				"CPUTemp":  system.GetTemp(),
				"Disk":     []uint64{usedDisk, freeDisk},
				"SwapFile": system.HasSwap(),
			},
		},
	}
}

// GetContainerNetworks returns a map of ships and their networks. Containers
// whose network lookup fails are logged and omitted from the result.
func GetContainerNetworks(containers []string) map[string]string {
	res := make(map[string]string, len(containers))
	for _, container := range containers {
		network, err := docker.GetContainerNetwork(container)
		if err != nil {
			errmsg := fmt.Sprintf("Error getting container network: %v", err)
			config.Logger.Error(errmsg)
			continue
		}
		res[container] = network
	}
	return res
}

// UpdateBroadcastState merges a map of items into broadcastState and then
// pushes the new state to authenticated clients.
//
// Top-level keys must name settable fields of structs.AuthBroadcast; nested
// maps are merged into the matching nested structs/maps rather than replacing
// them. An error is returned when a key does not resolve to a field or a value
// has the wrong type; keys applied before the failing one stay applied.
func UpdateBroadcastState(values map[string]interface{}) error {
	if err := applyBroadcastUpdate(values); err != nil {
		return err
	}
	// The state lock must be released before broadcasting: BroadcastToClients
	// reads the state under mu, and sync.RWMutex is not reentrant.
	// Broadcasting is best-effort; failures are logged where they occur.
	_ = BroadcastToClients()
	return nil
}

// applyBroadcastUpdate performs the mutation half of UpdateBroadcastState
// while holding the state write lock.
func applyBroadcastUpdate(values map[string]interface{}) error {
	mu.Lock()
	defer mu.Unlock()
	v := reflect.ValueOf(&broadcastState).Elem()
	for key, value := range values {
		field := v.FieldByName(key)
		if !field.IsValid() || !field.CanSet() {
			return fmt.Errorf("field %s does not exist or is not settable", key)
		}
		if err := recursiveUpdate(field, reflect.ValueOf(value)); err != nil {
			return err
		}
	}
	return nil
}

// recursiveUpdate allows us to insert stuff into nested structs/keys and not
// overwrite the existing contents.
//
//   - struct <- map: each map key is treated as a field name and merged into
//     that field.
//   - map <- map: each key is merged into the existing entry, or into a zero
//     value when the entry is new.
//   - anything else: src must have exactly dst's type and replaces it.
//
// Interface-wrapped sources are unwrapped first. It returns an error when dst
// is not settable, src is nil, a named field does not exist, or the types do
// not match.
func recursiveUpdate(dst, src reflect.Value) error {
	if !dst.CanSet() {
		return fmt.Errorf("field is not settable")
	}
	// Values pulled out of a map[string]interface{} arrive wrapped
	if src.Kind() == reflect.Interface {
		src = src.Elem()
	}
	if !src.IsValid() {
		// a nil source has no type to compare against; src.Type() would panic
		return fmt.Errorf("cannot update %s from a nil value", dst.Type())
	}

	switch {
	// If dst is a struct and src is a map, handle them field by field
	case dst.Kind() == reflect.Struct && src.Kind() == reflect.Map:
		for _, key := range src.MapKeys() {
			name := key.String()
			dstField := dst.FieldByName(name)
			if !dstField.IsValid() {
				return fmt.Errorf("field %s does not exist in the struct", name)
			}
			if !dstField.CanSet() {
				return fmt.Errorf("field %s is not settable in the struct", name)
			}
			srcVal := src.MapIndex(key)
			// Initialize the map if it's nil and we're trying to set a map
			if dstField.Kind() == reflect.Map && dstField.IsNil() && srcVal.Kind() == reflect.Map {
				dstField.Set(reflect.MakeMap(dstField.Type()))
			}
			if err := recursiveUpdate(dstField, srcVal); err != nil {
				return err
			}
		}
		return nil

	// If both dst and src are maps, handle them recursively
	case dst.Kind() == reflect.Map && src.Kind() == reflect.Map:
		for _, key := range src.MapKeys() {
			// Values returned by MapIndex are never settable, so merge into
			// a fresh settable value seeded with the existing entry (if any)
			// and store it back afterwards.
			elem := reflect.New(dst.Type().Elem()).Elem()
			if existing := dst.MapIndex(key); existing.IsValid() {
				elem.Set(existing)
			}
			// Recursive call to handle potential nested maps or structs
			if err := recursiveUpdate(elem, src.MapIndex(key)); err != nil {
				return err
			}
			// Initialize the map if it's nil
			if dst.IsNil() {
				dst.Set(reflect.MakeMap(dst.Type()))
			}
			dst.SetMapIndex(key, elem)
		}
		return nil
	}

	// For non-map or non-struct fields, or for direct updates
	if dst.Type() != src.Type() {
		return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type())
	}
	dst.Set(src)
	return nil
}

// GetState returns a copy of the broadcast state. The copy is shallow: map
// fields still refer to the maps held by the global state.
func GetState() structs.AuthBroadcast {
	mu.RLock()
	defer mu.RUnlock()
	return broadcastState
}

// GetStateJson returns the current broadcast state marshalled as JSON.
// Marshalling happens under the read lock so the maps inside the state cannot
// be mutated while they are being encoded.
func GetStateJson() ([]byte, error) {
	mu.RLock()
	defer mu.RUnlock()
	broadcastJson, err := json.Marshal(broadcastState)
	if err != nil {
		errmsg := fmt.Sprintf("Error marshalling response: %v", err)
		config.Logger.Error(errmsg)
		return nil, err
	}
	return broadcastJson, nil
}

// BroadcastToClients sends the global state to every authenticated client.
// A failed write to one client is logged and does not stop delivery to the
// others; the first write error (if any) is returned.
func BroadcastToClients() error {
	authJson, err := GetStateJson()
	if err != nil {
		return fmt.Errorf("Error marshalling auth broadcast: %w", err)
	}
	auth.AuthenticatedClients.Lock()
	defer auth.AuthenticatedClients.Unlock()
	var firstErr error
	for client := range auth.AuthenticatedClients.Conns {
		if err := client.WriteMessage(websocket.TextMessage, authJson); err != nil {
			config.Logger.Error(fmt.Sprintf("Error writing response: %v", err))
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

// hostStatusLoop is the refresh loop for host info: every hostInfoInterval it
// rebuilds the system usage info and merges it into the broadcast state.
// It never returns.
func hostStatusLoop() {
	ticker := time.NewTicker(hostInfoInterval)
	defer ticker.Stop()
	for range ticker.C {
		update := constructSystemInfo()
		if err := UpdateBroadcastState(update); err != nil {
			config.Logger.Warn(fmt.Sprintf("Error updating system status: %v", err))
		}
	}
}

// shipStatusLoop is the refresh loop for ship info: every shipInfoInterval it
// rebuilds the pier info from the current config and merges it into the
// broadcast state. It never returns.
func shipStatusLoop() {
	ticker := time.NewTicker(shipInfoInterval)
	defer ticker.Stop()
	for range ticker.C {
		conf := config.Conf()
		piers := conf.Piers
		updates, err := constructPierInfo(piers)
		if err != nil {
			errmsg := fmt.Sprintf("Unable to build pier info: %v", err)
			config.Logger.Warn(errmsg)
			// nothing useful to push this tick; try again on the next one
			continue
		}
		// update broadcastState
		err = UpdateBroadcastState(map[string]interface{}{
			"Urbits": updates,
		})
		if err != nil {
			errmsg := fmt.Sprintf("Unable to update ship state: %v", err)
			config.Logger.Error(errmsg)
		}
	}
}