package broadcast import ( "encoding/json" "goseg/config" "goseg/docker" "goseg/structs" "log/slog" "os" "reflect" "strings" "sync" "fmt" "github.com/gorilla/websocket" ) var ( logger = slog.New(slog.NewJSONHandler(os.Stdout, nil)) clients = make(map[*websocket.Conn]bool) broadcastState structs.AuthBroadcast mu sync.RWMutex // synchronize access to broadcastState ) func init() { // initialize broadcastState global var config := config.Conf() broadcast, err := bootstrapBroadcastState(config) if err != nil { errmsg := fmt.Sprintf("Unable to initialize broadcast: %v",err) panic(errmsg) } broadcastState = broadcast } // adds ws client func RegisterClient(conn *websocket.Conn) { clients[conn] = true broadcastJson, err := GetStateJson() if err != nil { return } // when a new ws client registers, send them the current broadcast if err := conn.WriteMessage(websocket.TextMessage, broadcastJson); err != nil { fmt.Println("Error writing response:", err) return } } // remove ws client func UnregisterClient(conn *websocket.Conn) { delete(clients, conn) } // take in config file and addt'l info to initialize broadcast func bootstrapBroadcastState(config structs.SysConfig) (structs.AuthBroadcast, error) { var res structs.AuthBroadcast piers := config.Piers pierStatus, err := docker.GetShipStatus(piers) if err != nil { errmsg := fmt.Sprintf("Unable to bootstrap urbit states: %v",err) logger.Error(errmsg) return res, err } updates := make(map[string]structs.Urbit) for pier, status := range pierStatus { urbit := structs.Urbit{} if existingUrbit, exists := broadcastState.Urbits[pier]; exists { // If the ship already exists in broadcastState, use its current state urbit = existingUrbit } isRunning := (status == "Up" || strings.HasPrefix(status, "Up ")) urbit.Info.Running = isRunning updates[pier] = urbit } // update broadcastState err = UpdateBroadcastState(map[string]interface{}{ "Urbits": updates, }) if err != nil { errmsg := fmt.Sprintf("Unable to update broadcast state: %v", err) logger.Error(errmsg) return res, err } res = GetState() return res, nil } // // update broadcastState with a map of items // func UpdateBroadcastState(values map[string]interface{}) error { // mu.Lock() // defer mu.Unlock() // v := reflect.ValueOf(&broadcastState).Elem() // for key, value := range values { // // we are matching the map key with the broadcastState item // field := v.FieldByName(key) // if !field.IsValid() || !field.CanSet() { // return fmt.Errorf("field %s does not exist or is not settable", key) // } // val := reflect.ValueOf(value) // if field.Type() != val.Type() { // return fmt.Errorf("type mismatch for field %s: expected %s, got %s", key, field.Type(), val.Type()) // } // field.Set(val) // } // BroadcastToClients() // return nil // } // update broadcastState with a map of items func UpdateBroadcastState(values map[string]interface{}) error { mu.Lock() defer mu.Unlock() v := reflect.ValueOf(&broadcastState).Elem() for key, value := range values { field := v.FieldByName(key) if !field.IsValid() || !field.CanSet() { return fmt.Errorf("field %s does not exist or is not settable", key) } if err := recursiveUpdate(field, reflect.ValueOf(value)); err != nil { return err } } BroadcastToClients() return nil } // this allows us to insert stuff into nested vals and not overwrite the existing contents func recursiveUpdate(dst, src reflect.Value) error { if !dst.CanSet() { return fmt.Errorf("field is not settable") } // If both dst and src are maps, handle them recursively if dst.Kind() == reflect.Map && src.Kind() == reflect.Map { for _, key := range src.MapKeys() { srcVal := src.MapIndex(key) // If the key doesn't exist in dst, initialize it dstVal := dst.MapIndex(key) if !dstVal.IsValid() { dstVal = reflect.New(dst.Type().Elem()).Elem() } // Recursive call to handle potential nested maps if err := recursiveUpdate(dstVal, srcVal); err != nil { return err } dst.SetMapIndex(key, dstVal) } return nil } // For non-map fields or direct updates if dst.Type() != src.Type() { return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type()) } dst.Set(src) return nil } // return broadcast state func GetState() structs.AuthBroadcast { mu.Lock() defer mu.Unlock() return broadcastState } // return json string of current broadcast state func GetStateJson() ([]byte, error) { mu.Lock() defer mu.Unlock() broadcastJson, err := json.Marshal(broadcastState) if err != nil { errmsg := fmt.Sprintf("Error marshalling response: %v", err) logger.Error(errmsg) return nil, err } return broadcastJson, nil } // broadcast the global state to all clients func BroadcastToClients() error { broadcastJson, err := json.Marshal(broadcastState) if err != nil { logger.Error("Error marshalling response:", err) return err } for client := range clients { if err := client.WriteMessage(websocket.TextMessage, broadcastJson); err != nil { logger.Error("Error writing response:", err) return err } } return nil }