1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

# This file is part of Buildbot.  Buildbot is free software: you can 

# redistribute it and/or modify it under the terms of the GNU General Public 

# License as published by the Free Software Foundation, version 2. 

# 

# This program is distributed in the hope that it will be useful, but WITHOUT 

# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 

# details. 

# 

# You should have received a copy of the GNU General Public License along with 

# this program; if not, write to the Free Software Foundation, Inc., 51 

# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

# 

# Copyright Buildbot Team Members 

 

from __future__ import with_statement 

 

 

from collections import deque 

import os 

import cPickle as pickle 

 

from zope.interface import implements, Interface 

 

 

def ReadFile(path):
    """Returns the complete content of the file at path, read in binary mode."""
    with open(path, 'rb') as stream:
        content = stream.read()
    return content

 

 

def WriteFile(path, buf):
    """Writes buf to the file at path in binary mode.

    The file is created if needed and any previous content is replaced.
    """
    with open(path, 'wb') as stream:
        stream.write(buf)

 

 

class IQueue(Interface):
    """Abstraction of a bounded first-in first-out queue.

    The oldest entries sit at the front of the queue and are the first ones
    returned by popChunk(). Methods are declared without 'self', as is usual
    for zope.interface declarations.
    """
    def pushItem(item):
        """Adds an individual item to the end of the queue.

        If the queue overflowed, returns the item that had to be dropped to
        make room. The implementations in this module return None
        otherwise."""

    def insertBackChunk(items):
        """Adds a list of items as the oldest entries.

        Normally called after a failure to process the data: the data is
        queued back so it can be retrieved again at a later time.

        If the queue overflowed, returns the list of items that did not fit.
        The implementations in this module return None otherwise."""

    def popChunk(nbItems=None):
        """Removes and returns up to nbItems of the oldest items as a list.

        When nbItems is None, an implementation-defined default derived from
        the queue capacity is used (self.maxItems() for the simple
        queues)."""

    def save():
        """Save the queue to storage if implemented."""

    def items():
        """Returns the items in the queue without removing them.

        Warning: Can be extremely slow for a queue stored on disk."""

    def nbItems():
        """Returns the number of items in the queue."""

    def maxItems():
        """Returns the maximum number of items this queue can hold."""

 

 

class MemoryQueue(object):
    """Length bounded in-memory queue built on collections.deque.

    A deque is used instead of a list because list.pop(0) is O(n), which gets
    really slow for a list of 10000 items, whereas deque.popleft() is O(1)
    most of the time. See http://docs.python.org/library/collections.html for
    more information.
    """
    implements(IQueue)

    def __init__(self, maxItems=None):
        """
        @maxItems: maximum number of items kept in memory; defaults to 10000
        when None.
        """
        self._maxItems = 10000 if maxItems is None else maxItems
        self._items = deque()

    def pushItem(self, item):
        """Appends item as the newest entry.

        Returns the oldest entry if it had to be dropped because the queue
        was full, None otherwise."""
        evicted = None
        if len(self._items) == self._maxItems:
            evicted = self._items.popleft()
        self._items.append(item)
        return evicted

    def insertBackChunk(self, chunk):
        """Puts the items of chunk back at the front, as the oldest entries.

        Returns the leading part of chunk that did not fit, or None when
        everything fit."""
        overflow = None
        surplus = len(self._items) + len(chunk) - self._maxItems
        if surplus > 0:
            overflow, chunk = chunk[0:surplus], chunk[surplus:]
        # appendleft() in reverse order keeps the chunk's own ordering.
        for entry in reversed(chunk):
            self._items.appendleft(entry)
        return overflow

    def popChunk(self, nbItems=None):
        """Removes and returns up to nbItems of the oldest entries as a list.

        nbItems defaults to the queue capacity."""
        if nbItems is None:
            nbItems = self._maxItems
        if nbItems > len(self._items):
            # More was requested than is available: hand over everything.
            drained = list(self._items)
            self._items = deque()
            return drained
        return [self._items.popleft() for _ in range(nbItems)]

    def save(self):
        """Does nothing; a memory queue has no backing storage."""
        pass

    def items(self):
        """Returns a list copy of the queued items, oldest first."""
        return list(self._items)

    def nbItems(self):
        """Returns the number of items currently queued."""
        return len(self._items)

    def maxItems(self):
        """Returns the capacity of the queue."""
        return self._maxItems

 

 

class DiskQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    Every item is stored in its own file inside the queue directory; the file
    is named after the item's integer id. Ids grow as items are pushed and
    shrink (down to zero and below) as chunks are inserted back at the front,
    so the range firstItemId..lastItemId may contain gaps.

    Use pickle for serialization.

    NOTE: the default unpickleFn is pickle.loads, which must only be used on
    a queue directory whose content is trusted.
    """
    implements(IQueue)

    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
                 unpickleFn=pickle.loads):
        """
        @path: directory to save the items. It is created if it doesn't exist
        yet (its parent directory must exist).
        @maxItems: maximum number of items to keep on disk, flush the
        older ones. Defaults to 100000.
        @pickleFn: function used to pack the items to disk.
        @unpickleFn: function used to unpack items from disk.
        """
        self.path = path
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 100000
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.pickleFn = pickleFn
        self.unpickleFn = unpickleFn

        # Total number of items.
        self._nbItems = 0
        # The first item pushed into an empty queue gets id one.
        self.firstItemId = 0
        self.lastItemId = 0
        self._loadFromDisk()

    def pushItem(self, item):
        """Stores item on disk as the newest entry.

        Returns the oldest entry if it had to be dropped because the queue
        was full, None otherwise.

        Raises IOError if the file for the new item already exists.
        """
        ret = None
        if self._nbItems == self._maxItems:
            # The queue is full: evict the oldest item to make room.
            oldestId = self._findNext(self.firstItemId)
            path = self._itemPath(oldestId)
            ret = self.unpickleFn(ReadFile(path))
            os.remove(path)
            self.firstItemId = oldestId + 1
        else:
            self._nbItems += 1
        self.lastItemId += 1
        path = self._itemPath(self.lastItemId)
        if os.path.exists(path):
            raise IOError('%s already exists.' % path)
        WriteFile(path, self.pickleFn(item))
        return ret

    def insertBackChunk(self, chunk):
        """Puts the items of chunk back at the front, as the oldest entries.

        Returns the leading part of chunk that did not fit, or None when
        everything fit.

        Raises IOError if the file for one of the items already exists.
        """
        ret = None
        excess = self._nbItems + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Walk the chunk backward so that ids decrease toward the oldest item.
        for i in reversed(chunk):
            self.firstItemId -= 1
            path = self._itemPath(self.firstItemId)
            if os.path.exists(path):
                raise IOError('%s already exists.' % path)
            WriteFile(path, self.pickleFn(i))
            self._nbItems += 1
        return ret

    def popChunk(self, nbItems=None):
        """Removes up to nbItems of the oldest entries from disk.

        nbItems defaults to the queue capacity. Returns the unpickled items
        as a list, oldest first.
        """
        if nbItems is None:
            nbItems = self._maxItems
        ret = []
        for i in range(nbItems):
            if self._nbItems == 0:
                break
            itemId = self._findNext(self.firstItemId)
            path = self._itemPath(itemId)
            ret.append(self.unpickleFn(ReadFile(path)))
            os.remove(path)
            self._nbItems -= 1
            self.firstItemId = itemId + 1
        return ret

    def save(self):
        """Does nothing; every item is written to disk as soon as queued."""
        pass

    def items(self):
        """Warning, very slow.

        Returns the unpickled items, oldest first, without removing them.
        """
        ret = []
        for itemId in range(self.firstItemId, self.lastItemId + 1):
            path = self._itemPath(itemId)
            # Ids inside the range may be missing, skip the gaps.
            if os.path.exists(path):
                ret.append(self.unpickleFn(ReadFile(path)))
        return ret

    def nbItems(self):
        """Returns the number of items currently on disk."""
        return self._nbItems

    def maxItems(self):
        """Returns the capacity of the queue."""
        return self._maxItems

    #### Protected functions

    def _itemPath(self, itemId):
        """Returns the path of the file holding the item with this id."""
        return os.path.join(self.path, str(itemId))

    def _findNext(self, id):
        """Returns the first id >= id whose item file exists.

        The search stops at lastItemId, since no item is ever stored above
        it. Raises IOError if no item file is found, instead of scanning ids
        forever.
        """
        while id <= self.lastItemId:
            if os.path.isfile(self._itemPath(id)):
                return id
            id += 1
        raise IOError('No item found in %s.' % self.path)

    def _loadFromDisk(self):
        """Scans the queue directory to recover the item count and id range.

        Only the file names are looked at; no item is unpickled here. Names
        that are not integers are ignored.
        """
        ids = []
        for name in os.listdir(self.path):
            try:
                ids.append(int(name))
            except ValueError:
                # Not a queue item.
                continue
        # Id 0 is a valid item id, so entries must not be filtered on
        # truthiness.
        ids.sort()
        self._nbItems = len(ids)
        if ids:
            self.firstItemId = ids[0]
            self.lastItemId = ids[-1]

 

 

class PersistentQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    It has 2 layers of queue, normally an in-memory queue and an on-disk queue.
    When the number of items in the primary queue gets too large, the new items
    are automatically saved to the secondary queue. The older items are kept in
    the primary queue.
    """
    implements(IQueue)

    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
        """
        @primaryQueue: memory queue to use before buffering to disk. Defaults
        to a MemoryQueue with default settings.
        @secondaryQueue: disk queue to use as permanent buffer. Defaults to
        DiskQueue(path).
        @path: path is a shortcut when using default DiskQueue settings. It is
        only used when secondaryQueue is None.
        """
        self.primaryQueue = primaryQueue
        if self.primaryQueue is None:
            self.primaryQueue = MemoryQueue()
        self.secondaryQueue = secondaryQueue
        if self.secondaryQueue is None:
            self.secondaryQueue = DiskQueue(path)
        # Preload data from the secondary queue only if we know we won't start
        # using the secondary queue right away.
        if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
            self.primaryQueue.insertBackChunk(
                self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))

    def pushItem(self, item):
        """Adds item as the newest entry.

        Returns the oldest entry if it had to be dropped because both queues
        were full, None otherwise.
        """
        # If there is already items in secondaryQueue, we'd need to pop them
        # all to start inserting them into primaryQueue so don't bother and
        # just push it in secondaryQueue.
        if (self.secondaryQueue.nbItems() or
            self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
            item = self.secondaryQueue.pushItem(item)
            if item is None:
                return item
            # If item is not None, secondaryQueue overflowed. We need to push it
            # back to primaryQueue so the oldest item is dumped.
        # Or everything fit in the primaryQueue.
        return self.primaryQueue.pushItem(item)

    def insertBackChunk(self, chunk):
        """Puts the items of chunk back at the front, as the oldest entries.

        The oldest items stay in the primary queue; whatever the primary queue
        can't hold anymore is moved to the front of the secondary queue.

        Returns the leading part of chunk that did not fit in the two queues
        combined, or None when everything fit.
        """
        ret = None
        # Overall excess
        excess = self.nbItems() + len(chunk) - self.maxItems()
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Memory excess
        excess = (self.primaryQueue.nbItems() + len(chunk) -
                  self.primaryQueue.maxItems())
        spilled = []
        if excess > 0:
            # The primary queue can't hold the chunk plus what it already
            # contains. items() may return a copy, so the items have to be
            # really taken out with popChunk() before being redistributed: the
            # oldest ones go back to the primary queue and the 'excess' newest
            # ones move to the front of the secondary queue.
            merged = list(chunk) + self.primaryQueue.popChunk(
                self.primaryQueue.nbItems())
            chunk = merged[:-excess]
            spilled = merged[-excess:]
        x = self.primaryQueue.insertBackChunk(chunk)
        assert x is None, "primaryQueue.insertBackChunk did not return None"
        if spilled:
            x = self.secondaryQueue.insertBackChunk(spilled)
            assert x is None, ("secondaryQueue.insertBackChunk did not return "
                               " None")
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to nbItems of the oldest entries as a list.

        nbItems defaults to the capacity of the primary queue. Items are taken
        from the primary queue first, then from the secondary queue.
        """
        if nbItems is None:
            nbItems = self.primaryQueue.maxItems()
        ret = self.primaryQueue.popChunk(nbItems)
        nbItems -= len(ret)
        if nbItems and self.secondaryQueue.nbItems():
            ret.extend(self.secondaryQueue.popChunk(nbItems))
        return ret

    def save(self):
        """Moves the content of the primary queue to the secondary queue.

        NOTE: the overflow returned by the secondary queue is not reported to
        the caller.
        """
        self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())

    def items(self):
        """Returns the items of both queues, oldest first.

        Can be very slow when the secondary queue is stored on disk.
        """
        return self.primaryQueue.items() + self.secondaryQueue.items()

    def nbItems(self):
        """Returns the number of items held by both queues."""
        return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()

    def maxItems(self):
        """Returns the combined capacity of both queues."""
        return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()

 

 

class IndexedQueue(object):
    """Wraps an IQueue object to keep track of how much of it was consumed.

    Provides the extra member function getIndex() and overrides popChunk()
    and insertBackChunk() so that a virtual pointer to the index of the
    queue's first entry is maintained."""
    implements(IQueue)

    def __init__(self, queue):
        """
        @queue: the IQueue provider to wrap.
        """
        assert IQueue.providedBy(queue)
        # Expose every public callable of the wrapped queue that this class
        # doesn't define itself.
        for name in dir(queue):
            if name[0] == '_':
                continue
            if not callable(getattr(queue, name)):
                continue
            if hasattr(self, name):
                continue
            setattr(self, name, getattr(queue, name))
        self.queue = queue
        self._index = 0

    def getIndex(self):
        """Returns the virtual index of the first entry of the queue."""
        return self._index

    def popChunk(self, *args, **kwargs):
        """Pops a chunk from the wrapped queue and advances the index by the
        number of items returned."""
        popped = self.queue.popChunk(*args, **kwargs)
        if popped:
            self._index += len(popped)
        return popped

    def insertBackChunk(self, items):
        """Inserts items back into the wrapped queue and rewinds the index.

        The index moves back by the number of items given, minus the ones
        the wrapped queue returned as overflow. Returns that overflow."""
        self._index -= len(items)
        overflow = self.queue.insertBackChunk(items)
        if overflow:
            self._index += len(overflow)
        return overflow

 

 

def ToIndexedQueue(queue):
    """Returns queue as an IndexedQueue, wrapping it only if needed.

    Raises TypeError if queue doesn't provide IQueue."""
    if IQueue.providedBy(queue):
        if isinstance(queue, IndexedQueue):
            # Already indexed, nothing to wrap.
            return queue
        return IndexedQueue(queue)
    raise TypeError("queue doesn't implement IQueue", queue)

 

# vim: set ts=4 sts=4 sw=4 et: