diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile | 36 | ||||
-rw-r--r-- | README.md | 21 | ||||
-rw-r--r-- | guestmetric/guestmetric.go | 38 | ||||
-rw-r--r-- | guestmetric/guestmetric_linux.go | 257 | ||||
-rw-r--r-- | guestmetric/guestmetric_test.go | 63 | ||||
-rw-r--r-- | xe-daemon/xe-daemon.go | 119 | ||||
-rw-r--r-- | xenstore/xenstore.go | 123 | ||||
-rw-r--r-- | xenstoreclient/xenstore.go | 433 | ||||
-rw-r--r-- | xenstoreclient/xenstore_test.go | 60 |
10 files changed, 1151 insertions, 0 deletions
@@ -6,6 +6,7 @@ # Folders _obj _test +bin # Architecture specific extensions/prefixes *.[568vq] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d5eeb5c --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ + +GO_BUILD = go build +GO_FLAGS = -a -x + +SRC_DIR = . +BIN_DIR = bin + +BINARIES := +BINARIES += $(BIN_DIR)/xe-daemon +BINARIES += $(BIN_DIR)/xenstore + +XE_DAEMON_SOURCES := +XE_DAEMON_SOURCES += $(SRC_DIR)/xe-daemon/xe-daemon.go +XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric.go +XE_DAEMON_SOURCES += $(SRC_DIR)/guestmetric/guestmetric_linux.go +XE_DAEMON_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go + +XENSTORE_SOURCES := +XENSTORE_SOURCES += $(SRC_DIR)/xenstore/xenstore.go +XENSTORE_SOURCES += $(SRC_DIR)/xenstoreclient/xenstore.go + +.PHONY: build +build: $(BINARIES) + +.PHONY: clean +clean: + -rm -f $(BINARIES) + +$(BIN_DIR)/xe-daemon: $(XE_DAEMON_SOURCES) + mkdir -p $(BIN_DIR) + $(GO_BUILD) $(GO_FLAGS) -o $@ $< + +$(BIN_DIR)/xenstore: $(XENSTORE_SOURCES) + mkdir -p $(BIN_DIR) + $(GO_BUILD) $(GO_FLAGS) -o $@ $< + diff --git a/README.md b/README.md new file mode 100644 index 0000000..ac8c91b --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +go-guest-utilites +=================== + +This is the golang guest utilites for XenServer + + +XenStore CLI +----------- +xe-guest-utilities.git/xenstore + + +XenServer Guest Utilities +----------- +xe-guest-utilities.git/xe-daemon + + +Build Instructions +=================== +[Go development environment](https://golang.org/doc/install) is required to build the guest utilities. + +Type `make` or `make build` to build the xenstore and xe-daemon. diff --git a/guestmetric/guestmetric.go b/guestmetric/guestmetric.go new file mode 100644 index 0000000..836f105 --- /dev/null +++ b/guestmetric/guestmetric.go @@ -0,0 +1,38 @@ +package guestmetric + +import ( + "bytes" + "os/exec" +) + +type GuestMetric map[string]string + +type CollectFunc func() (GuestMetric, error) + +type GuestMetricsCollector interface { + CollectOS() (GuestMetric, error) + CollectMisc() (GuestMetric, error) + CollectNetworkAddr() (GuestMetric, error) + CollectDisk() (GuestMetric, error) + CollectMemory() (GuestMetric, error) +} + +func runCmd(name string, args ...string) (output string, err error) { + cmd := exec.Command(name, args...) + var out bytes.Buffer + cmd.Stdout = &out + err = cmd.Run() + if err != nil { + return "", err + } + output = out.String() + return output, nil +} + +func prefixKeys(prefix string, m GuestMetric) GuestMetric { + m1 := make(GuestMetric, 0) + for k, v := range m { + m1[prefix+k] = v + } + return m1 +} diff --git a/guestmetric/guestmetric_linux.go b/guestmetric/guestmetric_linux.go new file mode 100644 index 0000000..3970da4 --- /dev/null +++ b/guestmetric/guestmetric_linux.go @@ -0,0 +1,257 @@ +package guestmetric + +import ( + xenstoreclient "../xenstoreclient" + "bufio" + "bytes" + "fmt" + "os" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" +) + +type Collector struct { + Client xenstoreclient.XenStoreClient + Ballon bool + Debug bool +} + +func (c *Collector) CollectOS() (GuestMetric, error) { + current := make(GuestMetric, 0) + f, err := os.OpenFile("/var/cache/xe-linux-distribution", os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "=") { + parts := strings.SplitN(line, "=", 2) + k := strings.TrimSpace(parts[0]) + v := strings.TrimSpace(strings.Trim(strings.TrimSpace(parts[1]), "\"")) + current[k] = v + } + } + return prefixKeys("data/", current), nil +} + +func (c *Collector) CollectMisc() (GuestMetric, error) { + current := make(GuestMetric, 0) + if c.Ballon { + current["control/feature-balloon"] = "1" + } else { + current["control/feature-balloon"] = "0" + } + current["attr/PVAddons/Installed"] = "1" + current["attr/PVAddons/MajorVersion"] = "@PRODUCT_MAJOR_VERSION@" + current["attr/PVAddons/MinorVersion"] = "@PRODUCT_MINOR_VERSION@" + current["attr/PVAddons/MicroVersion"] = "@PRODUCT_MICRO_VERSION@" + current["attr/PVAddons/BuildVersion"] = "@NUMERIC_BUILD_NUMBER@" + + return current, nil +} + +func (c *Collector) CollectMemory() (GuestMetric, error) { + current := make(GuestMetric, 0) + f, err := os.OpenFile("/proc/meminfo", os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + parts := regexp.MustCompile(`\w+`).FindAllString(scanner.Text(), -1) + switch parts[0] { + case "MemTotal": + current["meminfo_total"] = parts[1] + case "MemFree": + current["meminfo_free"] = parts[1] + } + } + return prefixKeys("data/", current), nil +} + +func EnumNetworkAddresses(iface string) (GuestMetric, error) { + const ( + IP_RE string = `(\d{1,3}\.){3}\d{1,3}` + IPV6_RE string = `[\da-f:]+[\da-f]` + ) + + var ( + IP_IPV4_ADDR_RE = regexp.MustCompile(`inet\s*(` + IP_RE + `).*\se[a-zA-Z0-9]+\s`) + IP_IPV6_ADDR_RE = regexp.MustCompile(`inet6\s*(` + IPV6_RE + `)`) + IFCONFIG_IPV4_ADDR_RE = regexp.MustCompile(`inet addr:\s*(` + IP_RE + `)`) + IFCONFIG_IPV6_ADDR_RE = regexp.MustCompile(`inet6 addr:\s*(` + IPV6_RE + `)`) + ) + + d := make(GuestMetric, 0) + + var v4re, v6re *regexp.Regexp + var out string + var err error + if out, err = runCmd("ip", "addr", "show", iface); err == nil { + v4re = IP_IPV4_ADDR_RE + v6re = IP_IPV6_ADDR_RE + } else if out, err = runCmd("ifconfig", iface); err == nil { + v4re = IFCONFIG_IPV4_ADDR_RE + v6re = IFCONFIG_IPV6_ADDR_RE + } else { + return nil, fmt.Errorf("Cannot found ip/ifconfig command") + } + + m := v4re.FindAllStringSubmatch(out, -1) + if m != nil { + for _, parts := range m { + d["ip"] = parts[1] + } + } + m = v6re.FindAllStringSubmatch(out, -1) + if m != nil { + for i, parts := range m { + d[fmt.Sprintf("ipv6/%d/addr", i)] = parts[1] + } + } + return d, nil +} + +func (c *Collector) CollectNetworkAddr() (GuestMetric, error) { + current := make(GuestMetric, 0) + + paths, err := filepath.Glob("/sys/class/net/e*") + if err != nil { + return nil, err + } + + for _, path := range paths { + iface := filepath.Base(path) + if addrs, err := EnumNetworkAddresses(iface); err == nil { + for tag, addr := range addrs { + current[fmt.Sprintf("%s/%s", iface, tag)] = addr + } + } + } + return prefixKeys("attr/", current), nil +} + +func readSysfs(filename string) (string, error) { + f, err := os.OpenFile(filename, os.O_RDONLY, 0666) + if err != nil { + return "", err + } + defer f.Close() + scanner := bufio.NewScanner(f) + scanner.Scan() + return scanner.Text(), nil +} + +func (c *Collector) CollectDisk() (GuestMetric, error) { + pi := make(GuestMetric, 0) + + disks := make([]string, 0) + paths, err := filepath.Glob("/sys/block/*/device") + if err != nil { + return nil, err + } + for _, path := range paths { + disk := filepath.Base(strings.TrimSuffix(filepath.Dir(path), "/")) + disks = append(disks, disk) + } + + var sortedDisks sort.StringSlice = disks + sortedDisks.Sort() + + part_idx := 0 + for _, disk := range sortedDisks[:] { + paths, err = filepath.Glob(fmt.Sprintf("/dev/%s?*", disk)) + if err != nil { + return nil, err + } + for _, path := range paths { + p := filepath.Base(path) + line, err := readSysfs(fmt.Sprintf("/sys/block/%s/%s/size", disk, p)) + if err != nil { + return nil, err + } + size, err := strconv.ParseInt(line, 10, 64) + if err != nil { + return nil, err + } + blocksize := 512 + if bs, err := readSysfs(fmt.Sprintf("/sys/block/%s/queue/physical_block_size", p)); err == nil { + if bs1, err := strconv.Atoi(bs); err == nil { + blocksize = bs1 + } + } + real_dev := "" + if c.Client != nil { + nodename, err := readSysfs(fmt.Sprintf("/sys/block/%s/device/nodename", disk)) + if err != nil { + return nil, err + } + backend, err := c.Client.Read(fmt.Sprintf("%s/backend", nodename)) + if err != nil { + return nil, err + } + real_dev, err = c.Client.Read(fmt.Sprintf("%s/dev", backend)) + if err != nil { + return nil, err + } + } + name := path + blkid, err := runCmd("blkid", "-s", "UUID", path) + if err != nil { + return nil, err + } + if strings.Contains(blkid, "=") { + parts := strings.SplitN(strings.TrimSpace(blkid), "=", 2) + name = fmt.Sprintf("%s(%s)", name, strings.Trim(parts[1], "\"")) + } + i := map[string]string{ + "extents/0": real_dev, + "name": name, + "size": strconv.FormatInt(size*int64(blocksize), 10), + } + output, err := runCmd("pvs", "--noheadings", "--units", "b", path) + if err == nil && output != "" { + parts := regexp.MustCompile(`\s+`).Split(output, -1)[1:] + i["free"] = strings.TrimSpace(parts[5])[:len(parts[5])-1] + i["filesystem"] = strings.TrimSpace(parts[2]) + i["mount_points/0"] = "[LVM]" + } else { + output, err = runCmd("mount") + if err == nil { + m := regexp.MustCompile(`(?m)^(\S+) on (\S+) type (\S+)`).FindAllStringSubmatch(output, -1) + if m != nil { + for _, parts := range m { + if parts[1] == path { + i["mount_points/0"] = parts[2] + i["filesystem"] = parts[3] + break + } + } + } + } + output, err = runCmd("df", path) + if err == nil { + scanner := bufio.NewScanner(bytes.NewReader([]byte(output))) + scanner.Scan() + scanner.Scan() + parts := regexp.MustCompile(`\s+`).Split(scanner.Text(), -1) + free, err := strconv.ParseInt(parts[3], 10, 64) + if err == nil { + i["free"] = strconv.FormatInt(free*1024, 10) + } + } + } + for k, v := range i { + pi[fmt.Sprintf("data/volumes/%d/%s", part_idx, k)] = v + } + part_idx += 1 + } + } + return pi, nil +} diff --git a/guestmetric/guestmetric_test.go b/guestmetric/guestmetric_test.go new file mode 100644 index 0000000..dcc5384 --- /dev/null +++ b/guestmetric/guestmetric_test.go @@ -0,0 +1,63 @@ +package guestmetric + +import ( + "testing" +) + +func TestCollector(t *testing.T) { + c := Collector{ + Client: nil, + } + + funcs := []CollectFunc{ + c.CollectDisk, + c.CollectMemory, + c.CollectMisc, + c.CollectNetworkAddr, + } + + for _, f := range funcs { + metric, err := f() + if err != nil { + t.Errorf("%#v error: %#v\n", f, err) + } + t.Logf("%#v return %#v", f, metric) + } +} + +func doBenchmark(b *testing.B, f CollectFunc) { + b.Logf("doBenchmark 1000 for %#v", f) + for i := 0; i < 1000; i++ { + if _, err := f(); err != nil { + b.Errorf("%#v error: %#v\n", f, err) + } + } +} + +func BenchmarkCollectorDisk(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectDisk) +} + +func BenchmarkCollectMemory(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectMemory) +} + +func BenchmarkCollectMisc(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectMisc) +} + +func BenchmarkCollectNetwork(b *testing.B) { + c := Collector{ + Client: nil, + } + doBenchmark(b, c.CollectNetworkAddr) +} diff --git a/xe-daemon/xe-daemon.go b/xe-daemon/xe-daemon.go new file mode 100644 index 0000000..1e5fdb1 --- /dev/null +++ b/xe-daemon/xe-daemon.go @@ -0,0 +1,119 @@ +package main + +import ( + guestmetric "../guestmetric" + xenstoreclient "../xenstoreclient" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func write_pid_file(pid_file string) error { + f, err := os.Create(pid_file) + if err != nil { + return err + } + defer f.Close() + + fmt.Fprintf(f, "%d\n", os.Getpid()) + return nil +} + +func main() { + var err error + + sleepInterval := flag.Int("i", 60, "Interval between updates (in seconds)") + debugFlag := flag.Bool("d", false, "Update to stdout rather than xenstore") + balloonFlag := flag.Bool("B", true, "Do not report that ballooning is supported") + pid := flag.String("p", "", "Write the PID to FILE") + + flag.Parse() + + if *pid != "" { + write_pid_file(*pid) + } + + logger := log.New(os.Stderr, "xe-daemon", 0) + + exitChannel := make(chan os.Signal, 1) + signal.Notify(exitChannel, syscall.SIGTERM, syscall.SIGINT) + + xs, err := xenstoreclient.NewCachedXenstore(0) + if err != nil { + logger.Printf("NewCachedXenstore error: %v", err) + return + } + + collector := &guestmetric.Collector{ + Client: xs, + Ballon: *balloonFlag, + Debug: *debugFlag, + } + + collectors := []struct { + divisor int + name string + Collect func() (guestmetric.GuestMetric, error) + }{ + {1, "CollectOS", collector.CollectOS}, + {1, "CollectMisc", collector.CollectMisc}, + {1, "CollectNetworkAddr", collector.CollectNetworkAddr}, + {1, "CollectDisk", collector.CollectDisk}, + {2, "CollectMemory", collector.CollectMemory}, + } + + lastUniqueID, err := xs.Read("unique-domain-id") + if err != nil { + logger.Printf("xenstore.Read unique-domain-id error: %v", err) + } + + for count := 0; ; count += 1 { + uniqueID, err := xs.Read("unique-domain-id") + if err != nil { + logger.Printf("xenstore.Read unique-domain-id error: %v", err) + return + } + if uniqueID != lastUniqueID { + // VM has just resume, cache state now invalid + lastUniqueID = uniqueID + if cx, ok := xs.(*xenstoreclient.CachedXenStore); ok { + cx.Clear() + } + } + + // invoke collectors + for _, collector := range collectors { + if count%collector.divisor == 0 { + logger.Printf("Running %s ...", collector.name) + result, err := collector.Collect() + if err != nil { + logger.Printf("%s error: %#v", collector.name, err) + } else { + for name, value := range result { + err := xs.Write(name, value) + if err != nil { + logger.Printf("xenstore.Write error: %v", err) + } else { + logger.Printf("xenstore.Write OK: %#v: %#v", name, value) + } + } + } + } + } + + xs.Write("data/updated", time.Now().Format("Mon Jan _2 15:04:05 2006")) + + select { + case <-exitChannel: + logger.Printf("Received an interrupt, stopping services...") + return + + case <-time.After(time.Duration(*sleepInterval) * time.Second): + continue + } + } +} diff --git a/xenstore/xenstore.go b/xenstore/xenstore.go new file mode 100644 index 0000000..c6fe1a5 --- /dev/null +++ b/xenstore/xenstore.go @@ -0,0 +1,123 @@ +package main + +import ( + xenstoreclient "../xenstoreclient" + "fmt" + "os" + "strings" +) + +func die(format string, a ...interface{}) { + fmt.Fprintf(os.Stderr, format, a...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} + +func usage() { + die( + `Usage: xenstore read key [ key ... ] + write key value [ key value ... ] + rm key [ key ... ] + exists key [ key ... ]`) +} + +func new_xs() xenstoreclient.XenStoreClient { + xs, err := xenstoreclient.NewXenstore(0) + if err != nil { + die("xenstore.Open error: %v", err) + } + + return xs +} + +func xs_read(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + result, err := xs.Read(key) + if err != nil { + die("%s error: %v", script_name, err) + } + + fmt.Println(result) + } +} + +func xs_write(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" || len(args)%2 != 0 { + die("Usage: %s key value [ key value ... ]", script_name) + } + + xs := new_xs() + for i := 0; i < len(args); i += 2 { + key := args[i] + value := args[i+1] + + err := xs.Write(key, value) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func xs_rm(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + err := xs.Rm(key) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func xs_exists(script_name string, args []string) { + if len(args) == 0 || args[0] == "-h" { + die("Usage: %s key [ key ... ]", script_name) + } + + xs := new_xs() + for _, key := range args[:] { + _, err := xs.Read(key) + if err != nil { + die("%s error: %v", script_name, err) + } + } +} + +func main() { + var operation string + var args []string + + script_name := os.Args[0] + if strings.Contains(script_name, "-") { + operation = script_name[strings.LastIndex(script_name, "-")+1:] + args = os.Args[1:] + } else { + if len(os.Args) < 2 { + usage() + } + operation = os.Args[1] + script_name = script_name + " " + operation + args = os.Args[2:] + } + + switch operation { + case "read": + xs_read(script_name, args) + case "write": + xs_write(script_name, args) + case "rm": + xs_rm(script_name, args) + case "exists": + xs_exists(script_name, args) + default: + usage() + } +} diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go new file mode 100644 index 0000000..0d3e2bb --- /dev/null +++ b/xenstoreclient/xenstore.go @@ -0,0 +1,433 @@ +package xenstoreclient + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "strconv" + "strings" + "sync" + "time" +) + +type Permission int + +const ( + PERM_NONE Permission = iota + PERM_READ + PERM_WRITE + PERM_READWRITE +) + +type Operation uint32 + +const ( + XS_READ Operation = 2 + XS_GET_PERMS Operation = 3 + XS_WATCH Operation = 4 + XS_UNWATCH Operation = 5 + XS_TRANSACTION_START Operation = 6 + XS_TRANSACTION_END Operation = 7 + XS_WRITE Operation = 11 + XS_MKDIR Operation = 12 + XS_RM Operation = 13 + XS_SET_PERMS Operation = 14 + XS_WATCH_EVENT Operation = 15 + XS_ERROR Operation = 16 +) + +type Packet struct { + OpCode Operation + Req uint32 + TxID uint32 + Length uint32 + Value []byte +} + +type Event struct { + Token string + Data []byte +} + +type XenStoreClient interface { + Close() error + DO(packet *Packet) (*Packet, error) + Read(path string) (string, error) + Mkdir(path string) error + Rm(path string) error + Write(path string, value string) error + GetPermission(path string) (map[int]Permission, error) + Watch(path string) (<-chan Event, error) + StopWatch() error +} + +func ReadPacket(r io.Reader) (packet *Packet, err error) { + + packet = &Packet{} + + err = binary.Read(r, binary.LittleEndian, &packet.OpCode) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.Req) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.TxID) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &packet.Length) + if err != nil { + return nil, err + } + + if packet.Length > 0 { + packet.Value = make([]byte, packet.Length) + _, err = io.ReadFull(r, packet.Value) + if err != nil { + return nil, err + } + if packet.OpCode == XS_ERROR { + return nil, errors.New(strings.Split(string(packet.Value), "\x00")[0]) + } + } + + return packet, nil +} + +func (p *Packet) Write(w io.Writer) (err error) { + var bw *bufio.Writer + + if w1, ok := w.(*bufio.Writer); ok { + bw = w1 + } else { + bw = bufio.NewWriter(w) + } + defer bw.Flush() + + err = binary.Write(bw, binary.LittleEndian, p.OpCode) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.Req) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.TxID) + if err != nil { + return err + } + err = binary.Write(bw, binary.LittleEndian, p.Length) + if err != nil { + return err + } + if p.Length > 0 { + _, err = bw.Write(p.Value) + if err != nil { + return err + } + } + return nil +} + +type XenStore struct { + tx uint32 + xbFile io.ReadWriteCloser + xbFileReader *bufio.Reader + muWatch *sync.Mutex + onceWatch *sync.Once + watchQueues map[string]chan Event + watchStopChan chan struct{} + watchStoppedChan chan struct{} + nonWatchQueue chan []byte +} + +func NewXenstore(tx uint32) (XenStoreClient, error) { + devPath, err := getDevPath() + if err != nil { + return nil, err + } + + xbFile, err := os.OpenFile(devPath, os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return newXenstore(tx, xbFile) +} + +func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) { + return &XenStore{ + tx: tx, + xbFile: rwc, + xbFileReader: bufio.NewReader(rwc), + watchQueues: make(map[string]chan Event, 0), + nonWatchQueue: nil, + watchStopChan: make(chan struct{}, 1), + watchStoppedChan: make(chan struct{}, 1), + onceWatch: &sync.Once{}, + muWatch: &sync.Mutex{}, + }, nil +} + +func (xs *XenStore) Close() error { + return xs.xbFile.Close() +} + +func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { + err = req.Write(xs.xbFile) + if err != nil { + return nil, err + } + + var r io.Reader + if xs.nonWatchQueue != nil { + data := <-xs.nonWatchQueue + r = bytes.NewReader(data) + } else { + r = xs.xbFileReader + } + + resp, err = ReadPacket(r) + return resp, err +} + +func (xs *XenStore) Read(path string) (string, error) { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_READ, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + resp, err := xs.DO(req) + if err != nil { + return "", err + } + return string(resp.Value), nil +} + +func (xs *XenStore) Mkdir(path string) error { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_WRITE, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) Rm(path string) error { + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_RM, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) Write(path string, value string) error { + v := []byte(path + "\x00" + value) + req := &Packet{ + OpCode: XS_WRITE, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err := xs.DO(req) + return err +} + +func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) { + perm := make(map[int]Permission, 0) + + v := []byte(path + "\x00") + req := &Packet{ + OpCode: XS_GET_PERMS, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + resp, err := xs.DO(req) + if err != nil { + return nil, err + } + + for _, e := range strings.Split(string(resp.Value[:len(resp.Value)-1]), "\x00") { + k, err := strconv.Atoi(e[1:]) + if err != nil { + return nil, err + } + var p Permission + switch e[0] { + case 'n': + p = PERM_NONE + case 'r': + p = PERM_READ + case 'w': + p = PERM_WRITE + case 'b': + p = PERM_READWRITE + } + perm[k] = p + } + return perm, nil +} + +func (xs *XenStore) Watch(path string) (<-chan Event, error) { + watcher := func() { + + type XSData struct { + *Packet + Error error + } + + xsDataChan := make(chan XSData, 100) + go func(r io.Reader, out chan<- XSData) { + for { + p, err := ReadPacket(r) + out <- XSData{Packet: p, Error: err} + } + }(xs.xbFileReader, xsDataChan) + + xs.nonWatchQueue = make(chan []byte, 100) + for { + select { + case <-xs.watchStopChan: + fmt.Printf("watch receive stop signal, quit.") + xs.watchStopChan <- struct{}{} + return + case xsdata := <-xsDataChan: + if xsdata.Error != nil { + fmt.Printf("watch receive error: %#v", xsdata.Error) + return + } + switch xsdata.Packet.OpCode { + case XS_WATCH_EVENT: + parts := strings.SplitN(string(xsdata.Value), "\x00", 2) + path := parts[0] + token := parts[1] + data := []byte(parts[2]) + if c, ok := xs.watchQueues[path]; ok { + c <- Event{token, data} + } + default: + var b bytes.Buffer + xsdata.Packet.Write(&b) + xs.nonWatchQueue <- b.Bytes() + } + } + } + } + xs.onceWatch.Do(watcher) + xs.muWatch.Lock() + defer xs.muWatch.Unlock() + if _, ok := xs.watchQueues[path]; !ok { + xs.watchQueues[path] = make(chan Event, 100) + } + return xs.watchQueues[path], nil +} + +func (xs *XenStore) StopWatch() error { + xs.watchStopChan <- struct{}{} + <-xs.watchStoppedChan + xs.nonWatchQueue = nil + return nil +} + +type CachedXenStore struct { + xs XenStoreClient + writeCache map[string]string + lastCommit map[string]time.Time +} + +func NewCachedXenstore(tx uint32) (XenStoreClient, error) { + xs, err := NewXenstore(tx) + if err != nil { + return nil, err + } + return &CachedXenStore{ + xs: xs, + writeCache: make(map[string]string, 0), + lastCommit: make(map[string]time.Time, 0), + }, nil +} + +func (xs *CachedXenStore) Write(path string, value string) error { + if v, ok := xs.writeCache[path]; ok && v == value { + if t, ok := xs.lastCommit[path]; ok && t.After(time.Now().Add(-2*time.Minute)) { + return nil + } + } + err := xs.xs.Write(path, value) + if err != nil { + xs.writeCache[path] = value + xs.lastCommit[path] = time.Now() + } + return err +} + +func (xs *CachedXenStore) Close() error { + return xs.xs.Close() +} + +func (xs *CachedXenStore) DO(req *Packet) (resp *Packet, err error) { + return xs.xs.DO(req) +} + +func (xs *CachedXenStore) Read(path string) (string, error) { + return xs.xs.Read(path) +} + +func (xs *CachedXenStore) Mkdir(path string) error { + return xs.xs.Mkdir(path) +} + +func (xs *CachedXenStore) Rm(path string) error { + return xs.xs.Rm(path) +} + +func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) { + return xs.xs.GetPermission(path) +} + +func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) { + return xs.xs.Watch(path) +} + +func (xs *CachedXenStore) StopWatch() error { + return xs.xs.StopWatch() +} + +func (xs *CachedXenStore) Clear() { + xs.writeCache = make(map[string]string, 0) + xs.lastCommit = make(map[string]time.Time, 0) +} + +func getDevPath() (devPath string, err error) { + devPaths := []string{ + "/proc/xen/xenbus", + "/dev/xen/xenbus", + "/kern/xen/xenbus", + } + for _, devPath = range devPaths { + if _, err = os.Stat(devPath); err == nil { + return devPath, err + } + } + return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths) +} diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go new file mode 100644 index 0000000..1259447 --- /dev/null +++ b/xenstoreclient/xenstore_test.go @@ -0,0 +1,60 @@ +package xenstoreclient + +import ( + "bytes" + "testing" +) + +type mockFile struct { + t *testing.T +} + +func (f *mockFile) Read(b []byte) (n int, err error) { + value := "i am value" + p := &Packet{ + OpCode: XS_READ, + Req: 0, + TxID: 0, + Length: uint32(len(value)), + Value: []byte(value), + } + var buf bytes.Buffer + if err = p.Write(&buf); err != nil { + return 0, err + } + copy(b, buf.Bytes()) + n = buf.Len() + f.t.Logf("Read %d bytes", n) + return n, nil +} + +func (f *mockFile) Write(b []byte) (n int, err error) { + buf := bytes.NewBuffer(b) + if _, err := ReadPacket(buf); err != nil { + return 0, err + } + n = len(b) + f.t.Logf("Write %d bytes", n) + return n, nil +} + +func (f *mockFile) Close() error { + f.t.Logf("Close()") + return nil +} + +func TestXenStore(t *testing.T) { + xs, err := newXenstore(0, &mockFile{t}) + if err != nil { + t.Errorf("newXenstore error: %#v\n", err) + } + defer xs.Close() + + if _, err := xs.Read("foo"); err != nil { + t.Errorf("xs.Read error: %#v\n", err) + } + + if err := xs.Write("foo", "bar"); err != nil { + t.Errorf("xs.Read error: %#v\n", err) + } +} |