summaryrefslogtreecommitdiff
path: root/xenstoreclient
diff options
context:
space:
mode:
Diffstat (limited to 'xenstoreclient')
-rw-r--r--xenstoreclient/xenstore.go232
-rw-r--r--xenstoreclient/xenstore_test.go89
2 files changed, 239 insertions, 82 deletions
diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go
index 0d3e2bb..cc30828 100644
--- a/xenstoreclient/xenstore.go
+++ b/xenstoreclient/xenstore.go
@@ -1,12 +1,14 @@
package xenstoreclient
import (
+ syslog "../syslog"
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
+ "log"
"os"
"strconv"
"strings"
@@ -26,18 +28,27 @@ const (
type Operation uint32
const (
- XS_READ Operation = 2
- XS_GET_PERMS Operation = 3
- XS_WATCH Operation = 4
- XS_UNWATCH Operation = 5
- XS_TRANSACTION_START Operation = 6
- XS_TRANSACTION_END Operation = 7
- XS_WRITE Operation = 11
- XS_MKDIR Operation = 12
- XS_RM Operation = 13
- XS_SET_PERMS Operation = 14
- XS_WATCH_EVENT Operation = 15
- XS_ERROR Operation = 16
+ XS_DEBUG Operation = 0
+ XS_DIRECTORY Operation = 1
+ XS_READ Operation = 2
+ XS_GET_PERMS Operation = 3
+ XS_WATCH Operation = 4
+ XS_UNWATCH Operation = 5
+ XS_TRANSACTION_START Operation = 6
+ XS_TRANSACTION_END Operation = 7
+ XS_INTRODUCE Operation = 8
+ XS_RELEASE Operation = 9
+ XS_GET_DOMAIN_PATH Operation = 10
+ XS_WRITE Operation = 11
+ XS_MKDIR Operation = 12
+ XS_RM Operation = 13
+ XS_SET_PERMS Operation = 14
+ XS_WATCH_EVENT Operation = 15
+ XS_ERROR Operation = 16
+ XS_IS_DOMAIN_INTRODUCED Operation = 17
+ XS_RESUME Operation = 18
+ XS_SET_TARGET Operation = 19
+ XS_RESTRICT Operation = 128
)
type Packet struct {
@@ -48,11 +59,6 @@ type Packet struct {
Value []byte
}
-type Event struct {
- Token string
- Data []byte
-}
-
type XenStoreClient interface {
Close() error
DO(packet *Packet) (*Packet, error)
@@ -61,7 +67,9 @@ type XenStoreClient interface {
Rm(path string) error
Write(path string, value string) error
GetPermission(path string) (map[int]Permission, error)
- Watch(path string) (<-chan Event, error)
+ Watch(path string, token string) error
+ WatchEvent(key string) (token string, ok bool)
+ UnWatch(path string, token string) error
StopWatch() error
}
@@ -135,16 +143,55 @@ func (p *Packet) Write(w io.Writer) (err error) {
return nil
}
+type WatchQueueManager struct {
+ watchQueues map[string]chan string
+ rwlocker *sync.RWMutex
+}
+
+func (wq *WatchQueueManager) RemoveByKey(key string) {
+ wq.rwlocker.Lock()
+ defer wq.rwlocker.Unlock()
+ delete(wq.watchQueues, key)
+ return
+}
+
+func (wq *WatchQueueManager) SetEventByKey(key string, token string) (ok bool) {
+ wq.rwlocker.RLock()
+ defer wq.rwlocker.RUnlock()
+ wq.watchQueues[key] <- token
+ return
+}
+
+func (wq *WatchQueueManager) GetEventByKey(key string) (token string, ok bool) {
+ wq.rwlocker.RLock()
+ defer wq.rwlocker.RUnlock()
+ ec, ok := wq.watchQueues[key]
+ if len(ec) != 0 {
+ return <-ec, ok
+ } else {
+ ok = false
+ }
+ return
+}
+
+func (wq *WatchQueueManager) AddChanByKey(key string) {
+ wq.rwlocker.Lock()
+ defer wq.rwlocker.Unlock()
+ wq.watchQueues[key] = make(chan string, 100)
+}
+
type XenStore struct {
- tx uint32
- xbFile io.ReadWriteCloser
- xbFileReader *bufio.Reader
- muWatch *sync.Mutex
- onceWatch *sync.Once
- watchQueues map[string]chan Event
- watchStopChan chan struct{}
- watchStoppedChan chan struct{}
- nonWatchQueue chan []byte
+ tx uint32
+ xbFile io.ReadWriteCloser
+ xbFileReader *bufio.Reader
+ muWatch *sync.Mutex
+ onceWatch *sync.Once
+ watchQueue WatchQueueManager
+ watchStopChan chan struct{}
+ watchStoppedChan chan struct{}
+ nonWatchQueue chan []byte
+ xbFileReaderLocker *sync.Mutex
+ logger *log.Logger
}
func NewXenstore(tx uint32) (XenStoreClient, error) {
@@ -160,17 +207,38 @@ func NewXenstore(tx uint32) (XenStoreClient, error) {
return newXenstore(tx, xbFile)
}
+const (
+ LoggerName string = "xenstore"
+)
+
func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) {
+ var loggerWriter io.Writer = os.Stderr
+ var topic string = LoggerName
+ if w, err := syslog.NewSyslogWriter(topic); err == nil {
+ loggerWriter = w
+ topic = ""
+ } else {
+ fmt.Fprintf(os.Stderr, "NewSyslogWriter(%s) error: %s, use stderr logging\n", topic, err)
+ topic = LoggerName + ": "
+ }
+
+ logger := log.New(loggerWriter, topic, 0)
+
return &XenStore{
- tx: tx,
- xbFile: rwc,
- xbFileReader: bufio.NewReader(rwc),
- watchQueues: make(map[string]chan Event, 0),
- nonWatchQueue: nil,
- watchStopChan: make(chan struct{}, 1),
- watchStoppedChan: make(chan struct{}, 1),
- onceWatch: &sync.Once{},
- muWatch: &sync.Mutex{},
+ tx: tx,
+ xbFile: rwc,
+ xbFileReader: bufio.NewReader(rwc),
+ watchQueue: WatchQueueManager{
+ watchQueues: make(map[string]chan string, 0),
+ rwlocker: &sync.RWMutex{},
+ },
+ nonWatchQueue: nil,
+ watchStopChan: make(chan struct{}, 1),
+ watchStoppedChan: make(chan struct{}, 1),
+ onceWatch: &sync.Once{},
+ muWatch: &sync.Mutex{},
+ xbFileReaderLocker: &sync.Mutex{},
+ logger: logger,
}, nil
}
@@ -179,6 +247,8 @@ func (xs *XenStore) Close() error {
}
func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) {
+ xs.xbFileReaderLocker.Lock()
+ defer xs.xbFileReaderLocker.Unlock()
err = req.Write(xs.xbFile)
if err != nil {
return nil, err
@@ -191,7 +261,6 @@ func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) {
} else {
r = xs.xbFileReader
}
-
resp, err = ReadPacket(r)
return resp, err
}
@@ -288,32 +357,61 @@ func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) {
return perm, nil
}
-func (xs *XenStore) Watch(path string) (<-chan Event, error) {
- watcher := func() {
+func (xs *XenStore) UnWatch(path string, token string) (err error) {
+ v := []byte(path + "\x00" + token + "\x00")
+ req := &Packet{
+ OpCode: XS_UNWATCH,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
+ }
+ _, err = xs.DO(req)
+ if err != nil {
+ return
+ }
+ xs.watchQueue.RemoveByKey(path)
+ return nil
+}
+func (xs *XenStore) Watch(path string, token string) error {
+ watcher := func() {
+ logger.Printf("Watch: Start")
type XSData struct {
*Packet
Error error
}
xsDataChan := make(chan XSData, 100)
- go func(r io.Reader, out chan<- XSData) {
+ xsReadStop := make(chan bool)
+ go func(r io.Reader, out chan<- XSData, c <-chan bool) {
for {
+ // The read will return at once if no data in r
p, err := ReadPacket(r)
+ ticker := time.Tick(1 * time.Second)
+ if err != nil {
+ select {
+ case <-c:
+ return
+ case <-ticker:
+ continue
+ }
+ }
out <- XSData{Packet: p, Error: err}
}
- }(xs.xbFileReader, xsDataChan)
+ }(xs.xbFileReader, xsDataChan, xsReadStop)
xs.nonWatchQueue = make(chan []byte, 100)
for {
select {
case <-xs.watchStopChan:
- fmt.Printf("watch receive stop signal, quit.")
- xs.watchStopChan <- struct{}{}
+ logger.Printf("Watch: receive stop signal, quit.\n")
+ xs.watchStoppedChan <- struct{}{}
+ xsReadStop <- true
return
case xsdata := <-xsDataChan:
if xsdata.Error != nil {
- fmt.Printf("watch receive error: %#v", xsdata.Error)
+ logger.Printf("Watch: receive error: %#v", xsdata.Error)
return
}
switch xsdata.Packet.OpCode {
@@ -321,11 +419,10 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) {
parts := strings.SplitN(string(xsdata.Value), "\x00", 2)
path := parts[0]
token := parts[1]
- data := []byte(parts[2])
- if c, ok := xs.watchQueues[path]; ok {
- c <- Event{token, data}
- }
+ logger.Printf("Get XS_WATCH_EVENT key:%s, token:%s\n", path, token)
+ xs.watchQueue.SetEventByKey(path, token)
default:
+ logger.Printf("Get non watch event %#v\n", xsdata.Packet.OpCode)
var b bytes.Buffer
xsdata.Packet.Write(&b)
xs.nonWatchQueue <- b.Bytes()
@@ -333,19 +430,34 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) {
}
}
}
- xs.onceWatch.Do(watcher)
xs.muWatch.Lock()
defer xs.muWatch.Unlock()
- if _, ok := xs.watchQueues[path]; !ok {
- xs.watchQueues[path] = make(chan Event, 100)
+ v := []byte(path + "\x00" + token + "\x00")
+ req := &Packet{
+ OpCode: XS_WATCH,
+ Req: 0,
+ TxID: xs.tx,
+ Length: uint32(len(v)),
+ Value: v,
}
- return xs.watchQueues[path], nil
+ _, err := xs.DO(req)
+ if err != nil {
+ logger.Errorf("Watch failed with error %#v\n", err)
+ return err
+ }
+ xs.watchQueue.AddChanByKey(path)
+ go xs.onceWatch.Do(watcher)
+ return nil
+}
+
+func (xs *XenStore) WatchEvent(key string) (token string, ok bool) {
+ return xs.watchQueue.GetEventByKey(key)
}
func (xs *XenStore) StopWatch() error {
xs.watchStopChan <- struct{}{}
- <-xs.watchStoppedChan
xs.nonWatchQueue = nil
+ <-xs.watchStoppedChan
return nil
}
@@ -405,8 +517,16 @@ func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error)
return xs.xs.GetPermission(path)
}
-func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) {
- return xs.xs.Watch(path)
+func (xs *CachedXenStore) Watch(path string, token string) error {
+ return xs.xs.Watch(path, token)
+}
+
+func (xs *CachedXenStore) WatchEvent(key string) (token string, ok bool) {
+ return xs.xs.WatchEvent(key)
+}
+
+func (xs *CachedXenStore) UnWatch(path string, token string) error {
+ return xs.xs.UnWatch(path, token)
}
func (xs *CachedXenStore) StopWatch() error {
@@ -429,5 +549,5 @@ func getDevPath() (devPath string, err error) {
return devPath, err
}
}
- return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths)
+ return "", logger.Errorf("Cannot locate xenbus dev path in %v", devPaths)
}
diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go
index 1259447..6335e6b 100644
--- a/xenstoreclient/xenstore_test.go
+++ b/xenstoreclient/xenstore_test.go
@@ -2,40 +2,48 @@ package xenstoreclient
import (
"bytes"
+ "fmt"
+ "io"
"testing"
+ "time"
)
type mockFile struct {
- t *testing.T
+ r io.Reader
+ w io.Writer
+ watchKeys map[string]struct{}
+ t *testing.T
}
-func (f *mockFile) Read(b []byte) (n int, err error) {
- value := "i am value"
- p := &Packet{
- OpCode: XS_READ,
- Req: 0,
- TxID: 0,
- Length: uint32(len(value)),
- Value: []byte(value),
+func NewMockFile(t *testing.T) io.ReadWriteCloser {
+ var b bytes.Buffer
+
+ return &mockFile{
+ r: &b,
+ w: &b,
+ t: t,
+ watchKeys: make(map[string]struct{}),
}
- var buf bytes.Buffer
- if err = p.Write(&buf); err != nil {
- return 0, err
+}
+
+func (f *mockFile) Read(p []byte) (n int, err error) {
+ for i := 0; i < 1; i++ {
+ n, err = f.r.Read(p)
+ if err == io.EOF {
+ fmt.Printf("Read sleep %#v second\n", i)
+ time.Sleep(1 * time.Second)
+ } else {
+ fmt.Printf("Read=%#v err %#v\n", n, err)
+ return
+ }
}
- copy(b, buf.Bytes())
- n = buf.Len()
- f.t.Logf("Read %d bytes", n)
- return n, nil
+ return 0, io.EOF
}
func (f *mockFile) Write(b []byte) (n int, err error) {
- buf := bytes.NewBuffer(b)
- if _, err := ReadPacket(buf); err != nil {
- return 0, err
- }
- n = len(b)
- f.t.Logf("Write %d bytes", n)
- return n, nil
+ n, err = f.w.Write(b)
+ fmt.Printf("Write=%#v err %#v\n", n, err)
+ return
}
func (f *mockFile) Close() error {
@@ -44,17 +52,46 @@ func (f *mockFile) Close() error {
}
func TestXenStore(t *testing.T) {
- xs, err := newXenstore(0, &mockFile{t})
+ xs, err := newXenstore(0, NewMockFile(t))
if err != nil {
t.Errorf("newXenstore error: %#v\n", err)
}
defer xs.Close()
+ if err := xs.Write("foo", "bar"); err != nil {
+ t.Errorf("xs.Write error: %#v\n", err)
+ }
+
if _, err := xs.Read("foo"); err != nil {
t.Errorf("xs.Read error: %#v\n", err)
}
+}
- if err := xs.Write("foo", "bar"); err != nil {
- t.Errorf("xs.Read error: %#v\n", err)
+func TestXenStoreWatch2(t *testing.T) {
+ xs, err := newXenstore(0, NewMockFile(t))
+ if err != nil {
+ t.Errorf("newXenstore error: %#v\n", err)
}
+ defer xs.Close()
+
+ go func() {
+ time.Sleep(5 * time.Second)
+ if err := xs.StopWatch(); err != nil {
+ t.Errorf("xs.StopWatch error: %#v\n", err)
+ }
+ }()
+
+ go func() {
+ for i := 0; i < 5; i++ {
+ xs.Write("foo", "bar")
+ time.Sleep(1 * time.Second)
+ }
+ }()
+
+ err = xs.Watch("foo", "test")
+ if err != nil {
+ t.Errorf("xs.Watch(\"foo\") error: %#v\n", err)
+ }
+
+ time.Sleep(6 * time.Second)
}