| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- package docker
- import (
- "context"
- "encoding/json"
- "fmt"
- "goseg/config"
- "goseg/structs"
- "log/slog"
- "os"
- "strings"
- "time"
- "github.com/docker/docker/api/types"
- "github.com/docker/docker/api/types/container"
- "github.com/docker/docker/client"
- )
- var (
- logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
- EventBus = make(chan structs.Event, 100)
- )
- // return the container status of a slice of ships
- func GetShipStatus(patps []string) (map[string]string, error) {
- statuses := make(map[string]string)
- cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
- if err != nil {
- errmsg := fmt.Sprintf("Error getting Docker info: %v", err)
- logger.Error(errmsg)
- return statuses, err
- } else {
- containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{})
- if err != nil {
- errmsg := fmt.Sprintf("Error getting containers: %v", err)
- logger.Error(errmsg)
- return statuses, err
- } else {
- for _, pier := range patps {
- found := false
- for _, container := range containers {
- for _, name := range container.Names {
- fasPier := "/" + pier
- if name == fasPier {
- statuses[pier] = container.Status
- found = true
- break
- }
- }
- if found {
- break
- }
- }
- if !found {
- statuses[pier] = "not found"
- }
- }
- }
- return statuses, nil
- }
- }
- // return the name of a container's network
- func GetContainerNetwork(name string) (string, error) {
- cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
- if err != nil {
- return "", err
- }
- defer cli.Close()
- containerJSON, err := cli.ContainerInspect(context.Background(), name)
- if err != nil {
- return "", err
- }
- for networkName := range containerJSON.NetworkSettings.Networks {
- return networkName, nil
- }
- return "", fmt.Errorf("container is not attached to any network")
- }
- // return the disk and memory usage for a container
- func GetContainerStats(containerName string) (structs.ContainerStats, error) {
- var res structs.ContainerStats
- cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
- if err != nil {
- return res, err
- }
- defer cli.Close()
- statsResp, err := cli.ContainerStats(context.Background(), containerName, false)
- if err != nil {
- return res, err
- }
- defer statsResp.Body.Close()
- var stat types.StatsJSON
- if err := json.NewDecoder(statsResp.Body).Decode(&stat); err != nil {
- return res, err
- }
- memUsage := stat.MemoryStats.Usage
- inspectResp, err := cli.ContainerInspect(context.Background(), containerName)
- if err != nil {
- return res, err
- }
- diskUsage := int64(0)
- if inspectResp.SizeRw != nil {
- diskUsage = *inspectResp.SizeRw
- }
- return structs.ContainerStats{
- MemoryUsage: memUsage,
- DiskUsage: diskUsage,
- }, nil
- }
- // start a container by name + type
- // not for booting new ships
- func StartContainer(containerName string, containerType string) (structs.ContainerState, error) {
- var containerState structs.ContainerState
- ctx := context.Background()
- cli, err := client.NewClientWithOpts(client.FromEnv)
- if err != nil {
- return containerState, err
- }
- // get the desired tag and hash from config
- containerInfo, err := GetLatestContainerInfo(containerType)
- if err != nil {
- return containerState, err
- }
- // check if container exists
- containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
- if err != nil {
- return containerState, err
- }
- var existingContainer *types.Container = nil
- for _, container := range containers {
- for _, name := range container.Names {
- if name == containerName {
- existingContainer = &container
- break
- }
- }
- if existingContainer != nil {
- break
- }
- }
- desiredTag := containerInfo["tag"]
- desiredHash := containerInfo["hash"]
- desiredRepo := containerInfo["repo"]
- desiredImage := fmt.Sprintf("%s:%s@sha256:%s", desiredRepo, desiredTag, desiredHash)
- desiredStatus := "running"
- if desiredTag == "" || desiredHash == "" {
- err = fmt.Errorf("Version info has not been retrieved!")
- return containerState, err
- }
- // check if the desired image is available locally
- images, err := cli.ImageList(ctx, types.ImageListOptions{})
- if err != nil {
- return containerState, err
- }
- imageExistsLocally := false
- for _, img := range images {
- if img.ID == desiredHash {
- imageExistsLocally = true
- break
- }
- if imageExistsLocally {
- break
- }
- }
- if !imageExistsLocally {
- // pull the image if it doesn't exist locally
- _, err = cli.ImagePull(ctx, desiredImage, types.ImagePullOptions{})
- if err != nil {
- return containerState, err
- }
- }
- switch {
- case existingContainer == nil:
- // if the container does not exist, create and start it
- _, err := cli.ContainerCreate(ctx, &container.Config{
- Image: desiredImage,
- }, nil, nil, nil, containerName)
- if err != nil {
- return containerState, err
- }
- err = cli.ContainerStart(ctx, containerName, types.ContainerStartOptions{})
- if err != nil {
- return containerState, err
- }
- msg := fmt.Sprintf("%s started with image %s", containerName, desiredImage)
- logger.Info(msg)
- case existingContainer.State == "exited":
- // if the container exists but is stopped, start it
- err := cli.ContainerStart(ctx, containerName, types.ContainerStartOptions{})
- if err != nil {
- return containerState, err
- }
- msg := fmt.Sprintf("Started stopped container %s", containerName)
- logger.Info(msg)
- default:
- // if container is running, check the image digest
- currentImage := existingContainer.Image
- digestParts := strings.Split(currentImage, "@sha256:")
- currentDigest := ""
- if len(digestParts) > 1 {
- currentDigest = digestParts[1]
- }
- if currentDigest != desiredHash {
- // if the hashes don't match, recreate the container with the new one
- err := cli.ContainerRemove(ctx, containerName, types.ContainerRemoveOptions{Force: true})
- if err != nil {
- return containerState, err
- }
- _, err = cli.ContainerCreate(ctx, &container.Config{
- Image: desiredImage,
- }, nil, nil, nil, containerName)
- if err != nil {
- return containerState, err
- }
- err = cli.ContainerStart(ctx, containerName, types.ContainerStartOptions{})
- if err != nil {
- return containerState, err
- }
- msg := fmt.Sprintf("Restarted %s with image %s", containerName, desiredImage)
- logger.Info(msg)
- }
- }
- containerDetails, err := cli.ContainerInspect(ctx, containerName)
- if err != nil {
- return containerState, fmt.Errorf("failed to inspect container %s: %v", containerName, err)
- }
- containerState = structs.ContainerState{
- ID: containerDetails.ID, // container id hash
- Name: containerName, // name (eg @p)
- Image: desiredImage, // full repo:tag@hash string
- Type: containerType, // eg `vere` (corresponds with version server label)
- DesiredStatus: desiredStatus, // what the user sets
- ActualStatus: containerDetails.State.Status, // what the daemon reports
- CreatedAt: containerDetails.Created, // this is a string
- }
- return containerState, err
- }
- // convert the version info back into json then a map lol
- // so we can easily get the correct repo/release channel/tag/hash
- func GetLatestContainerInfo(containerType string) (map[string]string, error) {
- var res map[string]string
- arch := config.Architecture
- hashLabel := arch + "_sha256"
- versionInfo := config.VersionInfo
- jsonData, err := json.Marshal(versionInfo)
- if err != nil {
- return res, err
- }
- // Convert JSON to map
- var m map[string]interface{}
- err = json.Unmarshal(jsonData, &m)
- if err != nil {
- return res, err
- }
- containerData, ok := m[containerType].(map[string]interface{})
- if !ok {
- return nil, fmt.Errorf("%s data is not a map", containerType)
- }
- tag, ok := containerData["tag"].(string)
- if !ok {
- return nil, fmt.Errorf("'tag' is not a string")
- }
- hashValue, ok := containerData[hashLabel].(string)
- if !ok {
- return nil, fmt.Errorf("'%s' is not a string", hashLabel)
- }
- repo, ok := containerData["repo"].(string)
- if !ok {
- return nil, fmt.Errorf("'repo' is not a string")
- }
- res = make(map[string]string)
- res["tag"] = tag
- res["hash"] = hashValue
- res["repo"] = repo
- return res, nil
- }
- // stop a container with the name
- func StopContainerByName(containerName string) error {
- ctx := context.Background()
- cli, err := client.NewClientWithOpts(client.FromEnv)
- if err != nil {
- return err
- }
- // fetch all containers incl stopped
- containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
- if err != nil {
- return err
- }
- for _, cont := range containers {
- for _, name := range cont.Names {
- if name == "/"+containerName {
- // Stop the container
- options := container.StopOptions{}
- if err := cli.ContainerStop(ctx, cont.ID, options); err != nil {
- return fmt.Errorf("failed to stop container %s: %v", containerName, err)
- }
- logger.Info(fmt.Sprintf("Successfully stopped container %s\n", containerName))
- return nil
- }
- }
- }
- return fmt.Errorf("container with name %s not found", containerName)
- }
- // subscribe to docker events and feed them into eventbus
- func DockerListener() {
- ctx := context.Background()
- cli, err := client.NewClientWithOpts(client.FromEnv)
- if err != nil {
- logger.Error(fmt.Sprintf("Error initializing Docker client: %v", err))
- return
- }
- messages, errs := cli.Events(ctx, types.EventsOptions{})
- for {
- select {
- case event := <-messages:
- // Convert the Docker event to our custom event and send it to the EventBus
- EventBus <- structs.Event{Type: event.Action, Data: event}
- case err := <-errs:
- logger.Error(fmt.Sprintf("Docker event error: %v", err))
- }
- }
- }
- // periodically poll docker in case we miss something
- func DockerPoller() {
- ticker := time.NewTicker(10 * time.Second)
- for {
- select {
- case <-ticker.C:
- logger.Info("polling docker")
- // todo (maybe not necessary?)
- // fetch the status of all containers and compare with app's state
- // if there's a change, send an event to the EventBus
- return
- }
- }
- }
|