1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

# This file is part of Buildbot.  Buildbot is free software: you can 

# redistribute it and/or modify it under the terms of the GNU General Public 

# License as published by the Free Software Foundation, version 2. 

# 

# This program is distributed in the hope that it will be useful, but WITHOUT 

# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 

# details. 

# 

# You should have received a copy of the GNU General Public License along with 

# this program; if not, write to the Free Software Foundation, Inc., 51 

# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

# 

# Copyright Buildbot Team Members 

 

from weakref import WeakValueDictionary 

from itertools import ifilterfalse 

from twisted.python import log 

from twisted.internet import defer 

from collections import deque 

from collections import defaultdict 

 

 

class LRUCache(object):
    """
    A least-recently-used cache, with a fixed maximum size.

    Up to ``max_size`` values are held strongly in ``cache``.  Every cached
    value is also recorded in ``weakrefs`` (a WeakValueDictionary), so a
    value that was evicted from ``cache`` but is still referenced elsewhere
    can be recovered (a "refhit") without calling ``miss_fn`` again.
    Values must therefore be weak-referenceable.

    ``queue`` records key accesses in order (oldest on the left) and
    ``refcount`` counts how many times each key currently appears in
    ``queue``; eviction pops from the left of ``queue`` until a key's count
    drops to zero.

    See buildbot manual for more information.
    """

    __slots__ = ('max_size max_queue miss_fn queue cache weakrefs '
                 'refcount hits refhits misses'.split())
    sentinel = object()
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
        """
        @param miss_fn: called as ``miss_fn(key, **miss_fn_kwargs)`` on a
            cache miss; a result of None is returned but not cached.
        @param max_size: maximum number of strongly-held entries.
        """
        self.max_size = max_size
        # the access queue is compacted once it grows past this length
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self.queue = deque()
        self.cache = {}
        self.weakrefs = WeakValueDictionary()
        self.hits = self.misses = self.refhits = 0
        self.refcount = defaultdict(int)
        self.miss_fn = miss_fn

    def put(self, key, value):
        """
        Replace the value stored for a key the cache already knows about.

        Keys that are neither strongly cached nor still weakly referenced
        are ignored: this never adds a new key and never changes LRU order.
        """
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            self.weakrefs[key] = value

    def get(self, key, **miss_fn_kwargs):
        """
        Return the value for key, calling miss_fn on a miss.

        Extra keyword arguments are passed through to miss_fn.  A None
        result from miss_fn is returned but not cached.
        """
        try:
            return self._get_hit(key)
        except KeyError:
            pass

        self.misses += 1

        result = self.miss_fn(key, **miss_fn_kwargs)
        if result is not None:
            self.cache[key] = result
            self.weakrefs[key] = result
            self._ref_key(key)
            self._purge()

        return result

    def keys(self):
        """Return the keys of the strongly-held cache entries."""
        return self.cache.keys()

    def set_max_size(self, max_size):
        """Change the maximum size, evicting entries if it shrank."""
        if self.max_size == max_size:
            return

        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self._purge()

    def inv(self):
        """
        Check internal invariants, logging each violation and setting the
        module-level ``inv_failed`` flag (used by tests) when one fails.
        """
        global inv_failed

        # the keys of the queue and cache should be identical
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
            inv_failed = True
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)
            inv_failed = True

        # refcount should always represent the number of times each key appears
        # in the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg("      got:", sorted(self.refcount.items()))
            inv_failed = True

    def _ref_key(self, key):
        """Record a reference to the argument key."""
        queue = self.queue
        refcount = self.refcount

        queue.append(key)
        refcount[key] = refcount[key] + 1

        # periodically compact the queue by eliminating duplicate keys
        # while preserving order of most recent access.  Note that this
        # is only required when the cache does not exceed its maximum
        # size
        if len(queue) > self.max_queue:
            refcount.clear()
            queue_appendleft = queue.appendleft
            # the sentinel marks where the original contents end; keys are
            # popped from the right (newest first), and only the first
            # (most recent) occurrence of each is pushed back on the left
            queue_appendleft(self.sentinel)
            for k in ifilterfalse(refcount.__contains__,
                                  iter(queue.pop, self.sentinel)):
                queue_appendleft(k)
                refcount[k] = 1

    def _get_hit(self, key):
        """
        Try to do a value lookup from the existing cache entries.

        Raises KeyError if key is neither strongly cached nor still alive
        in the weak-reference map.
        """
        try:
            result = self.cache[key]
        except KeyError:
            pass
        else:
            self.hits += 1
            self._ref_key(key)
            return result

        # not strongly cached, but the value may still be alive elsewhere;
        # this lookup raises KeyError if it is not
        result = self.weakrefs[key]
        self.refhits += 1
        self.cache[key] = result
        self._ref_key(key)
        # re-adding the entry can push the cache past max_size; trim now
        # rather than letting strong references pile up until the next miss
        self._purge()
        return result

    def _purge(self):
        """
        Trim the cache down to max_size by evicting the
        least-recently-used entries.
        """
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue
        max_size = self.max_size

        # purge least recently used entries, using refcount to count entries
        # that appear multiple times in the queue
        while len(cache) > max_size:
            refc = 1
            while refc:
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]

 

 

