| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package broadcast
- import (
- "encoding/json"
- "goseg/config"
- "goseg/docker"
- "goseg/structs"
- "log/slog"
- "os"
- "reflect"
- "strings"
- "sync"
- "fmt"
- "github.com/gorilla/websocket"
- )
- var (
- logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
- clients = make(map[*websocket.Conn]bool)
- broadcastState structs.AuthBroadcast
- mu sync.RWMutex // synchronize access to broadcastState
- )
- func init() {
- // initialize broadcastState global var
- config := config.Conf()
- broadcast, err := bootstrapBroadcastState(config)
- if err != nil {
- errmsg := fmt.Sprintf("Unable to initialize broadcast: %v",err)
- panic(errmsg)
- }
- broadcastState = broadcast
- }
- // adds ws client
- func RegisterClient(conn *websocket.Conn) {
- clients[conn] = true
- broadcastJson, err := GetStateJson()
- if err != nil {
- return
- }
- // when a new ws client registers, send them the current broadcast
- if err := conn.WriteMessage(websocket.TextMessage, broadcastJson); err != nil {
- fmt.Println("Error writing response:", err)
- return
- }
- }
- // remove ws client
- func UnregisterClient(conn *websocket.Conn) {
- delete(clients, conn)
- }
- // take in config file and addt'l info to initialize broadcast
- func bootstrapBroadcastState(config structs.SysConfig) (structs.AuthBroadcast, error) {
- var res structs.AuthBroadcast
- piers := config.Piers
- pierStatus, err := docker.GetShipStatus(piers)
- if err != nil {
- errmsg := fmt.Sprintf("Unable to bootstrap urbit states: %v",err)
- logger.Error(errmsg)
- return res, err
- }
- updates := make(map[string]structs.Urbit)
- for pier, status := range pierStatus {
- urbit := structs.Urbit{}
- if existingUrbit, exists := broadcastState.Urbits[pier]; exists {
- // If the ship already exists in broadcastState, use its current state
- urbit = existingUrbit
- }
- isRunning := (status == "Up" || strings.HasPrefix(status, "Up "))
- urbit.Info.Running = isRunning
- updates[pier] = urbit
- }
- // update broadcastState
- err = UpdateBroadcastState(map[string]interface{}{
- "Urbits": updates,
- })
- if err != nil {
- errmsg := fmt.Sprintf("Unable to update broadcast state: %v", err)
- logger.Error(errmsg)
- return res, err
- }
- res = GetState()
- return res, nil
- }
- // update broadcastState with a map of items
- func UpdateBroadcastState(values map[string]interface{}) error {
- mu.Lock()
- defer mu.Unlock()
- v := reflect.ValueOf(&broadcastState).Elem()
- for key, value := range values {
- field := v.FieldByName(key)
- if !field.IsValid() || !field.CanSet() {
- return fmt.Errorf("field %s does not exist or is not settable", key)
- }
- if err := recursiveUpdate(field, reflect.ValueOf(value)); err != nil {
- return err
- }
- }
- BroadcastToClients()
- return nil
- }
// recursiveUpdate merges src into dst without discarding what dst already
// holds in nested maps.
//
// When both values are maps, every key of src is merged into dst: the current
// dst entry (or a zero value when the key is new) is copied into a settable
// scratch value, updated recursively from the src entry, and stored back. Keys
// present only in dst are left alone, and a nil dst map is allocated on the
// first insert. For any other kind, src must have exactly dst's type and
// simply replaces it.
//
// It returns an error when dst is not settable, when src is the zero
// reflect.Value (for example reflect.ValueOf(nil)), or when the types do not
// match.
func recursiveUpdate(dst, src reflect.Value) error {
	if !dst.CanSet() {
		return fmt.Errorf("field is not settable")
	}
	// A zero Value has no type; report it instead of panicking in src.Type().
	if !src.IsValid() {
		return fmt.Errorf("type mismatch: expected %s, got nil", dst.Type())
	}

	// If both dst and src are maps, handle them recursively.
	if dst.Kind() == reflect.Map && src.Kind() == reflect.Map {
		// MapIndex and SetMapIndex panic on keys that are not assignable to
		// the destination key type, so reject those up front.
		if !src.Type().Key().AssignableTo(dst.Type().Key()) {
			return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type())
		}
		elemType := dst.Type().Elem()
		for _, key := range src.MapKeys() {
			// Values returned by MapIndex are never settable, so an existing
			// entry is copied into a fresh addressable value before merging.
			elem := reflect.New(elemType).Elem()
			if current := dst.MapIndex(key); current.IsValid() {
				elem.Set(current)
			}
			// Recursive call to handle potential nested maps.
			if err := recursiveUpdate(elem, src.MapIndex(key)); err != nil {
				return err
			}
			if dst.IsNil() {
				dst.Set(reflect.MakeMap(dst.Type()))
			}
			dst.SetMapIndex(key, elem)
		}
		return nil
	}

	// For non-map fields or direct updates.
	if dst.Type() != src.Type() {
		return fmt.Errorf("type mismatch: expected %s, got %s", dst.Type(), src.Type())
	}
	dst.Set(src)
	return nil
}
- // return broadcast state
- func GetState() structs.AuthBroadcast {
- mu.Lock()
- defer mu.Unlock()
- return broadcastState
- }
- // return json string of current broadcast state
- func GetStateJson() ([]byte, error) {
- mu.Lock()
- defer mu.Unlock()
- broadcastJson, err := json.Marshal(broadcastState)
- if err != nil {
- errmsg := fmt.Sprintf("Error marshalling response: %v", err)
- logger.Error(errmsg)
- return nil, err
- }
- return broadcastJson, nil
- }
- // broadcast the global state to all clients
- func BroadcastToClients() error {
- broadcastJson, err := json.Marshal(broadcastState)
- if err != nil {
- logger.Error("Error marshalling response:", err)
- return err
- }
- for client := range clients {
- if err := client.WriteMessage(websocket.TextMessage, broadcastJson); err != nil {
- logger.Error("Error writing response:", err)
- return err
- }
- }
- return nil
- }
|