diff options
author | Cheng Zhang <cheng.zhang@citrix.com> | 2016-04-21 11:23:19 +0800 |
---|---|---|
committer | Cheng Zhang <cheng.zhang@citrix.com> | 2016-04-21 13:51:01 +0800 |
commit | 1b8951f35294cc6d3091dafa2643abbf7d4b0feb (patch) | |
tree | a9ae6d5f3e47cb94ead673ddd8ec768a23de3e34 /xenstoreclient/xenstore.go | |
parent | 2e0e2cbdb9022b9b2bd2555ee12eec0d1abf8667 (diff) | |
download | vyos-xe-guest-utilities-1b8951f35294cc6d3091dafa2643abbf7d4b0feb.tar.gz vyos-xe-guest-utilities-1b8951f35294cc6d3091dafa2643abbf7d4b0feb.zip |
CP-16739: Make watch in status
Signed-off-by: Cheng Zhang <cheng.zhang@citrix.com>
Diffstat (limited to 'xenstoreclient/xenstore.go')
-rw-r--r-- | xenstoreclient/xenstore.go | 232 |
1 files changed, 176 insertions, 56 deletions
diff --git a/xenstoreclient/xenstore.go b/xenstoreclient/xenstore.go index 0d3e2bb..cc30828 100644 --- a/xenstoreclient/xenstore.go +++ b/xenstoreclient/xenstore.go @@ -1,12 +1,14 @@ package xenstoreclient import ( + syslog "../syslog" "bufio" "bytes" "encoding/binary" "errors" "fmt" "io" + "log" "os" "strconv" "strings" @@ -26,18 +28,27 @@ const ( type Operation uint32 const ( - XS_READ Operation = 2 - XS_GET_PERMS Operation = 3 - XS_WATCH Operation = 4 - XS_UNWATCH Operation = 5 - XS_TRANSACTION_START Operation = 6 - XS_TRANSACTION_END Operation = 7 - XS_WRITE Operation = 11 - XS_MKDIR Operation = 12 - XS_RM Operation = 13 - XS_SET_PERMS Operation = 14 - XS_WATCH_EVENT Operation = 15 - XS_ERROR Operation = 16 + XS_DEBUG Operation = 0 + XS_DIRECTORY Operation = 1 + XS_READ Operation = 2 + XS_GET_PERMS Operation = 3 + XS_WATCH Operation = 4 + XS_UNWATCH Operation = 5 + XS_TRANSACTION_START Operation = 6 + XS_TRANSACTION_END Operation = 7 + XS_INTRODUCE Operation = 8 + XS_RELEASE Operation = 9 + XS_GET_DOMAIN_PATH Operation = 10 + XS_WRITE Operation = 11 + XS_MKDIR Operation = 12 + XS_RM Operation = 13 + XS_SET_PERMS Operation = 14 + XS_WATCH_EVENT Operation = 15 + XS_ERROR Operation = 16 + XS_IS_DOMAIN_INTRODUCED Operation = 17 + XS_RESUME Operation = 18 + XS_SET_TARGET Operation = 19 + XS_RESTRICT Operation = 128 ) type Packet struct { @@ -48,11 +59,6 @@ type Packet struct { Value []byte } -type Event struct { - Token string - Data []byte -} - type XenStoreClient interface { Close() error DO(packet *Packet) (*Packet, error) @@ -61,7 +67,9 @@ type XenStoreClient interface { Rm(path string) error Write(path string, value string) error GetPermission(path string) (map[int]Permission, error) - Watch(path string) (<-chan Event, error) + Watch(path string, token string) error + WatchEvent(key string) (token string, ok bool) + UnWatch(path string, token string) error StopWatch() error } @@ -135,16 +143,55 @@ func (p *Packet) Write(w io.Writer) (err error) { return nil } +type WatchQueueManager struct { + watchQueues map[string]chan string + rwlocker *sync.RWMutex +} + +func (wq *WatchQueueManager) RemoveByKey(key string) { + wq.rwlocker.Lock() + defer wq.rwlocker.Unlock() + delete(wq.watchQueues, key) + return +} + +func (wq *WatchQueueManager) SetEventByKey(key string, token string) (ok bool) { + wq.rwlocker.RLock() + defer wq.rwlocker.RUnlock() + wq.watchQueues[key] <- token + return +} + +func (wq *WatchQueueManager) GetEventByKey(key string) (token string, ok bool) { + wq.rwlocker.RLock() + defer wq.rwlocker.RUnlock() + ec, ok := wq.watchQueues[key] + if len(ec) != 0 { + return <-ec, ok + } else { + ok = false + } + return +} + +func (wq *WatchQueueManager) AddChanByKey(key string) { + wq.rwlocker.Lock() + defer wq.rwlocker.Unlock() + wq.watchQueues[key] = make(chan string, 100) +} + type XenStore struct { - tx uint32 - xbFile io.ReadWriteCloser - xbFileReader *bufio.Reader - muWatch *sync.Mutex - onceWatch *sync.Once - watchQueues map[string]chan Event - watchStopChan chan struct{} - watchStoppedChan chan struct{} - nonWatchQueue chan []byte + tx uint32 + xbFile io.ReadWriteCloser + xbFileReader *bufio.Reader + muWatch *sync.Mutex + onceWatch *sync.Once + watchQueue WatchQueueManager + watchStopChan chan struct{} + watchStoppedChan chan struct{} + nonWatchQueue chan []byte + xbFileReaderLocker *sync.Mutex + logger *log.Logger } func NewXenstore(tx uint32) (XenStoreClient, error) { @@ -160,17 +207,38 @@ func NewXenstore(tx uint32) (XenStoreClient, error) { return newXenstore(tx, xbFile) } +const ( + LoggerName string = "xenstore" +) + func newXenstore(tx uint32, rwc io.ReadWriteCloser) (XenStoreClient, error) { + var loggerWriter io.Writer = os.Stderr + var topic string = LoggerName + if w, err := syslog.NewSyslogWriter(topic); err == nil { + loggerWriter = w + topic = "" + } else { + fmt.Fprintf(os.Stderr, "NewSyslogWriter(%s) error: %s, use stderr logging\n", topic, err) + topic = LoggerName + ": " + } + + logger := log.New(loggerWriter, topic, 0) + return &XenStore{ - tx: tx, - xbFile: rwc, - xbFileReader: bufio.NewReader(rwc), - watchQueues: make(map[string]chan Event, 0), - nonWatchQueue: nil, - watchStopChan: make(chan struct{}, 1), - watchStoppedChan: make(chan struct{}, 1), - onceWatch: &sync.Once{}, - muWatch: &sync.Mutex{}, + tx: tx, + xbFile: rwc, + xbFileReader: bufio.NewReader(rwc), + watchQueue: WatchQueueManager{ + watchQueues: make(map[string]chan string, 0), + rwlocker: &sync.RWMutex{}, + }, + nonWatchQueue: nil, + watchStopChan: make(chan struct{}, 1), + watchStoppedChan: make(chan struct{}, 1), + onceWatch: &sync.Once{}, + muWatch: &sync.Mutex{}, + xbFileReaderLocker: &sync.Mutex{}, + logger: logger, }, nil } @@ -179,6 +247,8 @@ func (xs *XenStore) Close() error { } func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { + xs.xbFileReaderLocker.Lock() + defer xs.xbFileReaderLocker.Unlock() err = req.Write(xs.xbFile) if err != nil { return nil, err @@ -191,7 +261,6 @@ func (xs *XenStore) DO(req *Packet) (resp *Packet, err error) { } else { r = xs.xbFileReader } - resp, err = ReadPacket(r) return resp, err } @@ -288,32 +357,61 @@ func (xs *XenStore) GetPermission(path string) (map[int]Permission, error) { return perm, nil } -func (xs *XenStore) Watch(path string) (<-chan Event, error) { - watcher := func() { +func (xs *XenStore) UnWatch(path string, token string) (err error) { + v := []byte(path + "\x00" + token + "\x00") + req := &Packet{ + OpCode: XS_UNWATCH, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, + } + _, err = xs.DO(req) + if err != nil { + return + } + xs.watchQueue.RemoveByKey(path) + return nil +} +func (xs *XenStore) Watch(path string, token string) error { + watcher := func() { + logger.Printf("Watch: Start") type XSData struct { *Packet Error error } xsDataChan := make(chan XSData, 100) - go func(r io.Reader, out chan<- XSData) { + xsReadStop := make(chan bool) + go func(r io.Reader, out chan<- XSData, c <-chan bool) { for { + // The read will return at once if no data in r p, err := ReadPacket(r) + ticker := time.Tick(1 * time.Second) + if err != nil { + select { + case <-c: + return + case <-ticker: + continue + } + } out <- XSData{Packet: p, Error: err} } - }(xs.xbFileReader, xsDataChan) + }(xs.xbFileReader, xsDataChan, xsReadStop) xs.nonWatchQueue = make(chan []byte, 100) for { select { case <-xs.watchStopChan: - fmt.Printf("watch receive stop signal, quit.") - xs.watchStopChan <- struct{}{} + logger.Printf("Watch: receive stop signal, quit.\n") + xs.watchStoppedChan <- struct{}{} + xsReadStop <- true return case xsdata := <-xsDataChan: if xsdata.Error != nil { - fmt.Printf("watch receive error: %#v", xsdata.Error) + logger.Printf("Watch: receive error: %#v", xsdata.Error) return } switch xsdata.Packet.OpCode { @@ -321,11 +419,10 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) { parts := strings.SplitN(string(xsdata.Value), "\x00", 2) path := parts[0] token := parts[1] - data := []byte(parts[2]) - if c, ok := xs.watchQueues[path]; ok { - c <- Event{token, data} - } + logger.Printf("Get XS_WATCH_EVENT key:%s, token:%s\n", path, token) + xs.watchQueue.SetEventByKey(path, token) default: + logger.Printf("Get non watch event %#v\n", xsdata.Packet.OpCode) var b bytes.Buffer xsdata.Packet.Write(&b) xs.nonWatchQueue <- b.Bytes() @@ -333,19 +430,34 @@ func (xs *XenStore) Watch(path string) (<-chan Event, error) { } } } - xs.onceWatch.Do(watcher) xs.muWatch.Lock() defer xs.muWatch.Unlock() - if _, ok := xs.watchQueues[path]; !ok { - xs.watchQueues[path] = make(chan Event, 100) + v := []byte(path + "\x00" + token + "\x00") + req := &Packet{ + OpCode: XS_WATCH, + Req: 0, + TxID: xs.tx, + Length: uint32(len(v)), + Value: v, } - return xs.watchQueues[path], nil + _, err := xs.DO(req) + if err != nil { + logger.Errorf("Watch failed with error %#v\n", err) + return err + } + xs.watchQueue.AddChanByKey(path) + go xs.onceWatch.Do(watcher) + return nil +} + +func (xs *XenStore) WatchEvent(key string) (token string, ok bool) { + return xs.watchQueue.GetEventByKey(key) } func (xs *XenStore) StopWatch() error { xs.watchStopChan <- struct{}{} - <-xs.watchStoppedChan xs.nonWatchQueue = nil + <-xs.watchStoppedChan return nil } @@ -405,8 +517,16 @@ func (xs *CachedXenStore) GetPermission(path string) (map[int]Permission, error) return xs.xs.GetPermission(path) } -func (xs *CachedXenStore) Watch(path string) (<-chan Event, error) { - return xs.xs.Watch(path) +func (xs *CachedXenStore) Watch(path string, token string) error { + return xs.xs.Watch(path, token) +} + +func (xs *CachedXenStore) WatchEvent(key string) (token string, ok bool) { + return xs.xs.WatchEvent(key) +} + +func (xs *CachedXenStore) UnWatch(path string, token string) error { + return xs.xs.UnWatch(path, token) } func (xs *CachedXenStore) StopWatch() error { @@ -429,5 +549,5 @@ func getDevPath() (devPath string, err error) { return devPath, err } } - return "", fmt.Errorf("Cannot locate xenbus dev path in %v", devPaths) + return "", logger.Errorf("Cannot locate xenbus dev path in %v", devPaths) } |