class AsyncLRUCache(LRUCache):
    """
    An LRU cache with asynchronous locking to ensure that in the common case of
    multiple concurrent requests for the same key, only one fetch is performed.

    get() always returns a Deferred.  miss_fn is expected to return a
    Deferred; a synchronous exception or a plain return value from miss_fn
    is also handled and delivered to every waiter.
    """

    __slots__ = ['concurrent']

    def __init__(self, miss_fn, max_size=50):
        LRUCache.__init__(self, miss_fn, max_size=max_size)
        # key -> list of Deferreds waiting on the in-flight fetch for key
        self.concurrent = {}

    def get(self, key, **miss_fn_kwargs):
        """
        Return a Deferred that fires with the value for key.

        A cache hit fires immediately.  If a fetch for key is already in
        flight, the caller waits on that fetch instead of starting another.
        On a miss, miss_fn(key, **miss_fn_kwargs) is called once; every
        waiter is then called back with its result or errbacked with its
        failure.  A None result is delivered but not cached.
        """
        try:
            result = self._get_hit(key)
            return defer.succeed(result)
        except KeyError:
            pass

        concurrent = self.concurrent
        waiters = concurrent.get(key)
        if waiters:
            # piggy-back on the fetch already in flight; counted as a hit
            self.hits += 1
            d = defer.Deferred()
            waiters.append(d)
            return d

        # if we're here, we've missed and need to fetch
        self.misses += 1

        # create a list of waiting deferreds for this key
        d = defer.Deferred()
        assert key not in concurrent
        concurrent[key] = [d]

        def fetch():
            return self.miss_fn(key, **miss_fn_kwargs)

        # maybeDeferred converts a synchronous exception (or a non-Deferred
        # return value) into a Deferred, so the waiter list registered above
        # is always drained.  Calling miss_fn directly would let a raise
        # leave concurrent[key] behind, and every later get(key) would hang.
        # The closure keeps caller kwargs from colliding with maybeDeferred's
        # own parameters.
        miss_d = defer.maybeDeferred(fetch)

        def handle_result(result):
            if result is not None:
                self.cache[key] = result
                self.weakrefs[key] = result

                # reference the key once, possibly standing in for multiple
                # concurrent accesses
                self._ref_key(key)

                self._purge()

            # and fire all of the waiting Deferreds
            for waiter in concurrent.pop(key):
                waiter.callback(result)

        def handle_failure(f):
            # errback all of the waiting Deferreds; returning None consumes
            # the failure so it is not logged a second time below
            for waiter in concurrent.pop(key):
                waiter.errback(f)

        miss_d.addCallbacks(handle_result, handle_failure)
        # anything that goes wrong inside the handlers themselves is logged
        miss_d.addErrback(log.err)

        return d

 

 

# for tests: LRUCache.inv() sets this flag to True whenever one of its
# internal-consistency checks (queue/cache keys, refcounts) fails

inv_failed = False