diff options
Diffstat (limited to 'xenstoreclient')
-rw-r--r-- | xenstoreclient/xenstore.go | 232 | ||||
-rw-r--r-- | xenstoreclient/xenstore_test.go | 89 |
2 files changed, 239 insertions, 82 deletions
diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go index 0d3e2bb..cc30828 100644 --- a/xenstoreclient/xenstore.go +++ b/xenstoreclient/xenstore.go @@ -1,12 +1,14 @@ package xenstoreclient import ( + syslog "../syslog" "bufio" "bytes" "encoding/binary" "errors" "fmt" "io" + "log" "os" "strconv" "strings" @@ -26,18 +28,27 @@ const ( type Operation uint32 const ( - XS_READ Operation = 2 - XS_GET_PERMS Operation = 3 - XS_WATCH Operation = 4 - XS_UNWATCH Operation = 5 - XS_TRANSACTION_START Operation = 6 - XS_TRANSACTION_END Operation = 7 - XS_WRITE Operation = 11 - XS_MKDIR Operation = 12 - XS_RM Operation = 13 - XS_SET_PERMS Operation = 14 - XS_WATCH_EVENT Operation = 15 - XS_ERROR Operation = 16 + XS_DEBUG Operation = 0 + XS_DIRECTORY Operation = 1 + XS_READ Operation = 2 + XS_GET_PERMS Operation = 3 + XS_WATCH Operation = 4 + XS_UNWATCH Operation = 5 + XS_TRANSACTION_START Operation = 6 + XS_TRANSACTION_END Operation = 7 + XS_INTRODUCE Operation = 8 + XS_RELEASE Operation = 9 + XS_GET_DOMAIN_PATH Operation = 10 + XS_WRITE Operation = 11 + XS_MKDIR Operation = 12 + XS_RM Operation = 13 + XS_SET_PERMS Operation = 14 + XS_WATCH_EVENT Operation = 15 + XS_ERROR Operation = 16 + XS_IS_DOMAIN_INTRODUCED Operation = 17 + XS_RESUME Operation = 18 + XS_SET_TARGET Operation = 19 + XS_RESTRICT Operation = 128 ) type Packet struct { @@ -48,11 +59,6 @@ type Packet struct { Value []byte } -type Event struct { - Token string - Data []byte -} - type XenStoreClient interface { Close() error DO(packet *Packet) (*Packet, error) @@ -61,7 +67,9 @@ type XenStoreClient interface { Rm(path string) error Write(path string, value string) error GetPermission(path string) (map[int]Permission, error) - Watch(path string) (<-chan Event, error) + Watch(path string, token string) error + WatchEvent(key string) (token string, ok bool) + UnWatch(path string, token string) error StopWatch() error } @@ -135,16 +143,55 @@ func (p *Packet) Write(w io.Writer) (err error) { return nil } +type WatchQueueManager struct { + watchQueues map[string]chan string + rwlocker *sync.RWMutex +} + +func (wq *WatchQueueManager) RemoveByKey(key string) { + wq.rwlocker.Lock() + defer wq.rwlocker.Unlock() + delete(wq.watchQueues, key) + return +} + +func (wq *WatchQueueManager) SetEventByKey(key string, token string) (ok bool) { + wq.rwlocker.RLock() + defer wq.rwlocker.RUnlock() + wq.watchQueues[key] <- token + return +} + +func (wq *WatchQueueManager) GetEventByKey(key string) (token string, ok bool) { + wq.rwlocker.RLock() + defer wq.rwlocker.RUnlock() + ec, ok := wq.watchQueues[key] + if len(ec) != 0 { + return <-ec, ok + } else { + ok = false + } + return +} + +func (wq *WatchQueueManager) AddChanByKey(key string) { + wq.rwlocker.Lock() + defer wq.rwlocker.Unlock() + wq.watchQueues[key] = make(chan string, 100) +} + type XenStore struct { - tx uint32 - xbFile io.ReadWriteCloser - xbFileReader *bufio.Reader - muWatch *sync.Mutex - onceWatch *sync.Once - watchQueues map[string]chan Event - watchStopChan chan struct{} - watchStoppedChan chan struct{} - nonWatchQueue chan []byte + tx uint32 + xbFile io.ReadWriteCloser + xbFileReader *bufio.Reader + muWatch *sync.Mutex + onceWatch *sync.Once + watchQueue WatchQueueManager + watchStopChan chan struct{} + watchStoppedChan chan struct{} + nonWatchQueue chan []byte + xbFileReaderLocker *sync.Mutex + logger *log.Logger } func NewXenstore(tx uint32) (XenStoreClient, error) { @@ -160,17 +207,38 @@ func NewXenstore(tx uint32) (XenStoreClient, error) { return newXenstore(tx, xbFile) } +const ( + LoggerName string = "xenstore" +) + func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) { + var loggerWriter io.Writer = os.Stderr + var topic string = LoggerName + if w, err := syslog.NewSyslogWriter(topic); err == nil { + loggerWriter = w + topic = "" + } else { + fmt.Fprintf(os.Stderr, "NewSyslogWriter(%s) error: %s, use stderr logging\n", topic, err) + topic = LoggerName + ": " + } + + logger := log.New(loggerWriter, topic, 0) + return &XenStore{ - tx: tx, - xbFile: rwc, - xbFileReader: bufio.NewReader(rwc), - watchQueues: make(map[string]chan Event, 0), - nonWatchQueue: nil, - watchStopChan: make(chan struct{}, 1), - watchStoppedChan: make(chan struct{}, 1), - onceWatch: &sync.Once{}, - muWatch: &sync.Mutex{}, + tx: tx, + xbFile: rwc, + xbFileReader: bufio.NewReader(rwc), + watchQueue: WatchQueueManager{ + watchQueues: make(map[string]chan string, 0), + rwlocker: &sync.RWMutex{}, + }, + nonWatchQueue: nil, + watchStopChan: make(chan struct{}, 1), + watchStoppedChan: make(chan struct{}, 1), + onceWatch: &sync.Once{}, + muWatch: &sync.Mutex{}, + xbFileReaderLocker: &sync.Mutex{}, + logger: logger, }, nil } @@ -179,6 +247,8 @@ func (xs *XenStore) Close() error { } func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { + xs.xbFileReaderLocker.Lock() + defer xs.xbFileReaderLocker.Unlock() err = req.Write(xs.xbFile) if err != nil { return nil, err @@ -191,7 +261,6 @@ func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { } else { r = xs.xbFileReader } - resp, err = ReadPacket(r) return resp, err } @@ -288,32 +357,61 @@ func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) { return perm, nil } -func (xs *XenStore) Watch(path string) (<-chan Event, error) { - watcher := func() { +func (xs *XenStore) UnWatch(path string, token string) (err error) { + v := []byte(path + "\x00" + token + "\x00") + req := &Packet{ + OpCode: XS_UNWATCH, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err = xs.DO(req) + if err != nil { + return + } + xs.watchQueue.RemoveByKey(path) + return nil +} +func (xs *XenStore) Watch(path string, token string) error { + watcher := func() { + logger.Printf("Watch: Start") type XSData struct { *Packet Error error } xsDataChan := make(chan XSData, 100) - go func(r io.Reader, out chan<- XSData) { + xsReadStop := make(chan bool) + go func(r io.Reader, out chan<- XSData, c <-chan bool) { for { + // The read will return at once if no data in r p, err := ReadPacket(r) + ticker := time.Tick(1 * time.Second) + if err != nil { + select { + case <-c: + return + case <-ticker: + continue + } + } out <- XSData{Packet: p, Error: err} } - }(xs.xbFileReader, xsDataChan) + }(xs.xbFileReader, xsDataChan, xsReadStop) xs.nonWatchQueue = make(chan []byte, 100) for { select { case <-xs.watchStopChan: - fmt.Printf("watch receive stop signal, quit.") - xs.watchStopChan <- struct{}{} + logger.Printf("Watch: receive stop signal, quit.\n") + xs.watchStoppedChan <- struct{}{} + xsReadStop <- true return case xsdata := <-xsDataChan: if xsdata.Error != nil { - fmt.Printf("watch receive error: %#v", xsdata.Error) + logger.Printf("Watch: receive error: %#v", xsdata.Error) return } switch xsdata.Packet.OpCode { @@ -321,11 +419,10 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) { parts := strings.SplitN(string(xsdata.Value), "\x00", 2) path := parts[0] token := parts[1] - data := []byte(parts[2]) - if c, ok := xs.watchQueues[path]; ok { - c <- Event{token, data} - } + logger.Printf("Get XS_WATCH_EVENT key:%s, token:%s\n", path, token) + xs.watchQueue.SetEventByKey(path, token) default: + logger.Printf("Get non watch event %#v\n", xsdata.Packet.OpCode) var b bytes.Buffer xsdata.Packet.Write(&b) xs.nonWatchQueue <- b.Bytes() @@ -333,19 +430,34 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) { } } } - xs.onceWatch.Do(watcher) xs.muWatch.Lock() defer xs.muWatch.Unlock() - if _, ok := xs.watchQueues[path]; !ok { - xs.watchQueues[path] = make(chan Event, 100) + v := []byte(path + "\x00" + token + "\x00") + req := &Packet{ + OpCode: XS_WATCH, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, } - return xs.watchQueues[path], nil + _, err := xs.DO(req) + if err != nil { + logger.Errorf("Watch failed with error %#v\n", err) + return err + } + xs.watchQueue.AddChanByKey(path) + go xs.onceWatch.Do(watcher) + return nil +} + +func (xs *XenStore) WatchEvent(key string) (token string, ok bool) { + return xs.watchQueue.GetEventByKey(key) } func (xs *XenStore) StopWatch() error { xs.watchStopChan <- struct{}{} - <-xs.watchStoppedChan xs.nonWatchQueue = nil + <-xs.watchStoppedChan return nil } @@ -405,8 +517,16 @@ func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) return xs.xs.GetPermission(path) } -func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) { - return xs.xs.Watch(path) +func (xs *CachedXenStore) Watch(path string, token string) error { + return xs.xs.Watch(path, token) +} + +func (xs *CachedXenStore) WatchEvent(key string) (token string, ok bool) { + return xs.xs.WatchEvent(key) +} + +func (xs *CachedXenStore) UnWatch(path string, token string) error { + return xs.xs.UnWatch(path, token) } func (xs *CachedXenStore) StopWatch() error { @@ -429,5 +549,5 @@ func getDevPath() (devPath string, err error) { return devPath, err } } - return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths) + return "", logger.Errorf("Cannot locate xenbus dev path in %v", devPaths) } diff --git a/xenstoreclient/xenstore_test.go b/xenstoreclient/xenstore_test.go index 1259447..6335e6b 100644 --- a/xenstoreclient/xenstore_test.go +++ b/xenstoreclient/xenstore_test.go @@ -2,40 +2,48 @@ package xenstoreclient import ( "bytes" + "fmt" + "io" "testing" + "time" ) type mockFile struct { - t *testing.T + r io.Reader + w io.Writer + watchKeys map[string]struct{} + t *testing.T } -func (f *mockFile) Read(b []byte) (n int, err error) { - value := "i am value" - p := &Packet{ - OpCode: XS_READ, - Req: 0, - TxID: 0, - Length: uint32(len(value)), - Value: []byte(value), +func NewMockFile(t *testing.T) io.ReadWriteCloser { + var b bytes.Buffer + + return &mockFile{ + r: &b, + w: &b, + t: t, + watchKeys: make(map[string]struct{}), } - var buf bytes.Buffer - if err = p.Write(&buf); err != nil { - return 0, err +} + +func (f *mockFile) Read(p []byte) (n int, err error) { + for i := 0; i < 1; i++ { + n, err = f.r.Read(p) + if err == io.EOF { + fmt.Printf("Read sleep %#v second\n", i) + time.Sleep(1 * time.Second) + } else { + fmt.Printf("Read=%#v err %#v\n", n, err) + return + } } - copy(b, buf.Bytes()) - n = buf.Len() - f.t.Logf("Read %d bytes", n) - return n, nil + return 0, io.EOF } func (f *mockFile) Write(b []byte) (n int, err error) { - buf := bytes.NewBuffer(b) - if _, err := ReadPacket(buf); err != nil { - return 0, err - } - n = len(b) - f.t.Logf("Write %d bytes", n) - return n, nil + n, err = f.w.Write(b) + fmt.Printf("Write=%#v err %#v\n", n, err) + return } func (f *mockFile) Close() error { @@ -44,17 +52,46 @@ func (f *mockFile) Close() error { } func TestXenStore(t *testing.T) { - xs, err := newXenstore(0, &mockFile{t}) + xs, err := newXenstore(0, NewMockFile(t)) if err != nil { t.Errorf("newXenstore error: %#v\n", err) } defer xs.Close() + if err := xs.Write("foo", "bar"); err != nil { + t.Errorf("xs.Write error: %#v\n", err) + } + if _, err := xs.Read("foo"); err != nil { t.Errorf("xs.Read error: %#v\n", err) } +} - if err := xs.Write("foo", "bar"); err != nil { - t.Errorf("xs.Read error: %#v\n", err) +func TestXenStoreWatch2(t *testing.T) { + xs, err := newXenstore(0, NewMockFile(t)) + if err != nil { + t.Errorf("newXenstore error: %#v\n", err) } + defer xs.Close() + + go func() { + time.Sleep(5 * time.Second) + if err := xs.StopWatch(); err != nil { + t.Errorf("xs.StopWatch error: %#v\n", err) + } + }() + + go func() { + for i := 0; i < 5; i++ { + xs.Write("foo", "bar") + time.Sleep(1 * time.Second) + } + }() + + err = xs.Watch("foo", "test") + if err != nil { + t.Errorf("xs.Watch(\"foo\") error: %#v\n", err) + } + + time.Sleep(6 * time.Second) } |