// Package broadcast keeps the process-wide broadcast state and pushes it, as
// JSON, to every registered websocket client whenever it changes.
package broadcast

import (
	"encoding/json"
	"fmt"
	"goseg/config"
	"goseg/docker"
	"goseg/startram"
	"goseg/structs"
	"log/slog"
	"math"
	"os"
	"reflect"
	"strings"
	"sync"

	"github.com/gorilla/websocket"
)

var (
	logger         = slog.New(slog.NewJSONHandler(os.Stdout, nil))
	clients        = make(map[*websocket.Conn]bool)
	broadcastState structs.AuthBroadcast
	mu             sync.RWMutex // synchronize access to broadcastState
)

// init bootstraps the broadcastState global from the system config and panics
// if the bootstrap fails.
func init() {
	conf := config.Conf()
	state, err := bootstrapBroadcastState(conf)
	if err != nil {
		errmsg := fmt.Sprintf("Unable to initialize broadcast: %v", err)
		panic(errmsg)
	}
	broadcastState = state
}

// RegisterClient adds a websocket client and immediately sends it the current
// broadcast state. The client stays registered even if that first write fails.
//
// NOTE(review): the clients map is not guarded by a mutex here, in
// UnregisterClient, or in BroadcastToClients — confirm callers serialize
// access.
func RegisterClient(conn *websocket.Conn) {
	clients[conn] = true
	broadcastJson, err := GetStateJson()
	if err != nil {
		// GetStateJson has already logged the marshalling failure.
		return
	}
	// when a new ws client registers, send them the current broadcast
	if err := conn.WriteMessage(websocket.TextMessage, broadcastJson); err != nil {
		logger.Error(fmt.Sprintf("Error writing response: %v", err))
	}
}

// UnregisterClient removes a websocket client from the broadcast set.
func UnregisterClient(conn *websocket.Conn) {
	delete(clients, conn)
}

// bootstrapBroadcastState builds the initial broadcast state from the system
// config: it resolves the status of every configured pier and, when they can
// be fetched, the StarTram regions. A failure to fetch or store the regions is
// logged but is not fatal; a failure to resolve or store pier info is returned.
func bootstrapBroadcastState(conf structs.SysConfig) (structs.AuthBroadcast, error) {
	logger.Info("Bootstrapping state")
	var res structs.AuthBroadcast
	// get a list of piers from config
	piers := conf.Piers
	// this returns a map of ship:running status
	logger.Info("Resolving pier status")
	updates, err := constructPierInfo(piers)
	if err != nil {
		return res, err
	}
	// update broadcastState
	err = UpdateBroadcastState(map[string]interface{}{
		"Urbits": updates,
	})
	if err != nil {
		errmsg := fmt.Sprintf("Unable to update broadcast state: %v", err)
		logger.Error(errmsg)
		return res, err
	}
	// wgRegistered := config.WgRegistered
	// wgOn := config.WgOn
	// get startram regions
	logger.Info("Retrieving StarTram region info")
	regions, err := startram.GetRegions()
	if err != nil {
		logger.Warn("Couldn't get StarTram regions")
	} else {
		regionUpdate := map[string]interface{}{
			"Profile": map[string]interface{}{
				"Startram": map[string]interface{}{
					"Info": map[string]interface{}{
						"Regions": regions,
					},
				},
			},
		}
		if err := UpdateBroadcastState(regionUpdate); err != nil {
			errmsg := fmt.Sprintf("Error updating broadcast state: %v", err)
			logger.Error(errmsg)
		}
	}
	// return the bootstrapped result
	res = GetState()
	return res, nil
}

// constructPierInfo returns one structs.Urbit per pier reported by
// docker.GetShipStatus. Each entry starts from the pier's existing entry in
// the broadcast state (if any) and has its Running, Network, URL and LoomSize
// info fields refreshed.
func constructPierInfo(piers []string) (map[string]structs.Urbit, error) {
	updates := make(map[string]structs.Urbit)
	currentState := GetState()
	shipNetworks := GetContainerNetworks(piers)
	pierStatus, err := docker.GetShipStatus(piers)
	if err != nil {
		errmsg := fmt.Sprintf("Unable to bootstrap urbit states: %v", err)
		logger.Error(errmsg)
		return updates, err
	}
	hostName, err := os.Hostname()
	if err != nil {
		errmsg := fmt.Sprintf("Error getting hostname, defaulting to `nativeplanet`: %v", err)
		logger.Warn(errmsg)
		hostName = "nativeplanet"
	}
	for pier, status := range pierStatus {
		dockerConfig := docker.Conf(pier)
		urbit := structs.Urbit{}
		if existingUrbit, exists := currentState.Urbits[pier]; exists {
			// If the ship already exists in broadcastState, use its current state
			urbit = existingUrbit
		}
		// convert the running status into a bool
		isRunning := status == "Up" || strings.HasPrefix(status, "Up ")
		urbit.Info.Running = isRunning
		urbit.Info.Network = shipNetworks[pier]
		// Format the port with %v rather than string(port): converting an
		// integer with string() yields a rune, not its decimal text.
		urbit.Info.URL = fmt.Sprintf("http://%s:%v", hostName, dockerConfig.HTTPPort)
		// 2^LoomSize divided by 1024^2 — presumably a log2 byte size expressed
		// in MiB; confirm against the docker config schema.
		urbit.Info.LoomSize = int(math.Pow(2, float64(dockerConfig.LoomSize)) / math.Pow(1024, 2))
		updates[pier] = urbit
	}
	return updates, nil
}

