diff options
author | Phus Lu <phuslu@hotmail.com> | 2015-04-24 16:37:17 +0800 |
---|---|---|
committer | Zheng Chai <zheng.chai@citrix.com> | 2015-05-14 17:30:24 +0800 |
commit | 75bc98ab536804963551416a4206ec8c43ebcc34 (patch) | |
tree | 158e65bf08b638bff6375baeb2d13fda16667196 | |
parent | 2c85890a25bff9f327ea95018c541c1190267492 (diff) | |
download | vyos-xe-guest-utilities-75bc98ab536804963551416a4206ec8c43ebcc34.tar.gz vyos-xe-guest-utilities-75bc98ab536804963551416a4206ec8c43ebcc34.zip |
CP-11399: Go Linux guest agent for XenServer
This is the reviewed and tested Go guest agent for XenServer Linux guests.
Go guest agent is a static linked binary without any dependency (e.g. Bash or Python execution environment) with below benefits:
1. Cross platform, Go version works well with all kinds Linux distributions (i386 and x86_64) with the porting ability to arm, FreeBSD, Darwin OS etc.
2. Standalone binary, works well with some restricted environment for example, CoreOS and Boot2Docker Linux
3. Easy to maintain and structured design, with Golang's nature
Change history:
1: Refined Rob Robert's comments.
2: Add unit test for xenstoreclient and refact folder structure.
3: Refined codes according to Robert's comments
4: To run 32bit xe-guest-agent in Linux 64bit OS(eg, CoreOS):
we need
4.1 - Switch to ip/ifconfig CLI tool instead of net package
4.2 - Switch to log package instead of syslog package
Signed-off-by: phus lu <phus.lu@citrix.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile | 36 | ||||
-rw-r--r-- | README.md | 21 | ||||
-rw-r--r-- | guestmetric/guestmetric.go | 38 | ||||
-rw-r--r-- | guestmetric/guestmetric_linux.go | 257 | ||||
-rw-r--r-- | guestmetric/guestmetric_test.go | 63 | ||||
-rw-r--r-- | xe-daemon/xe-daemon.go | 119 | ||||
-rw-r--r-- | xenstore/xenstore.go | 123 | ||||
-rw-r--r-- | xenstoreclient/xenstore.go | 433 | ||||
-rw-r--r-- | xenstoreclient/xenstore_test.go | 60 |
10 files changed, 1151 insertions, 0 deletions
@@ -6,6 +6,7 @@ # Folders _obj _test +bin # Architecture specific extensions/prefixes *.[568vq] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d5eeb5c --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ + +GO_BUILD = go build +GO_FLAGS = -a -x + +SRC_DIR = . +BIN_DIR = bin + +BINARIES := +BINARIES += $(BIN_DIR)/xe-daemon +BINARIES += $(BIN_DIR)/xenstore + +XE_DAEMON_SOURCES := +XE_DAEMON_SOURCES += $(SRC_DIR)/xe-daemon/xe-daemon.go +XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric.go +XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric_linux.go +XE_DAEMON_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go + +XENSTORE_SOURCES := +XENSTORE_SOURCES += $(SRC_DIR)/xenstore/xenstore.go +XENSTORE_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go + +.PHONY: build +build: $(BINARIES) + +.PHONY: clean +clean: + -rm -f $(BINARIES) + +$(BIN_DIR)/xe-daemon: $(XE_DAEMON_SOURCES) + mkdir -p $(BIN_DIR) + $(GO_BUILD) $(GO_FLAGS) -o $@ $< + +$(BIN_DIR)/xenstore: $(XENSTORE_SOURCES) + mkdir -p $(BIN_DIR) + $(GO_BUILD) $(GO_FLAGS) -o $@ $< + diff --git a/README.md b/README.md new file mode 100644 index 0000000..ac8c91b --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +go-guest-utilites +=================== + +This is the golang guest utilites for XenServer + + +XenStore CLI +----------- +xe-guest-utilities.git/xenstore + + +XenServer Guest Utilities +----------- +xe-guest-utilities.git/xe-daemon + + +Build Instructions +=================== +[Go development environment](https://golang.org/doc/install) is required to build the guest utilities. + +Type `make` or `make build` to build the xenstore and xe-daemon. diff --git a/guestmetric/guestmetric.go b/guestmetric/guestmetric.go new file mode 100644 index 0000000..836f105 --- /dev/null +++ b/guestmetric/guestmetric.go @@ -0,0 +1,38 @@ +package guestmetric + +import ( + "bytes" + "os/exec" +) + +type GuestMetric map[string]string + +type CollectFunc func() (GuestMetric, error) + +type GuestMetricsCollector interface { + CollectOS() (GuestMetric, error) + CollectMisc() (GuestMetric, error) + CollectNetworkAddr() (GuestMetric, error) + CollectDisk() (GuestMetric, error) + CollectMemory() (GuestMetric, error) +} + +func runCmd(name string, args ...string) (output string, err error) { + cmd := exec.Command(name, args...) + var out bytes.Buffer + cmd.Stdout = &out + err = cmd.Run() + if err != nil { + return "", err + } + output = out.String() + return output, nil +} + +func prefixKeys(prefix string, m GuestMetric) GuestMetric { + m1 := make(GuestMetric, 0) + for k, v := range m { + m1[prefix+k] = v + } + return m1 +} diff --git a/guestmetric/guestmetric_linux.go b/guestmetric/guestmetric_linux.go new file mode 100644 index 0000000..3970da4 --- /dev/null +++ b/guestmetric/guestmetric_linux.go @@ -0,0 +1,257 @@ +package guestmetric + +import ( + xenstoreclient "../xenstoreclient" + "bufio" + "bytes" + "fmt" + "os" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" +) + +type Collector struct { + Client xenstoreclient.XenStoreClient + Ballon bool + Debug bool +} + +func (c *Collector) CollectOS() (GuestMetric, error) { + current := make(GuestMetric, 0) + f, err := os.OpenFile("/var/cache/xe-linux-distribution", os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "=") { + parts := strings.SplitN(line, "=", 2) + k := strings.TrimSpace(parts[0]) + v := strings.TrimSpace(strings.Trim(strings.TrimSpace(parts[1]), "\"")) + current[k] = v + } + } + return prefixKeys("data/", current), nil +} + +func (c *Collector) CollectMisc() (GuestMetric, error) { + current := make(GuestMetric, 0) + if c.Ballon { + current["control/feature-balloon"] = "1" + } else { + current["control/feature-balloon"] = "0" + } + current["attr/PVAddons/Installed"] = "1" + current["attr/PVAddons/MajorVersion"] = "@PRODUCT_MAJOR_VERSION@" + current["attr/PVAddons/MinorVersion"] = "@PRODUCT_MINOR_VERSION@" + current["attr/PVAddons/MicroVersion"] = "@PRODUCT_MICRO_VERSION@" + current["attr/PVAddons/BuildVersion"] = "@NUMERIC_BUILD_NUMBER@" + + return current, nil +} + +func (c *Collector) CollectMemory() (GuestMetric, error) { + current := make(GuestMetric, 0) + f, err := os.OpenFile("/proc/meminfo", os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + parts := regexp.MustCompile(`\w+`).FindAllString(scanner.Text(), -1) + switch parts[0] { + case "MemTotal": + current["meminfo_total"] = parts[1] + case "MemFree": + current["meminfo_free"] = parts[1] + } + } + return prefixKeys("data/", current), nil +} + +func EnumNetworkAddresses(iface string) (GuestMetric, error) { + const ( + IP_RE string = `(\d{1,3}\.){3}\d{1,3}` + IPV6_RE string = `[\da-f:]+[\da-f]` + ) + + var ( + IP_IPV4_ADDR_RE = regexp.MustCompile(`inet\s*(` + IP_RE + `).*\se[a-zA-Z0-9]+\s`) + IP_IPV6_ADDR_RE = regexp.MustCompile(`inet6\s*(` + IPV6_RE + `)`) + IFCONFIG_IPV4_ADDR_RE = regexp.MustCompile(`inet addr:\s*(` + IP_RE + `)`) + IFCONFIG_IPV6_ADDR_RE = regexp.MustCompile(`inet6 addr:\s*(` + IPV6_RE + `)`) + ) + + d := make(GuestMetric, 0) + + var v4re, v6re *regexp.Regexp + var out string + var err error + if out, err = runCmd("ip", "addr", "show", iface); err == nil { + v4re = IP_IPV4_ADDR_RE + v6re = IP_IPV6_ADDR_RE + } else if out, err = runCmd("ifconfig", iface); err == nil { + v4re = IFCONFIG_IPV4_ADDR_RE + v6re = IFCONFIG_IPV6_ADDR_RE + } else { + return nil, fmt.Errorf("Cannot found ip/ifconfig command") + } + + m := v4re.FindAllStringSubmatch(out, -1) + if m != nil { + for _, parts := range m { + d["ip"] = parts[1] + } + } + m = v6re.FindAllStringSubmatch(out, -1) + if m != nil { + for i, parts := range m { + d[fmt.Sprintf("ipv6/%d/addr", i)] = parts[1] + } + } + return d, nil +} + +func (c *Collector) CollectNetworkAddr() (GuestMetric, error) { + current := make(GuestMetric, 0) + + paths, err := filepath.Glob("/sys/class/net/e*") + if err != nil { + return nil, err + } + + for _, path := range paths { + iface := filepath.Base(path) + if addrs, err := EnumNetworkAddresses(iface); err == nil { + for tag, addr := range addrs { + current[fmt.Sprintf("%s/%s", iface, tag)] = addr + } + } + } + return prefixKeys("attr/", current), nil +} + +func readSysfs(filename string) (string, error) { + f, err := os.OpenFile(filename, os.O_RDONLY, 0666) + if err != nil { + return "", err + } + defer f.Close() + scanner := bufio.NewScanner(f) + scanner.Scan() + return scanner.Text(), nil +} + +func (c *Collector) CollectDisk() (GuestMetric, error) { + pi := make(GuestMetric, 0) + + disks := make([]string, 0) + paths, err := filepath.Glob("/sys/block/*/device") + if err != nil { + return nil, err + } + for _, path := range paths { + disk := filepath.Base(strings.TrimSuffix(filepath.Dir(path), "/")) + disks = append(disks, disk) + } + + var sortedDisks sort.StringSlice = disks + sortedDisks.Sort() + + part_idx := 0 + for _, disk := range sortedDisks[:] { + paths, err = filepath.Glob(fmt.Sprintf("/dev/%s?*", disk)) + if err != nil { + return nil, err + } + for _, path := range paths { + p := filepath.Base(path) + line, err := readSysfs(fmt.Sprintf("/sys/block/%s/%s/size", disk, p)) + if err != nil { + return nil, err + } + size, err := strconv.ParseInt(line, 10, 64) + if err != nil { + return nil, err + } + blocksize := 512 + if bs, err := readSysfs(fmt.Sprintf("/sys/block/%s/queue/physical_block_size", p)); err == nil { + if bs1, err := strconv.Atoi(bs); err == nil { + blocksize = bs1 + } + } + real_dev := "" + if c.Client != nil { + nodename, err := readSysfs(fmt.Sprintf("/sys/block/%s/device/nodename", disk)) + if err != nil { + return nil, err + } + backend, err := c.Client.Read(fmt.Sprintf("%s/backend", nodename)) + if err != nil { + return nil, err + } + real_dev, err = c.Client.Read(fmt.Sprintf("%s/dev", backend)) + if err != nil { + return nil, err + } + } + name := path + blkid, err := runCmd("blkid", "-s", "UUID", path) + if err != nil { + return nil, err + } + if strings.Contains(blkid, "=") { + parts := strings.SplitN(strings.TrimSpace(blkid), "=", 2) + name = fmt.Sprintf("%s(%s)", name, strings.Trim(parts[1], "\"")) + } + i := map[string]string{ + "extents/0": real_dev, + "name": name, + "size": strconv.FormatInt(size*int64(blocksize), 10), + } + output, err := runCmd("pvs", "--noheadings", "--units", "b", path) + if err == nil && output != "" { + parts := regexp.MustCompile(`\s+`).Split(output, -1)[1:] + i["free"] = strings.TrimSpace(parts[5])[:len(parts[5])-1] + i["filesystem"] = strings.TrimSpace(parts[2]) + i["mount_points/0"] = "[LVM]" + } else { + output, err = runCmd("mount") + if err == nil { + m := regexp.MustCompile(`(?m)^(\S+) on (\S+) type (\S+)`).FindAllStringSubmatch(output, -1) + if m != nil { + for _, parts := range m { + if parts[1] == path { + i["mount_points/0"] = parts[2] + i["filesystem"] = parts[3] + break + } + } + } + } + output, err = runCmd("df", path) + if err == nil { + scanner := bufio.NewScanner(bytes.NewReader([]byte(output))) + scanner.Scan() + scanner.Scan() + parts := regexp.MustCompile(`\s+`).Split(scanner.Text(), -1) + free, err := strconv.ParseInt(parts[3], 10, 64) + if err == nil { + i["free"] = strconv.FormatInt(free*1024, 10) + } + } + } + for k, v := range i { + pi[fmt.Sprintf("data/volumes/%d/%s", part_idx, k)] = v + } + part_idx += 1 + } + } + return pi, nil +} diff --git a/guestmetric/guestmetric_test.go b/guestmetric/guestmetric_test.go new file mode 100644 index 0000000..dcc5384 --- /dev/null +++ b/guestmetric/guestmetric_test.go @@ -0,0 +1,63 @@ +package guestmetric + +import ( + "testing" +) + +func TestCollector(t *testing.T) { + c := Collector{ + Client: nil, + } + + funcs := []CollectFunc{ + c.CollectDisk, + c.CollectMemory, + c.CollectMisc, + c.CollectNetworkAddr, + } + + for _, f := range funcs { + metric, err := f() + if err != nil { + t.Errorf("%#v error: %#v\n", f, err) + } + t.Logf("%#v return %#v", f, metric) + } +} + +func doBenchmark(b *testing.B, f CollectFunc) { + b.Logf("doBenchmark 1000 for %#v", f) + for i := 0; i < 1000; i++ { + if _, err := f(); err != nil { + b.Errorf("%#v error: %#v\n", f, err) + } + } +} + +func BenchmarkCollectorDisk(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectDisk) +} + +func BenchmarkCollectMemory(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectMemory) +} + +func BenchmarkCollectMisc(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectMisc) +} + +func BenchmarkCollectNetwork(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectNetworkAddr) +} diff --git a/xe-daemon/xe-daemon.go b/xe-daemon/xe-daemon.go new file mode 100644 index 0000000..1e5fdb1 --- /dev/null +++ b/xe-daemon/xe-daemon.go @@ -0,0 +1,119 @@ +package main + +import ( + guestmetric "../guestmetric" + xenstoreclient "../xenstoreclient" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func write_pid_file(pid_file string) error { + f, err := os.Create(pid_file) + if err != nil { + return err + } + defer f.Close() + + fmt.Fprintf(f, "%d\n", os.Getpid()) + return nil +} + +func main() { + var err error + + sleepInterval := flag.Int("i", 60, "Interval between updates (in seconds)") + debugFlag := flag.Bool("d", false, "Update to stdout rather than xenstore") + balloonFlag := flag.Bool("B", true, "Do not report that ballooning is supported") + pid := flag.String("p", "", "Write the PID to FILE") + + flag.Parse() + + if *pid != "" { + write_pid_file(*pid) + } + + logger := log.New(os.Stderr, "xe-daemon", 0) + + exitChannel := make(chan os.Signal, 1) + signal.Notify(exitChannel, syscall.SIGTERM, syscall.SIGINT) + + xs, err := xenstoreclient.NewCachedXenstore(0) + if err != nil { + logger.Printf("NewCachedXenstore error: %v", err) + return + } + + collector := &guestmetric.Collector{ + Client: xs, + Ballon: *balloonFlag, + Debug: *debugFlag, + } + + collectors := []struct { + divisor int + name string + Collect func() (guestmetric.GuestMetric, error) + }{ + {1, "CollectOS", collector.CollectOS}, + {1, "CollectMisc", collector.CollectMisc}, + {1, "CollectNetworkAddr", collector.CollectNetworkAddr}, + {1, "CollectDisk", collector.CollectDisk}, + {2, "CollectMemory", collector.CollectMemory}, + } + + lastUniqueID, err := xs.Read("unique-domain-id") + if err != nil { + logger.Printf("xenstore.Read unique-domain-id error: %v", err) + } + + for count := 0; ; count += 1 { + uniqueID, err := xs.Read("unique-domain-id") + if err != nil { + logger.Printf("xenstore.Read unique-domain-id error: %v", err) + return + } + if uniqueID != lastUniqueID { + // VM has just resume, cache state now invalid + lastUniqueID = uniqueID + if cx, ok := xs.(*xenstoreclient.CachedXenStore); ok { + cx.Clear() + } + } + + // invoke collectors + for _, collector := range collectors { + if count%collector.divisor == 0 { + logger.Printf("Running %s ...", collector.name) + result, err := collector.Collect() + if err != nil { + logger.Printf("%s error: %#v", collector.name, err) + } else { + for name, value := range result { + err := xs.Write(name, value) + if err != nil { + logger.Printf("xenstore.Write error: %v", err) + } else { + logger.Printf("xenstore.Write OK: %#v: %#v", name, value) + } + } + } + } + } + + xs.Write("data/updated", time.Now().Format("Mon Jan _2 15:04:05 2006")) + + select { + case <-exitChannel: + logger.Printf("Received an interrupt, stopping services...") + return + + case <-time.After(time.Duration(*sleepInterval) * time.Second): + continue + } + } +} diff --git a/xenstore/xenstore.go b/xenstore/xenstore.go new file mode 100644 index 0000000..c6fe1a5 --- /dev/null +++ b/xenstore/xenstore.go @@ -0,0 +1,123 @@ +package main + +import ( + xenstoreclient "../xenstoreclient" + "fmt" + "os" + "strings" +) + +func die(format string, a ...interface{}) { + fmt.Fprintf(os.Stderr, format, a...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} + +func usage() { + die( + `Usage: xenstore read key [ key ... ] + write key value [ key value ... ] + rm key [ key ... ] + exists key [ key ... ]`) +} + +func new_xs() xenstoreclient.XenStoreClient { + xs, err := xenstoreclient.NewXenstore(0) + if err != nil { + die("xenstore.Open error: %v", err) + } + + return xs +} + +func xs_read(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + result, err := xs.Read(key) + if err != nil { + die("%s error: %v", script_name, err) + } + + fmt.Println(result) + } +} + +func xs_write(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" || len(args)%2 != 0 { + die("Usage: %s key value [ key value ... ]", script_name) + } + + xs := new_xs() + for i := 0; i < len(args); i += 2 { + key := args[i] + value := args[i+1] + + err := xs.Write(key, value) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func xs_rm(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + err := xs.Rm(key) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func xs_exists(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + _, err := xs.Read(key) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func main() { + var operation string + var args []string + + script_name := os.Args[0] + if strings.Contains(script_name, "-") { + operation = script_name[strings.LastIndex(script_name, "-")+1:] + args = os.Args[1:] + } else { + if len(os.Args) < 2 { + usage() + } + operation = os.Args[1] + script_name = script_name + " " + operation + args = os.Args[2:] + } + + switch operation { + case "read": + xs_read(script_name, args) + case "write": + xs_write(script_name, args) + case "rm": + xs_rm(script_name, args) + case "exists": + xs_exists(script_name, args) + default: + usage() + } +} diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go new file mode 100644 index 0000000..0d3e2bb --- /dev/null +++ b/xenstoreclient/xenstore.go @@ -0,0 +1,433 @@ +package xenstoreclient + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "strconv" + "strings" + "sync" + "time" +) + +type Permission int + +const ( + PERM_NONE Permission = iota + PERM_READ + PERM_WRITE + PERM_READWRITE +) + +type Operation uint32 + +const ( + XS_READ Operation = 2 + XS_GET_PERMS Operation = 3 + XS_WATCH Operation = 4 + XS_UNWATCH Operation = 5 + XS_TRANSACTION_START Operation = 6 + XS_TRANSACTION_END Operation = 7 + XS_WRITE Operation = 11 + XS_MKDIR Operation = 12 + XS_RM Operation = 13 + XS_SET_PERMS Operation = 14 + XS_WATCH_EVENT Operation = 15 + XS_ERROR Operation = 16 +) + +type Packet struct { + OpCode Operation + Req uint32 + TxID uint32 + Length uint32 + Value []byte +} + +type Event struct { + Token string + Data []byte +} + +type XenStoreClient interface { + Close() error + DO(packet *Packet) (*Packet, error) + Read(path string) (string, error) + Mkdir(path string) error + Rm(path string) error + Write(path string, value string) error + GetPermission(path string) (map[int]Permission, error) + Watch(path string) (<-chan Event, error) + StopWatch() error +} + +func ReadPacket(r io.Reader) (packet *Packet, err error) { + + packet = &Packet{} + + err = binary.Read(r, binary.LittleEndian, &packet.OpCode) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.Req) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.TxID) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.Length) + if err != nil { + return nil, err + } + + if packet.Length > 0 { + packet.Value = make([]byte, packet.Length) + _, err = io.ReadFull(r, packet.Value) + if err != nil { + return nil, err + } + if packet.OpCode == XS_ERROR { + return nil, errors.New(strings.Split(string(packet.Value), "\x00")[0]) + } + } + + return packet, nil +} + +func (p *Packet) Write(w io.Writer) (err error) { + var bw *bufio.Writer + + if w1, ok := w.(*bufio.Writer); ok { + bw = w1 + } else { + bw = bufio.NewWriter(w) + } + defer bw.Flush() + + err = binary.Write(bw, binary.LittleEndian, p.OpCode) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.Req) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.TxID) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.Length) + if err != nil { + return err + } + if p.Length > 0 { + _, err = bw.Write(p.Value) + if err != nil { + return err + } + } + return nil +} + +type XenStore struct { + tx uint32 + xbFile io.ReadWriteCloser + xbFileReader *bufio.Reader + muWatch *sync.Mutex + onceWatch *sync.Once + watchQueues map[string]chan Event + watchStopChan chan struct{} + watchStoppedChan chan struct{} + nonWatchQueue chan []byte +} + +func NewXenstore(tx uint32) (XenStoreClient, error) { + devPath, err := getDevPath() + if err != nil { + return nil, err + } + + xbFile, err := os.OpenFile(devPath, os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return newXenstore(tx, xbFile) +} + +func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) { + return &XenStore{ + tx: tx, + xbFile: rwc, + xbFileReader: bufio.NewReader(rwc), + watchQueues: make(map[string]chan Event, 0), + nonWatchQueue: nil, + watchStopChan: make(chan struct{}, 1), + watchStoppedChan: make(chan struct{}, 1), + onceWatch: &sync.Once{}, + muWatch: &sync.Mutex{}, + }, nil +} + +func (xs *XenStore) Close() error { + return xs.xbFile.Close() +} + +func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { + err = req.Write(xs.xbFile) + if err != nil { + return nil, err + } + + var r io.Reader + if xs.nonWatchQueue != nil { + data := <-xs.nonWatchQueue + r = bytes.NewReader(data) + } else { + r = xs.xbFileReader + } + + resp, err = ReadPacket(r) + return resp, err +} + +func (xs *XenStore) Read(path string) (string, error) { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_READ, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + resp, err := xs.DO(req) + if err != nil { + return "", err + } + return string(resp.Value), nil +} + +func (xs *XenStore) Mkdir(path string) error { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_WRITE, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) Rm(path string) error { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_RM, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) Write(path string, value string) error { + v := []byte(path + "\x00" + value) + req := &Packet{ + OpCode: XS_WRITE, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) { + perm := make(map[int]Permission, 0) + + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_GET_PERMS, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + resp, err := xs.DO(req) + if err != nil { + return nil, err + } + + for _, e := range strings.Split(string(resp.Value[:len(resp.Value)-1]), "\x00") { + k, err := strconv.Atoi(e[1:]) + if err != nil { + return nil, err + } + var p Permission + switch e[0] { + case 'n': + p = PERM_NONE + case 'r': + p = PERM_READ + case 'w': + p = PERM_WRITE + case 'b': + p = PERM_READWRITE + } + perm[k] = p + } + return perm, nil +} + +func (xs *XenStore) Watch(path string) (<-chan Event, error) { + watcher := func() { + + type XSData struct { + *Packet + Error error + } + + xsDataChan := make(chan XSData, 100) + go func(r io.Reader, out chan<- XSData) { + for { + p, err := ReadPacket(r) + out <- XSData{Packet: p, Error: err} + } + }(xs.xbFileReader, xsDataChan) + + xs.nonWatchQueue = make(chan []byte, 100) + for { + select { + case <-xs.watchStopChan: + fmt.Printf("watch receive stop signal, quit.") + xs.watchStopChan <- struct{}{} + return + case xsdata := <-xsDataChan: + if xsdata.Error != nil { + fmt.Printf("watch receive error: %#v", xsdata.Error) + return + } + switch xsdata.Packet.OpCode { + case XS_WATCH_EVENT: + parts := strings.SplitN(string(xsdata.Value), "\x00", 2) + path := parts[0] + token := parts[1] + data := []byte(parts[2]) + if c, ok := xs.watchQueues[path]; ok { + c <- Event{token, data} + } + default: + var b bytes.Buffer + xsdata.Packet.Write(&b) + xs.nonWatchQueue <- b.Bytes() + } + } + } + } + xs.onceWatch.Do(watcher) + xs.muWatch.Lock() + defer xs.muWatch.Unlock() + if _, ok := xs.watchQueues[path]; !ok { + xs.watchQueues[path] = make(chan Event, 100) + } + return xs.watchQueues[path], nil +} + +func (xs *XenStore) StopWatch() error { + xs.watchStopChan <- struct{}{} + <-xs.watchStoppedChan + xs.nonWatchQueue = nil + return nil +} + +type CachedXenStore struct { + xs XenStoreClient + writeCache map[string]string + lastCommit map[string]time.Time +} + +func NewCachedXenstore(tx uint32) (XenStoreClient, error) { + xs, err := NewXenstore(tx) + if err != nil { + return nil, err + } + return &CachedXenStore{ + xs: xs, + writeCache: make(map[string]string, 0), + lastCommit: make(map[string]time.Time, 0), + }, nil +} + +func (xs *CachedXenStore) Write(path string, value string) error { + if v, ok := xs.writeCache[path]; ok && v == value { + if t, ok := xs.lastCommit[path]; ok && t.After(time.Now().Add(-2*time.Minute)) { + return nil + } + } + err := xs.xs.Write(path, value) + if err != nil { + xs.writeCache[path] = value + xs.lastCommit[path] = time.Now() + } + return err +} + +func (xs *CachedXenStore) Close() error { + return xs.xs.Close() +} + +func (xs *CachedXenStore) DO(req *Packet) (resp *Packet, err error) { + return xs.xs.DO(req) +} + +func (xs *CachedXenStore) Read(path string) (string, error) { + return xs.xs.Read(path) +} + +func (xs *CachedXenStore) Mkdir(path string) error { + return xs.xs.Mkdir(path) +} + +func (xs *CachedXenStore) Rm(path string) error { + return xs.xs.Rm(path) +} + +func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) { + return xs.xs.GetPermission(path) +} + +func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) { + return xs.xs.Watch(path) +} + +func (xs *CachedXenStore) StopWatch() error { + return xs.xs.StopWatch() +} + +func (xs *CachedXenStore) Clear() { + xs.writeCache = make(map[string]string, 0) + xs.lastCommit = make(map[string]time.Time, 0) +} + +func getDevPath() (devPath string, err error) { + devPaths := []string{ + "/proc/xen/xenbus", + "/dev/xen/xenbus", + "/kern/xen/xenbus", + } + for _, devPath = range devPaths { + if _, err = os.Stat(devPath); err == nil { + return devPath, err + } + } + return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths) +} diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go new file mode 100644 index 0000000..1259447 --- /dev/null +++ b/xenstoreclient/xenstore_test.go @@ -0,0 +1,60 @@ +package xenstoreclient + +import ( + "bytes" + "testing" +) + +type mockFile struct { + t *testing.T +} + +func (f *mockFile) Read(b []byte) (n int, err error) { + value := "i am value" + p := &Packet{ + OpCode: XS_READ, + Req: 0, + TxID: 0, + Length: uint32(len(value)), + Value: []byte(value), + } + var buf bytes.Buffer + if err = p.Write(&buf); err != nil { + return 0, err + } + copy(b, buf.Bytes()) + n = buf.Len() + f.t.Logf("Read %d bytes", n) + return n, nil +} + +func (f *mockFile) Write(b []byte) (n int, err error) { + buf := bytes.NewBuffer(b) + if _, err := ReadPacket(buf); err != nil { + return 0, err + } + n = len(b) + f.t.Logf("Write %d bytes", n) + return n, nil +} + +func (f *mockFile) Close() error { + f.t.Logf("Close()") + return nil +} + +func TestXenStore(t *testing.T) { + xs, err := newXenstore(0, &mockFile{t}) + if err != nil { + t.Errorf("newXenstore error: %#v\n", err) + } + defer xs.Close() + + if _, err := xs.Read("foo"); err != nil { + t.Errorf("xs.Read error: %#v\n", err) + } + + if err := xs.Write("foo", "bar"); err != nil { + t.Errorf("xs.Read error: %#v\n", err) + } +} |