summaryrefslogtreecommitdiff
path: root/xenstoreclient
diff options
context:
space:
mode:
authorzheng <Zheng.chai@citrix.com>2015-05-19 08:49:57 +0100
committerzheng <Zheng.chai@citrix.com>2015-05-19 08:49:57 +0100
commit2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213 (patch)
tree158e65bf08b638bff6375baeb2d13fda16667196 /xenstoreclient
parent2c85890a25bff9f327ea95018c541c1190267492 (diff)
parent75bc98ab536804963551416a4206ec8c43ebcc34 (diff)
downloadvyos-xe-guest-utilities-2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213.tar.gz
vyos-xe-guest-utilities-2fb5afbd442e35a6e0ba3bd57a04e0bd58c7c213.zip
Merge pull request #1 from xs-nanjing/master
CP-11399: Go Linux guest agent for XenServer
Diffstat (limited to 'xenstoreclient')
-rw-r--r--xenstoreclient/xenstore.go433
-rw-r--r--xenstoreclient/xenstore_test.go60
2 files changed, 493 insertions, 0 deletions
diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go
new file mode 100644
index 0000000..0d3e2bb
--- /dev/null
+++ b/xenstoreclient/xenstore.go
@@ -0,0 +1,433 @@
+package xenstoreclient
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+type Permission int
+
+const (
+ PERM_NONE Permission = iota
+ PERM_READ
+ PERM_WRITE
+ PERM_READWRITE
+)
+
+type Operation uint32
+
+const (
+ XS_READ Operation = 2
+ XS_GET_PERMS Operation = 3
+ XS_WATCH Operation = 4
+ XS_UNWATCH Operation = 5
+ XS_TRANSACTION_START Operation = 6
+ XS_TRANSACTION_END Operation = 7
+ XS_WRITE Operation = 11
+ XS_MKDIR Operation = 12
+ XS_RM Operation = 13
+ XS_SET_PERMS Operation = 14
+ XS_WATCH_EVENT Operation = 15
+ XS_ERROR Operation = 16
+)
+
+type Packet struct {
+ OpCode Operation
+ Req uint32
+ TxID uint32
+ Length uint32
+ Value []byte
+}
+
+type Event struct {
+ Token string
+ Data []byte
+}
+
+type XenStoreClient interface {
+ Close() error
+ DO(packet *Packet) (*Packet, error)
+ Read(path string) (string, error)
+ Mkdir(path string) error
+ Rm(path string) error
+ Write(path string, value string) error
+ GetPermission(path string) (map[int]Permission, error)
+ Watch(path string) (<-chan Event, error)
+ StopWatch() error
+}
+
+func ReadPacket(r io.Reader) (packet *Packet, err error) {
+
+ packet = &Packet{}
+
+ err = binary.Read(r, binary.LittleEndian, &packet.OpCode)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.Req)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.TxID)
+ if err != nil {
+ return nil, err
+ }
+ err = binary.Read(r, binary.LittleEndian, &packet.Length)
+ if err != nil {
+ return nil, err
+ }
+
+ if packet.Length > 0 {
+ packet.Value = make([]byte, packet.Length)
+ _, err = io.ReadFull(r, packet.Value)
+ if err != nil {
+ return nil, err
+ }
+ if packet.OpCode == XS_ERROR {
+ return nil, errors.New(strings.Split(string(packet.Value), "\x00")[0])
+ }
+ }
+
+ return packet, nil
+}
+
+func (p *Packet) Write(w io.Writer) (err error) {
+ var bw *bufio.Writer
+
+ if w1, ok := w.(*bufio.Writer); ok {
+ bw = w1
+ } else {
+ bw = bufio.NewWriter(w)
+ }
+ defer bw.Flush()
+
+ err = binary.Write(bw, binary.LittleEndian, p.OpCode)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.Req)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.TxID)
+ if err != nil {
+ return err
+ }
+ err = binary.Write(bw, binary.LittleEndian, p.Length)
+ if err != nil {
+ return err
+ }
+ if p.Length > 0 {
+ _, err = bw.Write(p.Value)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type XenStore struct {
+ tx uint32
+ xbFile io.ReadWriteCloser
+ xbFileReader *bufio.Reader
+ muWatch *sync.Mutex
+ onceWatch *sync.Once
+ watchQueues map[string]chan Event
+ watchStopChan chan struct{}
+ watchStoppedChan chan struct{}
+ nonWatchQueue chan []byte
+}
+
+func NewXenstore(tx uint32) (XenStoreClient, error) {
+ devPath, err := getDevPath()
+ if err != nil {
+ return nil, err
+ }
+
+ xbFile, err := os.OpenFile(devPath, os.O_RDWR, 0666)
+ if err != nil {
+ return nil, err
+ }
+ return newXenstore(tx, xbFile)
+}
+
+func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) {
+ return &XenStore{
+ tx: tx,
+ xbFile: rwc,
+ xbFileReader: bufio.NewReader(rwc),
+ watchQueues: make(map[string]chan Event, 0),
+ nonWatchQueue: nil,
+ watchStopChan: make(chan struct{}, 1),
+ watchStoppedChan: make(chan struct{}, 1),
+ onceWatch: &sync.Once{},
+ muWatch: &sync.Mutex{},
+ }, nil
+}
+
+func (xs *XenStore) Close() error {
+ return xs.xbFile.Close()
+}
+
+func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) {
+ err = req.Write(xs.xbFile)
+ if err != nil {
+ return nil, err
+ }
+
+ var r io.Reader
+ if xs.nonWatchQueue != nil {
+ data := <-xs.nonWatchQueue
+ r = bytes.NewReader(data)
+ } else {
+ r = xs.xbFileReader
+ }
+
+ resp, err = ReadPacket(r)
+ return resp, err
+}
+
+func (xs *XenStore) Read(path string) (string, error) {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_READ,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ resp, err := xs.DO(req)
+ if err != nil {
+ return "", err
+ }
+ return string(resp.Value), nil
+}
+
+func (xs *XenStore) Mkdir(path string) error {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_WRITE,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) Rm(path string) error {
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_RM,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) Write(path string, value string) error {
+ v := []byte(path + "\x00" + value)
+ req := &Packet{
+ OpCode: XS_WRITE,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err := xs.DO(req)
+ return err
+}
+
+func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) {
+ perm := make(map[int]Permission, 0)
+
+ v := []byte(path + "\x00")
+ req := &Packet{
+ OpCode: XS_GET_PERMS,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ resp, err := xs.DO(req)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, e := range strings.Split(string(resp.Value[:len(resp.Value)-1]), "\x00") {
+ k, err := strconv.Atoi(e[1:])
+ if err != nil {
+ return nil, err
+ }
+ var p Permission
+ switch e[0] {
+ case 'n':
+ p = PERM_NONE
+ case 'r':
+ p = PERM_READ
+ case 'w':
+ p = PERM_WRITE
+ case 'b':
+ p = PERM_READWRITE
+ }
+ perm[k] = p
+ }
+ return perm, nil
+}
+
+func (xs *XenStore) Watch(path string) (<-chan Event, error) {
+ watcher := func() {
+
+ type XSData struct {
+ *Packet
+ Error error
+ }
+
+ xsDataChan := make(chan XSData, 100)
+ go func(r io.Reader, out chan<- XSData) {
+ for {
+ p, err := ReadPacket(r)
+ out <- XSData{Packet: p, Error: err}
+ }
+ }(xs.xbFileReader, xsDataChan)
+
+ xs.nonWatchQueue = make(chan []byte, 100)
+ for {
+ select {
+ case <-xs.watchStopChan:
+ fmt.Printf("watch receive stop signal, quit.")
+ xs.watchStopChan <- struct{}{}
+ return
+ case xsdata := <-xsDataChan:
+ if xsdata.Error != nil {
+ fmt.Printf("watch receive error: %#v", xsdata.Error)
+ return
+ }
+ switch xsdata.Packet.OpCode {
+ case XS_WATCH_EVENT:
+ parts := strings.SplitN(string(xsdata.Value), "\x00", 2)
+ path := parts[0]
+ token := parts[1]
+ data := []byte(parts[2])
+ if c, ok := xs.watchQueues[path]; ok {
+ c <- Event{token, data}
+ }
+ default:
+ var b bytes.Buffer
+ xsdata.Packet.Write(&b)
+ xs.nonWatchQueue <- b.Bytes()
+ }
+ }
+ }
+ }
+ xs.onceWatch.Do(watcher)
+ xs.muWatch.Lock()
+ defer xs.muWatch.Unlock()
+ if _, ok := xs.watchQueues[path]; !ok {
+ xs.watchQueues[path] = make(chan Event, 100)
+ }
+ return xs.watchQueues[path], nil
+}
+
+func (xs *XenStore) StopWatch() error {
+ xs.watchStopChan <- struct{}{}
+ <-xs.watchStoppedChan
+ xs.nonWatchQueue = nil
+ return nil
+}
+
+type CachedXenStore struct {
+ xs XenStoreClient
+ writeCache map[string]string
+ lastCommit map[string]time.Time
+}
+
+func NewCachedXenstore(tx uint32) (XenStoreClient, error) {
+ xs, err := NewXenstore(tx)
+ if err != nil {
+ return nil, err
+ }
+ return &CachedXenStore{
+ xs: xs,
+ writeCache: make(map[string]string, 0),
+ lastCommit: make(map[string]time.Time, 0),
+ }, nil
+}
+
+func (xs *CachedXenStore) Write(path string, value string) error {
+ if v, ok := xs.writeCache[path]; ok && v == value {
+ if t, ok := xs.lastCommit[path]; ok && t.After(time.Now().Add(-2*time.Minute)) {
+ return nil
+ }
+ }
+ err := xs.xs.Write(path, value)
+ if err != nil {
+ xs.writeCache[path] = value
+ xs.lastCommit[path] = time.Now()
+ }
+ return err
+}
+
+func (xs *CachedXenStore) Close() error {
+ return xs.xs.Close()
+}
+
+func (xs *CachedXenStore) DO(req *Packet) (resp *Packet, err error) {
+ return xs.xs.DO(req)
+}
+
+func (xs *CachedXenStore) Read(path string) (string, error) {
+ return xs.xs.Read(path)
+}
+
+func (xs *CachedXenStore) Mkdir(path string) error {
+ return xs.xs.Mkdir(path)
+}
+
+func (xs *CachedXenStore) Rm(path string) error {
+ return xs.xs.Rm(path)
+}
+
+func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) {
+ return xs.xs.GetPermission(path)
+}
+
+func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) {
+ return xs.xs.Watch(path)
+}
+
+func (xs *CachedXenStore) StopWatch() error {
+ return xs.xs.StopWatch()
+}
+
+func (xs *CachedXenStore) Clear() {
+ xs.writeCache = make(map[string]string, 0)
+ xs.lastCommit = make(map[string]time.Time, 0)
+}
+
+func getDevPath() (devPath string, err error) {
+ devPaths := []string{
+ "/proc/xen/xenbus",
+ "/dev/xen/xenbus",
+ "/kern/xen/xenbus",
+ }
+ for _, devPath = range devPaths {
+ if _, err = os.Stat(devPath); err == nil {
+ return devPath, err
+ }
+ }
+ return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths)
+}
diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go
new file mode 100644
index 0000000..1259447
--- /dev/null
+++ b/xenstoreclient/xenstore_test.go
@@ -0,0 +1,60 @@
+package xenstoreclient
+
+import (
+ "bytes"
+ "testing"
+)
+
+type mockFile struct {
+ t *testing.T
+}
+
+func (f *mockFile) Read(b []byte) (n int, err error) {
+ value := "i am value"
+ p := &Packet{
+ OpCode: XS_READ,
+ Req: 0,
+ TxID: 0,
+ Length: uint32(len(value)),
+ Value: []byte(value),
+ }
+ var buf bytes.Buffer
+ if err = p.Write(&buf); err != nil {
+ return 0, err
+ }
+ copy(b, buf.Bytes())
+ n = buf.Len()
+ f.t.Logf("Read %d bytes", n)
+ return n, nil
+}
+
+func (f *mockFile) Write(b []byte) (n int, err error) {
+ buf := bytes.NewBuffer(b)
+ if _, err := ReadPacket(buf); err != nil {
+ return 0, err
+ }
+ n = len(b)
+ f.t.Logf("Write %d bytes", n)
+ return n, nil
+}
+
+func (f *mockFile) Close() error {
+ f.t.Logf("Close()")
+ return nil
+}
+
+func TestXenStore(t *testing.T) {
+ xs, err := newXenstore(0, &mockFile{t})
+ if err != nil {
+ t.Errorf("newXenstore error: %#v\n", err)
+ }
+ defer xs.Close()
+
+ if _, err := xs.Read("foo"); err != nil {
+ t.Errorf("xs.Read error: %#v\n", err)
+ }
+
+ if err := xs.Write("foo", "bar"); err != nil {
+ t.Errorf("xs.Read error: %#v\n", err)
+ }
+}