// GetContainerNetworks returns a map of container name to network. Containers
// whose network lookup fails are logged and left out of the result.
func GetContainerNetworks(containers []string) map[string]string {
	res := make(map[string]string)
	for _, container := range containers {
		network, err := docker.GetContainerNetwork(container)
		if err != nil {
			errmsg := fmt.Sprintf("Error getting container network: %v", err)
			logger.Error(errmsg)
			continue
		}
		res[container] = network
	}
	return res
}

// UpdateBroadcastState merges values into broadcastState and then broadcasts
// the new state to all clients. Each key must name a settable top-level field
// of the state; nested maps are merged into nested structs/maps rather than
// replacing them. On the first failing key it returns the error without
// broadcasting; keys applied before the failure are not rolled back.
func UpdateBroadcastState(values map[string]interface{}) error {
	mu.Lock()
	defer mu.Unlock()
	v := reflect.ValueOf(&broadcastState).Elem()
	for key, value := range values {
		field := v.FieldByName(key)
		if !field.IsValid() || !field.CanSet() {
			return fmt.Errorf("field %s does not exist or is not settable", key)
		}
		if err := recursiveUpdate(field, reflect.ValueOf(value)); err != nil {
			return err
		}
	}
	// Runs while mu is still held; a failed client write does not fail the
	// state update, so the result is intentionally not returned.
	BroadcastToClients()
	return nil
}

// recursiveUpdate writes src into dst without discarding existing nested
// content: a map src applied to a struct dst is handled field by field, a map
// src applied to a map dst is merged key by key, and anything else is assigned
// directly when src's type is assignable to dst's type.
func recursiveUpdate(dst, src reflect.Value) error {
	if !dst.CanSet() {
		return fmt.Errorf("field is not settable")
	}
	// Values pulled out of map[string]interface{} arrive wrapped in an
	// interface; unwrap so the kind checks below see the concrete value.
	if src.IsValid() && src.Kind() == reflect.Interface {
		src = src.Elem()
	}
	if !src.IsValid() {
		// A nil source has no type; calling src.Type() below would panic.
		return fmt.Errorf("cannot update %s from a nil value", dst.Type())
	}
	// If dst is a struct and src is a map, handle them field by field
	if dst.Kind() == reflect.Struct && src.Kind() == reflect.Map {
		for _, key := range src.MapKeys() {
			name := key.String()
			dstField := dst.FieldByName(name)
			if !dstField.IsValid() {
				return fmt.Errorf("field %s does not exist in the struct", name)
			}
			// Check settability before any Set call on the field.
			if !dstField.CanSet() {
				return fmt.Errorf("field %s is not settable in the struct", name)
			}
			srcVal := src.MapIndex(key)
			if srcVal.Kind() == reflect.Interface {
				srcVal = srcVal.Elem()
			}
			// Initialize the map if it's nil and we're trying to set a map
			if dstField.Kind() == reflect.Map && dstField.IsNil() && srcVal.Kind() == reflect.Map {
				dstField.Set(reflect.MakeMap(dstField.Type()))
			}
			if err := recursiveUpdate(dstField, srcVal); err != nil {
				return err
			}
		}
		return nil
	}
	// If both dst and src are maps, merge them key by key
	if dst.Kind() == reflect.Map && src.Kind() == reflect.Map {
		for _, key := range src.MapKeys() {
			// Map elements returned by MapIndex are never settable, so work
			// on a fresh settable value seeded with the existing entry (if
			// any) and store it back afterwards.
			elem := reflect.New(dst.Type().Elem()).Elem()
			if existing := dst.MapIndex(key); existing.IsValid() {
				elem.Set(existing)
			}
			// Recursive call to handle potential nested maps or structs
			if err := recursiveUpdate(elem, src.MapIndex(key)); err != nil {
				return err
			}
			// Initialize the map if it's nil
			if dst.IsNil() {
				dst.Set(reflect.MakeMap(dst.Type()))
			}
			dst.SetMapIndex(key, elem)
		}
		return nil
	}
	// For non-map or non-struct fields, or for direct updates
	if !src.Type().AssignableTo(dst.Type()) {
		return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type())
	}
	dst.Set(src)
	return nil
}

// GetState returns a copy of the current broadcast state struct. The copy is
// shallow: map-typed fields still refer to the same underlying maps.
func GetState() structs.AuthBroadcast {
	mu.RLock()
	defer mu.RUnlock()
	return broadcastState
}

// GetStateJson returns the current broadcast state marshalled as JSON. A
// marshalling failure is logged and returned.
func GetStateJson() ([]byte, error) {
	mu.RLock()
	defer mu.RUnlock()
	broadcastJson, err := json.Marshal(broadcastState)
	if err != nil {
		errmsg := fmt.Sprintf("Error marshalling response: %v", err)
		logger.Error(errmsg)
		return nil, err
	}
	return broadcastJson, nil
}

// BroadcastToClients sends the global state, as JSON, to every registered
// client. A failed write is logged and the remaining clients are still
// attempted; the first write error (or the marshalling error) is returned.
//
// It does not acquire mu itself: UpdateBroadcastState calls it while already
// holding the lock. NOTE(review): any other caller reads broadcastState
// unsynchronized — confirm that is acceptable.
func BroadcastToClients() error {
	broadcastJson, err := json.Marshal(broadcastState)
	if err != nil {
		logger.Error(fmt.Sprintf("Error marshalling response: %v", err))
		return err
	}
	var firstErr error
	for client := range clients {
		if err := client.WriteMessage(websocket.TextMessage, broadcastJson); err != nil {
			logger.Error(fmt.Sprintf("Error writing response: %v", err))
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}