package broadcast import ( "encoding/json" "fmt" "goseg/config" "goseg/docker" "goseg/startram" "goseg/structs" "goseg/system" "log/slog" "math" "os" "reflect" "strings" "sync" "github.com/gorilla/websocket" ) var ( logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) clients = make(map[*websocket.Conn]bool) broadcastState structs.AuthBroadcast mu sync.RWMutex // synchronize access to broadcastState ) func init() { // initialize broadcastState global var config := config.Conf() broadcast, err := bootstrapBroadcastState(config) if err != nil { errmsg := fmt.Sprintf("Unable to initialize broadcast: %v", err) panic(errmsg) } broadcastState = broadcast } // adds ws client func RegisterClient(conn *websocket.Conn) { clients[conn] = true broadcastJson, err := GetStateJson() if err != nil { return } // when a new ws client registers, send them the current broadcast if err := conn.WriteMessage(websocket.TextMessage, broadcastJson); err != nil { fmt.Println("Error writing response:", err) return } } // remove ws client func UnregisterClient(conn *websocket.Conn) { delete(clients, conn) } // take in config file and addt'l info to initialize broadcast func bootstrapBroadcastState(config structs.SysConfig) (structs.AuthBroadcast, error) { logger.Info("Bootstrapping state") var res structs.AuthBroadcast // get a list of piers from config piers := config.Piers // this returns a map of ship:running status logger.Info("Resolving pier status") updates, err := constructPierInfo(piers) if err != nil { return res, err } // update broadcastState err = UpdateBroadcastState(map[string]interface{}{ "Urbits": updates, }) if err != nil { errmsg := fmt.Sprintf("Unable to update broadcast state: %v", err) logger.Error(errmsg) return res, err } // wgRegistered := config.WgRegistered // wgOn := config.WgOn // get startram regions logger.Info("Retrieving StarTram region info") regions, err := startram.GetRegions() if err != nil { logger.Warn("Couldn't get StarTram regions") } else { updates := map[string]interface{}{ "Profile": map[string]interface{}{ "Startram": map[string]interface{}{ "Info": map[string]interface{}{ "Regions": regions, }, }, }, } err := UpdateBroadcastState(updates) if err != nil { errmsg := fmt.Sprintf("Error updating broadcast state:", err) logger.Error(errmsg) } } // update with system state sysInfo := constructSystemInfo() err = UpdateBroadcastState(sysInfo) if err != nil { errmsg := fmt.Sprintf("Error updating broadcast state:", err) logger.Error(errmsg) } // return the boostrapped result res = GetState() return res, nil } // this is for building the broadcast objects describing piers func constructPierInfo(piers []string) (map[string]structs.Urbit, error) { updates := make(map[string]structs.Urbit) // load fresh broadcast state currentState := GetState() // get the networks containers are attached to shipNetworks := GetContainerNetworks(piers) // find out whether they're running pierStatus, err := docker.GetShipStatus(piers) if err != nil { errmsg := fmt.Sprintf("Unable to bootstrap urbit states: %v", err) logger.Error(errmsg) return updates, err } hostName, err := os.Hostname() if err != nil { errmsg := fmt.Sprintf("Error getting hostname, defaulting to `nativeplanet`: %v", err) logger.Warn(errmsg) hostName = "nativeplanet" } // convert the running status into bools for pier, status := range pierStatus { // pull urbit info from json err := docker.LoadConfig(pier) if err != nil { errmsg := fmt.Sprintf("Unable to load %s config: %v", pier, err) logger.Error(errmsg) continue } dockerConfig := docker.Conf(pier) // get container stats from docker var dockerStats structs.ContainerStats dockerStats, err = docker.GetContainerStats(pier) if err != nil { errmsg := fmt.Sprintf("Unable to load %s stats: %v", pier, err) logger.Error(errmsg) continue } urbit := structs.Urbit{} if existingUrbit, exists := currentState.Urbits[pier]; exists { // If the ship already exists in broadcastState, use its current state urbit = existingUrbit } isRunning := (status == "Up" || strings.HasPrefix(status, "Up ")) bootStatus := true if dockerConfig.BootStatus == "ignore" { bootStatus = false } setRemote := false if dockerConfig.Network == "wireguard" { setRemote = true } // collate all the info from our sources into the struct urbit.Info.Running = isRunning urbit.Info.Network = shipNetworks[pier] urbit.Info.URL = fmt.Sprintf("http://%s.local:%d", hostName, dockerConfig.HTTPPort) urbit.Info.LoomSize = int(math.Pow(2, float64(dockerConfig.LoomSize)) / math.Pow(1024, 2)) urbit.Info.DiskUsage = dockerStats.DiskUsage urbit.Info.MemUsage = dockerStats.MemoryUsage urbit.Info.DevMode = dockerConfig.DevMode urbit.Info.Vere = dockerConfig.UrbitVersion urbit.Info.DetectBootStatus = bootStatus urbit.Info.Remote = setRemote urbit.Info.Vere = dockerConfig.UrbitVersion // and insert the struct into the map we will use as input for the broadcast struct updates[pier] = urbit } return updates, nil } // put together the system[usage] subobject func constructSystemInfo() map[string]interface{} { var res map[string]interface{} var ramObj []uint64 var diskObj []uint64 usedRam, totalRam := system.GetMemory() ramObj = append(ramObj,usedRam,totalRam) cpuUsage := system.GetCPU() usedDisk, freeDisk := system.GetDisk() diskObj = append(diskObj,usedDisk,freeDisk) res = map[string]interface{}{ "system":map[string]interface{}{ "usage":map[string]interface{}{ "ram":ramObj, "cpu":cpuUsage, "disk":diskObj, }, }, } return res } // return a map of ships and their networks func GetContainerNetworks(containers []string) map[string]string { res := make(map[string]string) for _, container := range containers { network, err := docker.GetContainerNetwork(container) if err != nil { errmsg := fmt.Sprintf("Error getting container network: %v", err) logger.Error(errmsg) continue } else { res[container] = network } } return res } // update broadcastState with a map of items func UpdateBroadcastState(values map[string]interface{}) error { mu.Lock() defer mu.Unlock() v := reflect.ValueOf(&broadcastState).Elem() for key, value := range values { field := v.FieldByName(key) if !field.IsValid() || !field.CanSet() { return fmt.Errorf("field %s does not exist or is not settable", key) } val := reflect.ValueOf(value) if val.Kind() == reflect.Interface { val = val.Elem() // Extract the underlying value from the interface } if err := recursiveUpdate(field, val); err != nil { return err } } BroadcastToClients() return nil } // this allows us to insert stuff into nested structs/keys and not overwrite the existing contents func recursiveUpdate(dst, src reflect.Value) error { if !dst.CanSet() { return fmt.Errorf("field is not settable") } // If dst is a struct and src is a map, handle them field by field if dst.Kind() == reflect.Struct && src.Kind() == reflect.Map { for _, key := range src.MapKeys() { dstField := dst.FieldByName(key.String()) if !dstField.IsValid() { return fmt.Errorf("field %s does not exist in the struct", key.String()) } // Initialize the map if it's nil and we're trying to set a map if dstField.Kind() == reflect.Map && dstField.IsNil() && src.MapIndex(key).Kind() == reflect.Map { dstField.Set(reflect.MakeMap(dstField.Type())) } if !dstField.CanSet() { return fmt.Errorf("field %s is not settable in the struct", key.String()) } srcVal := src.MapIndex(key) if srcVal.Kind() == reflect.Interface { srcVal = srcVal.Elem() } if err := recursiveUpdate(dstField, srcVal); err != nil { return err } } return nil } // If both dst and src are maps, handle them recursively if dst.Kind() == reflect.Map && src.Kind() == reflect.Map { for _, key := range src.MapKeys() { srcVal := src.MapIndex(key) // If the key doesn't exist in dst, initialize it dstVal := dst.MapIndex(key) if !dstVal.IsValid() { dstVal = reflect.New(dst.Type().Elem()).Elem() } // Recursive call to handle potential nested maps or structs if err := recursiveUpdate(dstVal, srcVal); err != nil { return err } // Initialize the map if it's nil if dst.IsNil() { dst.Set(reflect.MakeMap(dst.Type())) } dst.SetMapIndex(key, dstVal) } return nil } // For non-map or non-struct fields, or for direct updates if dst.Type() != src.Type() { return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type()) } dst.Set(src) return nil } // return broadcast state func GetState() structs.AuthBroadcast { mu.Lock() defer mu.Unlock() return broadcastState } // return json string of current broadcast state func GetStateJson() ([]byte, error) { mu.Lock() defer mu.Unlock() broadcastJson, err := json.Marshal(broadcastState) if err != nil { errmsg := fmt.Sprintf("Error marshalling response: %v", err) logger.Error(errmsg) return nil, err } return broadcastJson, nil } // broadcast the global state to all clients func BroadcastToClients() error { broadcastJson, err := json.Marshal(broadcastState) if err != nil { logger.Error("Error marshalling response:", err) return err } for client := range clients { if err := client.WriteMessage(websocket.TextMessage, broadcastJson); err != nil { logger.Error("Error writing response:", err) return err } } return nil }