summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzheng <Zheng.chai@citrix.com>2015-05-19 08:49:57 +0100
committerzheng <Zheng.chai@citrix.com>2015-05-19 08:49:57 +0100
commit2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213 (patch)
tree158e65bf08b638bff6375baeb2d13fda16667196
parent2c85890a25bff9f327ea95018c541c1190267492 (diff)
parent75bc98ab536804963551416a4206ec8c43ebcc34 (diff)
downloadvyos-xe-guest-utilities-2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213.tar.gz
vyos-xe-guest-utilities-2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213.zip
Merge pull request #1 from xs-nanjing/master
CP-11399: Go Linux guest agent for XenServer
-rw-r--r--.gitignore1
-rw-r--r--Makefile36
-rw-r--r--README.md21
-rw-r--r--guestmetric/guestmetric.go38
-rw-r--r--guestmetric/guestmetric_linux.go257
-rw-r--r--guestmetric/guestmetric_test.go63
-rw-r--r--xe-daemon/xe-daemon.go119
-rw-r--r--xenstore/xenstore.go123
-rw-r--r--xenstoreclient/xenstore.go433
-rw-r--r--xenstoreclient/xenstore_test.go60
10 files changed, 1151 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index daf913b..6921662 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
# Folders
_obj
_test
+bin
# Architecture specific extensions/prefixes
*.[568vq]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..d5eeb5c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,36 @@
+
+GO_BUILD = go build
+GO_FLAGS = -a -x
+
+SRC_DIR = .
+BIN_DIR = bin
+
+BINARIES :=
+BINARIES += $(BIN_DIR)/xe-daemon
+BINARIES += $(BIN_DIR)/xenstore
+
+XE_DAEMON_SOURCES :=
+XE_DAEMON_SOURCES += $(SRC_DIR)/xe-daemon/xe-daemon.go
+XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric.go
+XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric_linux.go
+XE_DAEMON_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go
+
+XENSTORE_SOURCES :=
+XENSTORE_SOURCES += $(SRC_DIR)/xenstore/xenstore.go
+XENSTORE_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go
+
+.PHONY: build
+build: $(BINARIES)
+
+.PHONY: clean
+clean:
+ -rm -f $(BINARIES)
+
+$(BIN_DIR)/xe-daemon: $(XE_DAEMON_SOURCES)
+ mkdir -p $(BIN_DIR)
+ $(GO_BUILD) $(GO_FLAGS) -o $@ $<
+
+$(BIN_DIR)/xenstore: $(XENSTORE_SOURCES)
+ mkdir -p $(BIN_DIR)
+ $(GO_BUILD) $(GO_FLAGS) -o $@ $<
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ac8c91b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,21 @@
+go-guest-utilites
+===================
+
+This is the golang guest utilites for XenServer
+
+
+XenStore CLI
+-----------
+xe-guest-utilities.git/xenstore
+
+
+XenServer Guest Utilities
+-----------
+xe-guest-utilities.git/xe-daemon
+
+
+Build Instructions
+===================
+[Go development environment](https://golang.org/doc/install) is required to build the guest utilities.
+
+Type `make` or `make build` to build the xenstore and xe-daemon.
diff --git a/guestmetric/guestmetric.go b/guestmetric/guestmetric.go
new file mode 100644
index 0000000..836f105
--- /dev/null
+++ b/guestmetric/guestmetric.go
@@ -0,0 +1,38 @@
+package guestmetric
+
+import (
+ "bytes"
+ "os/exec"
+)
+
+type GuestMetric map[string]string
+
+type CollectFunc func() (GuestMetric, error)
+
+type GuestMetricsCollector interface {
+ CollectOS() (GuestMetric, error)
+ CollectMisc() (GuestMetric, error)
+ CollectNetworkAddr() (GuestMetric, error)
+ CollectDisk() (GuestMetric, error)
+ CollectMemory() (GuestMetric, error)
+}
+
+func runCmd(name string, args ...string) (output string, err error) {
+ cmd := exec.Command(name, args...)
+ var out bytes.Buffer
+ cmd.Stdout = &out
+ err = cmd.Run()
+ if err != nil {
+ return "", err
+ }
+ output = out.String()
+ return output, nil
+}
+
+func prefixKeys(prefix string, m GuestMetric) GuestMetric {
+ m1 := make(GuestMetric, 0)
+ for k, v := range m {
+ m1[prefix+k] = v
+ }
+ return m1
+}
diff --git a/guestmetric/guestmetric_linux.go b/guestmetric/guestmetric_linux.go
new file mode 100644
index 0000000..3970da4
--- /dev/null
+++ b/guestmetric/guestmetric_linux.go
@@ -0,0 +1,257 @@
+package guestmetric
+
+import (
+ xenstoreclient "../xenstoreclient"
+ "bufio"
+ "bytes"
+ "fmt"
+ "os"
+ "path/filepath"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+)
+
+type Collector struct {
+ Client xenstoreclient.XenStoreClient
+ Ballon bool
+ Debug bool
+}
+
+func (c *Collector) CollectOS() (GuestMetric, error) {
+ current := make(GuestMetric, 0)
+ f, err := os.OpenFile("/var/cache/xe-linux-distribution", os.O_RDONLY, 0666)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if strings.Contains(line, "=") {
+ parts := strings.SplitN(line, "=", 2)
+ k := strings.TrimSpace(parts[0])
+ v := strings.TrimSpace(strings.Trim(strings.TrimSpace(parts[1]), "\""))
+ current[k] = v
+ }
+ }
+ return prefixKeys("data/", current), nil
+}
+
+func (c *Collector) CollectMisc() (GuestMetric, error) {
+ current := make(GuestMetric, 0)
+ if c.Ballon {
+ current["control/feature-balloon"] = "1"
+ } else {
+ current["control/feature-balloon"] = "0"
+ }
+ current["attr/PVAddons/Installed"] = "1"
+ current["attr/PVAddons/MajorVersion"] = "@PRODUCT_MAJOR_VERSION@"
+ current["attr/PVAddons/MinorVersion"] = "@PRODUCT_MINOR_VERSION@"
+ current["attr/PVAddons/MicroVersion"] = "@PRODUCT_MICRO_VERSION@"
+ current["attr/PVAddons/BuildVersion"] = "@NUMERIC_BUILD_NUMBER@"
+
+ return current, nil
+}
+
+func (c *Collector) CollectMemory() (GuestMetric, error) {
+ current := make(GuestMetric, 0)
+ f, err := os.OpenFile("/proc/meminfo", os.O_RDONLY, 0666)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ parts := regexp.MustCompile(`\w+`).FindAllString(scanner.Text(), -1)
+ switch parts[0] {
+ case "MemTotal":
+ current["meminfo_total"] = parts[1]
+ case "MemFree":
+ current["meminfo_free"] = parts[1]
+ }
+ }
+ return prefixKeys("data/", current), nil
+}
+
+func EnumNetworkAddresses(iface string) (GuestMetric, error) {
+ const (
+ IP_RE string = `(\d{1,3}\.){3}\d{1,3}`
+ IPV6_RE string = `[\da-f:]+[\da-f]`
+ )
+
+ var (
+ IP_IPV4_ADDR_RE = regexp.MustCompile(`inet\s*(` + IP_RE + `).*\se[a-zA-Z0-9]+\s`)
+ IP_IPV6_ADDR_RE = regexp.MustCompile(`inet6\s*(` + IPV6_RE + `)`)
+ IFCONFIG_IPV4_ADDR_RE = regexp.MustCompile(`inet addr:\s*(` + IP_RE + `)`)
+ IFCONFIG_IPV6_ADDR_RE = regexp.MustCompile(`inet6 addr:\s*(` + IPV6_RE + `)`)
+ )
+
+ d := make(GuestMetric, 0)
+
+ var v4re, v6re *regexp.Regexp
+ var out string
+ var err error
+ if out, err = runCmd("ip", "addr", "show", iface); err == nil {
+ v4re = IP_IPV4_ADDR_RE
+ v6re = IP_IPV6_ADDR_RE
+ } else if out, err = runCmd("ifconfig", iface); err == nil {
+ v4re = IFCONFIG_IPV4_ADDR_RE
+ v6re = IFCONFIG_IPV6_ADDR_RE
+ } else {
+ return nil, fmt.Errorf("Cannot found ip/ifconfig command")
+ }
+
+ m := v4re.FindAllStringSubmatch(out, -1)
+ if m != nil {
+ for _, parts := range m {
+ d["ip"] = parts[1]
+ }
+ }
+ m = v6re.FindAllStringSubmatch(out, -1)
+ if m != nil {
+ for i, parts := range m {
+ d[fmt.Sprintf("ipv6/%d/addr", i)] = parts[1]
+ }
+ }
+ return d, nil
+}
+
+func (c *Collector) CollectNetworkAddr() (GuestMetric, error) {
+ current := make(GuestMetric, 0)
+
+ paths, err := filepath.Glob("/sys/class/net/e*")
+ if err != nil {
+ return nil, err
+ }
+
+ for _, path := range paths {
+ iface := filepath.Base(path)
+ if addrs, err := EnumNetworkAddresses(iface); err == nil {
+ for tag, addr := range addrs {
+ current[fmt.Sprintf("%s/%s", iface, tag)] = addr
+ }
+ }
+ }
+ return prefixKeys("attr/", current), nil
+}
+
+func readSysfs(filename string) (string, error) {
+ f, err := os.OpenFile(filename, os.O_RDONLY, 0666)
+ if err != nil {
+ return "", err
+ }
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ scanner.Scan()
+ return scanner.Text(), nil
+}
+
+func (c *Collector) CollectDisk() (GuestMetric, error) {
+ pi := make(GuestMetric, 0)
+
+ disks := make([]string, 0)
+ paths, err := filepath.Glob("/sys/block/*/device")
+ if err != nil {
+ return nil, err
+ }
+ for _, path := range paths {
+ disk := filepath.Base(strings.TrimSuffix(filepath.Dir(path), "/"))
+ disks = append(disks, disk)
+ }
+
+ var sortedDisks sort.StringSlice = disks
+ sortedDisks.Sort()
+
+ part_idx := 0
+ for _, disk := range sortedDisks[:] {
+ paths, err = filepath.Glob(fmt.Sprintf("/dev/%s?*", disk))
+ if err != nil {
+ return nil, err
+ }
+ for _, path := range paths {
+ p := filepath.Base(path)
+ line, err := readSysfs(fmt.Sprintf("/sys/block/%s/%s/size", disk, p))
+ if err != nil {
+ return nil, err
+ }
+ size, err := strconv.ParseInt(line, 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ blocksize := 512
+ if bs, err := readSysfs(fmt.Sprintf("/sys/block/%s/queue/physical_block_size", p)); err == nil {
+ if bs1, err := strconv.Atoi(bs); err == nil {
+ blocksize = bs1
+ }
+ }
+ real_dev := ""
+ if c.Client != nil {
+ nodename, err := readSysfs(fmt.Sprintf("/sys/block/%s/device/nodename", disk))
+ if err != nil {
+ return nil, err
+ }
+ backend, err := c.Client.Read(fmt.Sprintf("%s/backend", nodename))
+ if err != nil {
+ return nil, err
+ }
+ real_dev, err = c.Client.Read(fmt.Sprintf("%s/dev", backend))
+ if err != nil {
+ return nil, err
+ }
+ }
+ name := path
+ blkid, err := runCmd("blkid", "-s", "UUID", path)
+ if err != nil {
+ return nil, err
+ }
+ if strings.Contains(blkid, "=") {
+ parts := strings.SplitN(strings.TrimSpace(blkid), "=", 2)
+ name = fmt.Sprintf("%s(%s)", name, strings.Trim(parts[1], "\""))
+ }
+ i := map[string]string{
+ "extents/0": real_dev,
+ "name": name,
+ "size": strconv.FormatInt(size*int64(blocksize), 10),
+ }
+ output, err := runCmd("pvs", "--noheadings", "--units", "b", path)
+ if err == nil && output != "" {
+ parts := regexp.MustCompile(`\s+`).Split(output, -1)[1:]
+ i["free"] = strings.TrimSpace(parts[5])[:len(parts[5])-1]
+ i["filesystem"] = strings.TrimSpace(parts[2])
+ i["mount_points/0"] = "[LVM]"
+ } else {
+ output, err = runCmd("mount")
+ if err == nil {
+ m := regexp.MustCompile(`(?m)^(\S+) on (\S+) type (\S+)`).FindAllStringSubmatch(output, -1)
+ if m != nil {
+ for _, parts := range m {
+ if parts[1] == path {
+ i["mount_points/0"] = parts[2]
+ i["filesystem"] = parts[3]
+ break
+ }
+ }
+ }
+ }
+ output, err = runCmd("df", path)
+ if err == nil {
+ scanner := bufio.NewScanner(bytes.NewReader([]byte(output)))
+ scanner.Scan()
+ scanner.Scan()
+ parts := regexp.MustCompile(`\s+`).Split(scanner.Text(), -1)
+ free, err := strconv.ParseInt(parts[3], 10, 64)
+ if err == nil {
+ i["free"] = strconv.FormatInt(free*1024, 10)
+ }
+ }
+ }
+ for k, v := range i {
+ pi[fmt.Sprintf("data/volumes/%d/%s", part_idx, k)] = v
+ }
+ part_idx += 1
+ }
+ }
+ return pi, nil
+}
diff --git a/guestmetric/guestmetric_test.go b/guestmetric/guestmetric_test.go
new file mode 100644
index 0000000..dcc5384
--- /dev/null
+++ b/guestmetric/guestmetric_test.go
@@ -0,0 +1,63 @@
+package guestmetric
+
+import (
+ "testing"
+)
+
+func TestCollector(t *testing.T) {
+ c := Collector{
+ Client: nil,
+ }
+
+ funcs := []CollectFunc{
+ c.CollectDisk,
+ c.CollectMemory,
+ c.CollectMisc,
+ c.CollectNetworkAddr,
+ }
+
+ for _, f := range funcs {
+ metric, err := f()
+ if err != nil {
+ t.Errorf("%#v error: %#v\n", f, err)
+ }
+ t.Logf("%#v return %#v", f, metric)
+ }
+}
+
+func doBenchmark(b *testing.B, f CollectFunc) {
+ b.Logf("doBenchmark 1000 for %#v", f)
+ for i := 0; i < 1000; i++ {
+ if _, err := f(); err != nil {
+ b.Errorf("%#v error: %#v\n", f, err)
+ }
+ }
+}
+
+func BenchmarkCollectorDisk(b *testing.B) {
+ c := Collector{
+ Client: nil,
+ }
+ doBenchmark(b, c.CollectDisk)
+}
+
+func BenchmarkCollectMemory(b *testing.B) {
+ c := Collector{
+ Client: nil,
+ }
+ doBenchmark(b, c.CollectMemory)
+}
+
+func BenchmarkCollectMisc(b *testing.B) {
+ c := Collector{
+ Client: nil,
+ }
+ doBenchmark(b, c.CollectMisc)
+}
+
+func BenchmarkCollectNetwork(b *testing.B) {
+ c := Collector{
+ Client: nil,
+ }
+ doBenchmark(b, c.CollectNetworkAddr)
+}
diff --git a/xe-daemon/xe-daemon.go b/xe-daemon/xe-daemon.go
new file mode 100644
index 0000000..1e5fdb1
--- /dev/null
+++ b/xe-daemon/xe-daemon.go
@@ -0,0 +1,119 @@
+package main
+
+import (
+ guestmetric "../guestmetric"
+ xenstoreclient "../xenstoreclient"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+)
+
+func write_pid_file(pid_file string) error {
+ f, err := os.Create(pid_file)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ fmt.Fprintf(f, "%d\n", os.Getpid())
+ return nil
+}
+
+func main() {
+ var err error
+
+ sleepInterval := flag.Int("i", 60, "Interval between updates (in seconds)")
+ debugFlag := flag.Bool("d", false, "Update to stdout rather than xenstore")
+ balloonFlag := flag.Bool("B", true, "Do not report that ballooning is supported")
+ pid := flag.String("p", "", "Write the PID to FILE")
+
+ flag.Parse()
+
+ if *pid != "" {
+ write_pid_file(*pid)
+ }
+
+ logger := log.New(os.Stderr, "xe-daemon", 0)
+
+ exitChannel := make(chan os.Signal, 1)
+ signal.Notify(exitChannel, syscall.SIGTERM, syscall.SIGINT)
+
+ xs, err := xenstoreclient.NewCachedXenstore(0)
+ if err != nil {
+ logger.Printf("NewCachedXenstore error: %v", err)
+ return
+ }
+
+ collector := &guestmetric.Collector{
+ Client: xs,
+ Ballon: *balloonFlag,
+ Debug: *debugFlag,
+ }
+
+ collectors := []struct {
+ divisor int
+ name string
+ Collect func() (guestmetric.GuestMetric, error)
+ }{
+ {1, "CollectOS", collector.CollectOS},
+ {1, "CollectMisc", collector.CollectMisc},
+ {1, "CollectNetworkAddr", collector.CollectNetworkAddr},
+ {1, "CollectDisk", collector.CollectDisk},
+ {2, "CollectMemory", collector.CollectMemory},
+ }
+
+ lastUniqueID, err := xs.Read("unique-domain-id")
+ if err != nil {
+ logger.Printf("xenstore.Read unique-domain-id error: %v", err)
+ }
+
+ for count := 0; ; count += 1 {
+ uniqueID, err := xs.Read("unique-domain-id")
+ if err != nil {
+ logger.Printf("xenstore.Read unique-domain-id error: %v", err)
+ return
+ }
+ if uniqueID != lastUniqueID {
+ // VM has just resume, cache state now invalid
+ lastUniqueID = uniqueID
+ if cx, ok := xs.(*xenstoreclient.CachedXenStore); ok {
+ cx.Clear()
+ }
+ }
+
+ // invoke collectors
+ for _, collector := range collectors {
+ if count%collector.divisor == 0 {
+ logger.Printf("Running %s ...", collector.name)
+ result, err := collector.Collect()
+ if err != nil {
+ logger.Printf("%s error: %#v", collector.name, err)
+ } else {
+ for name, value := range result {
+ err := xs.Write(name, value)
+ if err != nil {
+ logger.Printf("xenstore.Write error: %v", err)
+ } else {
+ logger.Printf("xenstore.Write OK: %#v: %#v", name, value)
+ }
+ }
+ }
+ }
+ }
+
+ xs.Write("data/updated", time.Now().Format("Mon Jan _2 15:04:05 2006"))
+
+ select {
+ case <-exitChannel:
+ logger.Printf("Received an interrupt, stopping services...")
+ return
+
+ case <-time.After(time.Duration(*sleepInterval) * time.Second):
+ continue
+ }
+ }
+}
diff --git a/xenstore/xenstore.go b/xenstore/xenstore.go
new file mode 100644
index 0000000..c6fe1a5
--- /dev/null
+++ b/xenstore/xenstore.go
@@ -0,0 +1,123 @@
+package main
+
+import (
+ xenstoreclient "../xenstoreclient"
+ "fmt"
+ "os"
+ "strings"
+)
+
+func die(format string, a ...interface{}) {
+ fmt.Fprintf(os.Stderr, format, a...)
+ fmt.Fprintln(os.Stderr)
+ os.Exit(1)
+}
+
+func usage() {
+ die(
+ `Usage: xenstore read key [ key ... ]
+ write key value [ key value ... ]
+ rm key [ key ... ]
+ exists key [ key ... ]`)
+}
+
+func new_xs() xenstoreclient.XenStoreClient {
+ xs, err := xenstoreclient.NewXenstore(0)
+ if err != nil {
+ die("xenstore.Open error: %v", err)
+ }
+
+ return xs
+}
+
+func xs_read(script_name string, args []string) {
+ if len(args) == 0 || args[0] == "-h" {
+ die("Usage: %s key [ key ... ]", script_name)
+ }
+
+ xs := new_xs()
+ for _, key := range args[:] {
+ result, err := xs.Read(key)
+ if err != nil {
+ die("%s error: %v", script_name, err)
+ }
+
+ fmt.Println(result)
+ }
+}
+
+func xs_write(script_name string, args []string) {
+ if len(args) == 0 || args[0] == "-h" || len(args)%2 != 0 {
+ die("Usage: %s key value [ key value ... ]", script_name)
+ }
+
+ xs := new_xs()
+ for i := 0; i < len(args); i += 2 {
+ key := args[i]
+ value := args[i+1]
+
+ err := xs.Write(key, value)
+ if err != nil {
+ die("%s error: %v", script_name, err)
+ }
+ }
+}
+
+func xs_rm(script_name string, args []string) {
+ if len(args) == 0 || args[0] == "-h" {
+ die("Usage: %s key [ key ... ]", script_name)
+ }
+
+ xs := new_xs()
+ for _, key := range args[:] {
+ err := xs.Rm(key)
+ if err != nil {
+ die("%s error: %v", script_name, err)
+ }
+ }
+}
+
+func xs_exists(script_name string, args []string) {
+ if len(args) == 0 || args[0] == "-h" {
+ die("Usage: %s key [ key ... ]", script_name)
+ }
+
+ xs := new_xs()
+ for _, key := range args[:] {
+ _, err := xs.Read(key)
+ if err != nil {
+ die("%s error: %v", script_name, err)
+ }
+ }
+}
+
+func main() {
+ var operation string
+ var args []string
+
+ script_name := os.Args[0]
+ if strings.Contains(script_name, "-") {
+ operation = script_name[strings.LastIndex(script_name, "-")+1:]
+ args = os.Args[1:]
+ } else {
+ if len(os.Args) < 2 {
+ usage()
+ }
+ operation = os.Args[1]
+ script_name = script_name + " " + operation
+ args = os.Args[2:]
+ }
+
+ switch operation {
+ case "read":
+ xs_read(script_name, args)
+ case "write":
+ xs_write(script_name, args)
+ case "rm":
+ xs_rm(script_name, args)
+ case "exists":
+ xs_exists(script_name, args)
+ default:
+ usage()
+ }
+}
diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go
new file mode 100644
index 0000000..0d3e2bb
--- /dev/null
+++ b/xenstoreclient/xenstore.go
@@ -0,0 +1,433 @@
+package xenstoreclient
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+type Permission int
+
+const (
+ PERM_NONE Permission = iota
+ PERM_READ
+ PERM_WRITE
+ PERM_READWRITE
+)
+
+type Operation uint32
+
+const (
+ XS_READ Operation = 2
+ XS_GET_PERMS Operation = 3
+ XS_WATCH Operation = 4
+ XS_UNWATCH Operation = 5
+ XS_TRANSACTION_START Operation = 6
+ XS_TRANSACTION_END Operation = 7
+ XS_WRITE Operation = 11
+ XS_MKDIR Operation = 12
+ XS_RM Operation = 13
+ XS_SET_PERMS Operation = 14
+ XS_WATCH_EVENT Operation = 15
+ XS_ERROR Operation = 16
+)
+
+type Packet struct {
+ OpCode Operation
+ Req uint32
+ TxID uint32
+ Length uint32
+ Value []byte
+}
+
+type Event struct {
+ Token string
+ Data []byte
+}
+
+type XenStoreClient interface {
+ Close() error
+ DO(packet *Packet) (*Packet, error)
+ Read(path string) (string, error)
+ Mkdir(path string) error
+ Rm(path string) error
+ Write(path string, value string) error
+ GetPermission(path string) (map[int]Permission, error)
+ Watch(path string) (<-chan Event, error)
+ StopWatch() error
+}
+
+func ReadPacket(r io.Reader) (packet *Packet, err error) {
+
+ packet = &Packet{}
+
+ err = binary.Read(r, binary.LittleEndian, &packet.OpCode)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.Req)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.TxID)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.Length)
+ if err != nil {
+ return nil, err
+ }
+
+ if packet.Length > 0 {
+ packet.Value = make([]byte, packet.Length)
+ _, err = io.ReadFull(r, packet.Value)
+ if err != nil {
+ return nil, err
+ }
+ if packet.OpCode == XS_ERROR {
+ return nil, errors.New(strings.Split(string(packet.Value), "\x00")[0])
+ }
+ }
+
+ return packet, nil
+}
+
+func (p *Packet) Write(w io.Writer) (err error) {
+ var bw *bufio.Writer
+
+ if w1, ok := w.(*bufio.Writer); ok {
+ bw = w1
+ } else {
+ bw = bufio.NewWriter(w)
+ }
+ defer bw.Flush()
+
+ err = binary.Write(bw, binary.LittleEndian, p.OpCode)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.Req)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.TxID)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.Length)
+ if err != nil {
+ return err
+ }
+ if p.Length > 0 {
+ _, err = bw.Write(p.Value)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type XenStore struct {
+ tx uint32
+ xbFile io.ReadWriteCloser
+ xbFileReader *bufio.Reader
+ muWatch *sync.Mutex
+ onceWatch *sync.Once
+ watchQueues map[string]chan Event
+ watchStopChan chan struct{}
+ watchStoppedChan chan struct{}
+ nonWatchQueue chan []byte
+}
+
+func NewXenstore(tx uint32) (XenStoreClient, error) {
+ devPath, err := getDevPath()
+ if err != nil {
+ return nil, err
+ }
+
+ xbFile, err := os.OpenFile(devPath, os.O_RDWR, 0666)
+ if err != nil {
+ return nil, err
+ }
+ return newXenstore(tx, xbFile)
+}
+
+func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) {
+ return &XenStore{
+ tx: tx,
+ xbFile: rwc,
+ xbFileReader: bufio.NewReader(rwc),
+ watchQueues: make(map[string]chan Event, 0),
+ nonWatchQueue: nil,
+ watchStopChan: make(chan struct{}, 1),
+ watchStoppedChan: make(chan struct{}, 1),
+ onceWatch: &sync.Once{},
+ muWatch: &sync.Mutex{},
+ }, nil
+}
+
+func (xs *XenStore) Close() error {
+ return xs.xbFile.Close()
+}
+
+func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) {
+ err = req.Write(xs.xbFile)
+ if err != nil {
+ return nil, err
+ }
+
+ var r io.Reader
+ if xs.nonWatchQueue != nil {
+ data := <-xs.nonWatchQueue
+ r = bytes.NewReader(data)
+ } else {
+ r = xs.xbFileReader
+ }
+
+ resp, err = ReadPacket(r)
+ return resp, err
+}
+
+func (xs *XenStore) Read(path string) (string, error) {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_READ,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ resp, err := xs.DO(req)
+ if err != nil {
+ return "", err
+ }
+ return string(resp.Value), nil
+}
+
+func (xs *XenStore) Mkdir(path string) error {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_WRITE,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) Rm(path string) error {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_RM,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) Write(path string, value string) error {
+ v := []byte(path + "\x00" + value)
+ req := &Packet{
+ OpCode: XS_WRITE,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) {
+ perm := make(map[int]Permission, 0)
+
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_GET_PERMS,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ resp, err := xs.DO(req)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, e := range strings.Split(string(resp.Value[:len(resp.Value)-1]), "\x00") {
+ k, err := strconv.Atoi(e[1:])
+ if err != nil {
+ return nil, err
+ }
+ var p Permission
+ switch e[0] {
+ case 'n':
+ p = PERM_NONE
+ case 'r':
+ p = PERM_READ
+ case 'w':
+ p = PERM_WRITE
+ case 'b':
+ p = PERM_READWRITE
+ }
+ perm[k] = p
+ }
+ return perm, nil
+}
+
+func (xs *XenStore) Watch(path string) (<-chan Event, error) {
+ watcher := func() {
+
+ type XSData struct {
+ *Packet
+ Error error
+ }
+
+ xsDataChan := make(chan XSData, 100)
+ go func(r io.Reader, out chan<- XSData) {
+ for {
+ p, err := ReadPacket(r)
+ out <- XSData{Packet: p, Error: err}
+ }
+ }(xs.xbFileReader, xsDataChan)
+
+ xs.nonWatchQueue = make(chan []byte, 100)
+ for {
+ select {
+ case <-xs.watchStopChan:
+ fmt.Printf("watch receive stop signal, quit.")
+ xs.watchStopChan <- struct{}{}
+ return
+ case xsdata := <-xsDataChan:
+ if xsdata.Error != nil {
+ fmt.Printf("watch receive error: %#v", xsdata.Error)
+ return
+ }
+ switch xsdata.Packet.OpCode {
+ case XS_WATCH_EVENT:
+ parts := strings.SplitN(string(xsdata.Value), "\x00", 2)
+ path := parts[0]
+ token := parts[1]
+ data := []byte(parts[2])
+ if c, ok := xs.watchQueues[path]; ok {
+ c <- Event{token, data}
+ }
+ default:
+ var b bytes.Buffer
+ xsdata.Packet.Write(&b)
+ xs.nonWatchQueue <- b.Bytes()
+ }
+ }
+ }
+ }
+ xs.onceWatch.Do(watcher)
+ xs.muWatch.Lock()
+ defer xs.muWatch.Unlock()
+ if _, ok := xs.watchQueues[path]; !ok {
+ xs.watchQueues[path] = make(chan Event, 100)
+ }
+ return xs.watchQueues[path], nil
+}
+
+func (xs *XenStore) StopWatch() error {
+ xs.watchStopChan <- struct{}{}
+ <-xs.watchStoppedChan
+ xs.nonWatchQueue = nil
+ return nil
+}
+
+type CachedXenStore struct {
+ xs XenStoreClient
+ writeCache map[string]string
+ lastCommit map[string]time.Time
+}
+
+func NewCachedXenstore(tx uint32) (XenStoreClient, error) {
+ xs, err := NewXenstore(tx)
+ if err != nil {
+ return nil, err
+ }
+ return &CachedXenStore{
+ xs: xs,
+ writeCache: make(map[string]string, 0),
+ lastCommit: make(map[string]time.Time, 0),
+ }, nil
+}
+
+func (xs *CachedXenStore) Write(path string, value string) error {
+ if v, ok := xs.writeCache[path]; ok && v == value {
+ if t, ok := xs.lastCommit[path]; ok && t.After(time.Now().Add(-2*time.Minute)) {
+ return nil
+ }
+ }
+ err := xs.xs.Write(path, value)
+ if err != nil {
+ xs.writeCache[path] = value
+ xs.lastCommit[path] = time.Now()
+ }
+ return err
+}
+
+func (xs *CachedXenStore) Close() error {
+ return xs.xs.Close()
+}
+
+func (xs *CachedXenStore) DO(req *Packet) (resp *Packet, err error) {
+ return xs.xs.DO(req)
+}
+
+func (xs *CachedXenStore) Read(path string) (string, error) {
+ return xs.xs.Read(path)
+}
+
+func (xs *CachedXenStore) Mkdir(path string) error {
+ return xs.xs.Mkdir(path)
+}
+
+func (xs *CachedXenStore) Rm(path string) error {
+ return xs.xs.Rm(path)
+}
+
+func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) {
+ return xs.xs.GetPermission(path)
+}
+
+func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) {
+ return xs.xs.Watch(path)
+}
+
+func (xs *CachedXenStore) StopWatch() error {
+ return xs.xs.StopWatch()
+}
+
+func (xs *CachedXenStore) Clear() {
+ xs.writeCache = make(map[string]string, 0)
+ xs.lastCommit = make(map[string]time.Time, 0)
+}
+
+func getDevPath() (devPath string, err error) {
+ devPaths := []string{
+ "/proc/xen/xenbus",
+ "/dev/xen/xenbus",
+ "/kern/xen/xenbus",
+ }
+ for _, devPath = range devPaths {
+ if _, err = os.Stat(devPath); err == nil {
+ return devPath, err
+ }
+ }
+ return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths)
+}
diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go
new file mode 100644
index 0000000..1259447
--- /dev/null
+++ b/xenstoreclient/xenstore_test.go
@@ -0,0 +1,60 @@
+package xenstoreclient
+
+import (
+ "bytes"
+ "testing"
+)
+
+type mockFile struct {
+ t *testing.T
+}
+
+func (f *mockFile) Read(b []byte) (n int, err error) {
+ value := "i am value"
+ p := &Packet{
+ OpCode: XS_READ,
+ Req: 0,
+ TxID: 0,
+ Length: uint32(len(value)),
+ Value: []byte(value),
+ }
+ var buf bytes.Buffer
+ if err = p.Write(&buf); err != nil {
+ return 0, err
+ }
+ copy(b, buf.Bytes())
+ n = buf.Len()
+ f.t.Logf("Read %d bytes", n)
+ return n, nil
+}
+
+func (f *mockFile) Write(b []byte) (n int, err error) {
+ buf := bytes.NewBuffer(b)
+ if _, err := ReadPacket(buf); err != nil {
+ return 0, err
+ }
+ n = len(b)
+ f.t.Logf("Write %d bytes", n)
+ return n, nil
+}
+
+func (f *mockFile) Close() error {
+ f.t.Logf("Close()")
+ return nil
+}
+
+func TestXenStore(t *testing.T) {
+ xs, err := newXenstore(0, &mockFile{t})
+ if err != nil {
+ t.Errorf("newXenstore error: %#v\n", err)
+ }
+ defer xs.Close()
+
+ if _, err := xs.Read("foo"); err != nil {
+ t.Errorf("xs.Read error: %#v\n", err)
+ }
+
+ if err := xs.Write("foo", "bar"); err != nil {
+ t.Errorf("xs.Read error: %#v\n", err)
+ }
+}