/usr/lib/python2.7/dist-packages/jenkinsapi/queue.py is in python-jenkinsapi 0.2.30-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | """
Queue module for jenkinsapi
"""
from requests import HTTPError
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.custom_exceptions import UnknownQueueItem, NotBuiltYet
import logging
import time
log = logging.getLogger(__name__)
class Queue(JenkinsBase):
"""
Class that represents the Jenkins queue
"""
def __init__(self, baseurl, jenkins_obj):
"""
Init the Jenkins queue object
:param baseurl: basic url for the queue
:param jenkins_obj: ref to the jenkins obj
"""
self.jenkins = jenkins_obj
JenkinsBase.__init__(self, baseurl)
def __str__(self):
return self.baseurl
def get_jenkins_obj(self):
return self.jenkins
def iteritems(self):
for item in self._data['items']:
queue_id = item['id']
item_baseurl = "%s/item/%i" % (self.baseurl, queue_id)
yield item['id'], QueueItem(baseurl=item_baseurl,
jenkins_obj=self.jenkins)
def iterkeys(self):
for item in self._data['items']:
yield item['id']
def itervalues(self):
for item in self._data['items']:
yield QueueItem(self.jenkins, **item)
def keys(self):
return list(self.iterkeys())
def values(self):
return list(self.itervalues())
def __len__(self):
return len(self._data['items'])
def __getitem__(self, item_id):
self_as_dict = dict(self.iteritems())
if item_id in self_as_dict:
return self_as_dict[item_id]
else:
raise UnknownQueueItem(item_id)
def _get_queue_items_for_job(self, job_name):
for item in self._data["items"]:
if item['task']['name'] == job_name:
yield QueueItem(self.get_queue_item_url(item),
jenkins_obj=self.jenkins)
def get_queue_items_for_job(self, job_name):
return list(self._get_queue_items_for_job(job_name))
def get_queue_item_url(self, item):
return "%s/item/%i" % (self.baseurl, item["id"])
def delete_item(self, queue_item):
self.delete_item_by_id(queue_item.queue_id)
def delete_item_by_id(self, item_id):
deleteurl = '%s/cancelItem?id=%s' % (self.baseurl, item_id)
self.get_jenkins_obj().requester.post_url(deleteurl)
class QueueItem(JenkinsBase):
"""An individual item in the queue
"""
def __init__(self, baseurl, jenkins_obj):
self.jenkins = jenkins_obj
JenkinsBase.__init__(self, baseurl)
@property
def queue_id(self):
return self._data['id']
@property
def name(self):
return self._data['task']['name']
def get_jenkins_obj(self):
return self.jenkins
def get_job(self):
"""
Return the job associated with this queue item
"""
return self.jenkins[self._data['task']['name']]
def get_parameters(self):
"""returns parameters of queue item"""
actions = self._data.get('actions', [])
for action in actions:
if isinstance(action, dict) and 'parameters' in action:
parameters = action['parameters']
return dict([(x['name'], x.get('value', None))
for x in parameters])
return []
def __repr__(self):
return "<%s.%s %s>" % (self.__class__.__module__,
self.__class__.__name__, str(self))
def __str__(self):
return "%s Queue #%i" % (self.name, self.queue_id)
def get_build(self):
build_number = self.get_build_number()
job_name = self.get_job_name()
return self.jenkins[job_name][build_number]
def block_until_complete(self, delay=5):
build = self.block_until_building(delay)
return build.block_until_complete(delay=delay)
def block_until_building(self, delay=5):
while True:
try:
self.poll()
return self.get_build()
except (NotBuiltYet, HTTPError):
time.sleep(delay)
continue
def is_running(self):
"""Return True if this queued item is running.
"""
try:
return self.get_build().is_running()
except NotBuiltYet:
return False
def get_build_number(self):
try:
return self._data['executable']['number']
except KeyError:
raise NotBuiltYet()
def get_job_name(self):
try:
return self._data['task']['name']
except KeyError:
raise NotBuiltYet()
|