Script pagure_events_py
[hide private]
[frames | no frames]

Source Code for Script script-pagure_events_py

  1  #!/usr/bin/python3 
  2   
  3  import json 
  4  import pprint 
  5  import zmq 
  6  import sys 
  7  import os 
  8  import logging 
  9  import requests 
 10  import re 
 11  import munch 
 12   
 13  sys.path.append( 
 14      os.path.dirname(os.path.dirname(os.path.realpath(__file__))) 
 15  ) 
 16   
 17  from coprs import db, app, models 
 18  from coprs.logic.coprs_logic import CoprDirsLogic 
 19  from coprs.logic.builds_logic import BuildsLogic 
 20  from coprs.logic.complex_logic import ComplexLogic 
 21  from coprs.logic.packages_logic import PackagesLogic 
 22  from coprs import helpers 
 23   
 24  from urllib.parse import urlparse 
 25   
 26  SCM_SOURCE_TYPE = helpers.BuildSourceEnum("scm") 
 27   
 28  logging.basicConfig( 
 29      filename='{0}/pagure-events.log'.format(app.config.get('LOG_DIR')), 
 30      format='[%(asctime)s][%(levelname)6s]: %(message)s', 
 31      level=logging.DEBUG) 
 32   
 33  log = logging.getLogger(__name__) 
 34  log.addHandler(logging.StreamHandler(sys.stdout)) 
 35   
 36  if os.getenv('PAGURE_EVENTS_TESTONLY'): 
 37      ENDPOINT = 'tcp://stg.pagure.io:9940' 
 38  else: 
 39      ENDPOINT = 'tcp://hub.fedoraproject.org:9940' 
 40   
 41  log.info("ENDPOINT = {}".format(ENDPOINT)) 
 42   
 43  pagure_instances = { 
 44      'https://pagure.io/':             'io.pagure.prod.pagure', 
 45      'https://src.fedoraproject.org/': 'org.fedoraproject.prod.pagure', 
 46      'https://stg.pagure.io/':         'io.pagure.stg.pagure', # testing only 
 47  } 
 48   
 49  topics = [ 
 50      'git.receive', 
 51      'pull-request.new', 
 52      'pull-request.rebased', 
 53      'pull-request.updated', 
 54      'pull-request.comment.added', 
 55  ] 
 56   
 57  TOPICS = {} 
 58  for url, fedmsg_prefix in pagure_instances.items(): 
 59      for topic in topics: 
 60          TOPICS['{0}.{1}'.format(fedmsg_prefix, topic)] = url 
def get_repeatedly(url, attempts=3, timeout=60):
    """
    Fetch ``url`` with HTTP GET, retrying on failure.

    :param url: address to download
    :param attempts: maximum number of GET requests to issue (default 3)
    :param timeout: per-request timeout in seconds handed to ``requests.get``;
        without one a stalled server would block the caller indefinitely
    :return: the response body as text on the first OK response, or an
        empty string when every attempt failed
    """
    log.info("getting url {}".format(url))
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(url, timeout=timeout)
        except requests.RequestException as err:
            # Connection errors and timeouts count as a failed attempt, the
            # same way a bad HTTP status does.
            log.error('Request to url {0} failed on attempt {1}: {2}'.format(
                url, attempt, err))
            continue

        if r.status_code == requests.codes.ok:
            return r.text

        log.error('Bad http status {0} from url {1}, attempt {2}'.format(
            r.status_code, url, attempt))

    # pagure down?
    return ""
73
class ScmPackage(object):
    """
    An SCM-sourced package row together with its decoded source settings.

    Attributes set by ``__init__``:
      source_json_dict -- the row's ``source_json`` decoded from JSON
      clone_url, committish, subdirectory -- values taken from that dict,
          with a missing or falsy entry normalised to ``''``
      package -- result of ``ComplexLogic.get_package_by_id_safe`` for the row
      copr -- ``package.copr``
    """

    def __init__(self, db_row):
        """Build the wrapper from a row exposing ``source_json`` and ``package_id``."""
        settings = json.loads(db_row.source_json)
        self.source_json_dict = settings
        self.clone_url = settings.get('clone_url') or ''
        self.committish = settings.get('committish') or ''
        self.subdirectory = settings.get('subdirectory') or ''

        self.package = ComplexLogic.get_package_by_id_safe(db_row.package_id)
        self.copr = self.package.copr

    def build(self, source_dict_update, copr_dir, update_callback,
              scm_object_type, scm_object_id, scm_object_url, agent_url):
        """
        Rebuild this package in ``copr_dir`` via ``BuildsLogic.rebuild_package``.

        When ``copr_dir`` is named differently from the directory the package
        lives in, the package is first obtained for ``copr_dir`` through
        ``PackagesLogic.get_or_create``.  ``agent_url`` is forwarded as
        ``submitted_by``; the remaining arguments are passed through as given.

        :return: whatever ``BuildsLogic.rebuild_package`` returns
        """
        same_dir = self.package.copr_dir.name == copr_dir.name
        if same_dir:
            target = self.package
        else:
            target = PackagesLogic.get_or_create(copr_dir, self.package.name, self.package)

        # Take an exclusive lock on the build table before submitting.
        db.session.execute('LOCK TABLE build IN EXCLUSIVE MODE')
        return BuildsLogic.rebuild_package(
            target, source_dict_update, copr_dir, update_callback,
            scm_object_type, scm_object_id, scm_object_url, submitted_by=agent_url)

    @classmethod
    def get_candidates_for_rebuild(cls, clone_url):
        """
        Return ``ScmPackage`` objects for packages that may need a rebuild.

        Selects packages of the SCM source type with ``webhook_rebuild`` set,
        located in a main copr_dir, whose ``source_json`` contains
        ``clone_url`` as a substring.

        :param clone_url: repository URL to look for inside ``source_json``
        :return: list of ScmPackage, one per matching row
        """
        # The bind-parameter marker and the boolean literal depend on the driver.
        if db.engine.url.drivername == 'sqlite':
            placeholder, true = '?', '1'
        else:
            placeholder, true = '%s', 'true'

        query = """
            SELECT package.id AS package_id, package.source_json AS source_json, package.copr_id AS copr_id
            FROM package JOIN copr_dir ON package.copr_dir_id = copr_dir.id
            WHERE package.source_type = {0} AND
                  package.webhook_rebuild = {1} AND
                  copr_dir.main = {2} AND
                  package.source_json ILIKE {placeholder}
            """.format(SCM_SOURCE_TYPE, true, true, placeholder=placeholder)

        matching_rows = db.engine.execute(query, '%' + clone_url + '%')
        return [ScmPackage(found) for found in matching_rows]

    def is_dir_in_commit(self, changed_files):
        """
        Tell whether any changed file matches this package's subdirectory.

        :param changed_files: iterable of changed file names (may be empty or None)
        :return: True when ``helpers.SubdirMatch(self.subdirectory)`` matches at
            least one of the names, False otherwise (including for no input)
        """
        if not changed_files:
            return False

        matcher = helpers.SubdirMatch(self.subdirectory)
        return any(matcher.match(path) for path in changed_files)
129
def event_info_from_pr_comment(data, base_url):
    """
    Turn a pull-request comment message into an event description.
    Topic: ``*.pagure.pull-request.comment.added``

    The event is discarded (``False`` is returned) unless the pull-request
    status is 'Open' and its last comment contains ``[copr-build]``.

    :param data: decoded message; ``data['msg']`` carries the payload
    :param base_url: base URL of the Pagure instance, prepended to the
        repository full names to form the clone URLs
    :return: munch.Munch describing the event, or False when discarded
    """
    payload = data['msg']
    pull_request = payload['pullrequest']

    if pull_request['status'] != 'Open':
        log.info('Pull-request not open, discarding.')
        return False

    comments = pull_request['comments']
    if not comments:
        log.info("This is most odd, we're not seeing comments.")
        return False

    newest = comments[-1]
    if not newest:
        log.info('Can not access last comment, discarding.')
        return False

    build_requested = 'comment' in newest and '[copr-build]' in newest['comment']
    if not build_requested:
        log.info('The [copr-build] is not present in the message.')
        return False

    return munch.Munch({
        'object_id': pull_request['id'],
        'object_type': 'pull-request',
        # "base" is the project the pull-request targets ...
        'base_project_url_path': pull_request['project']['url_path'],
        'base_clone_url_path': pull_request['project']['fullname'],
        'base_clone_url': base_url + pull_request['project']['fullname'],
        # ... the unprefixed keys describe the repository it comes from.
        'project_url_path': pull_request['repo_from']['url_path'],
        'clone_url_path': pull_request['repo_from']['fullname'],
        'clone_url': base_url + pull_request['repo_from']['fullname'],
        'branch_from': pull_request['branch_from'],
        'branch_to': pull_request['branch'],
        'start_commit': pull_request['commit_start'],
        'end_commit': pull_request['commit_stop'],
        'agent': payload['agent'],
    })
168
def event_info_from_pr(data, base_url):
    """
    Turn a pull-request message into an event description.
    Topic: ``*.pagure.pull-request.new``

    :param data: decoded message; ``data['msg']`` carries the payload
    :param base_url: base URL of the Pagure instance, prepended to the
        repository full names to form the clone URLs
    :return: munch.Munch describing the event
    """
    payload = data['msg']
    pull_request = payload['pullrequest']

    return munch.Munch({
        'object_id': pull_request['id'],
        'object_type': 'pull-request',
        # "base" is the project the pull-request targets ...
        'base_project_url_path': pull_request['project']['url_path'],
        'base_clone_url_path': pull_request['project']['fullname'],
        'base_clone_url': base_url + pull_request['project']['fullname'],
        # ... the unprefixed keys describe the repository it comes from.
        'project_url_path': pull_request['repo_from']['url_path'],
        'clone_url_path': pull_request['repo_from']['fullname'],
        'clone_url': base_url + pull_request['repo_from']['fullname'],
        'branch_from': pull_request['branch_from'],
        'branch_to': pull_request['branch'],
        'start_commit': pull_request['commit_start'],
        'end_commit': pull_request['commit_stop'],
        'agent': payload['agent'],
    })
190
def event_info_from_push(data, base_url):
    """
    Turn a push message into an event description.
    Topic: ``*.pagure.git.receive``

    A push has a single repository and branch, so the "base" and plain
    repository keys (and ``branch_from``/``branch_to``) carry the same values.

    :param data: decoded message; ``data['msg']`` carries the payload
    :param base_url: base URL of the Pagure instance, prepended to the
        repository full name to form the clone URLs
    :return: munch.Munch describing the event
    """
    payload = data['msg']

    return munch.Munch({
        'object_id': payload['end_commit'],
        'object_type': 'commit',
        'base_project_url_path': payload['repo']['url_path'],
        'base_clone_url_path': payload['repo']['fullname'],
        'base_clone_url': base_url + payload['repo']['fullname'],
        'project_url_path': payload['repo']['url_path'],
        'clone_url_path': payload['repo']['fullname'],
        'clone_url': base_url + payload['repo']['fullname'],
        'branch_from': payload['branch'],
        'branch_to': payload['branch'],
        'start_commit': payload['start_commit'],
        'end_commit': payload['end_commit'],
        'agent': payload['agent'],
    })
212
def git_compare_urls(url1, url2):
    """
    Tell whether two git URLs point at the same host and path.

    Both arguments are converted with ``str()``, a trailing ``.git`` and/or
    trailing slashes are dropped, and the network location and path of the
    results are compared.  The URL scheme does not take part in the
    comparison.

    :return: True when both netloc and path are equal, False otherwise
    """
    suffix = re.compile(r'(\.git)?/*$')
    first, second = (
        urlparse(suffix.sub('', str(candidate)))
        for candidate in (url1, url2)
    )
    return (first.netloc, first.path) == (second.netloc, second.path)
220
def _subscribe_to_topics(s):
    """Enable TCP keep-alive on socket ``s``, connect it to ENDPOINT and subscribe to every key of TOPICS."""
    # detect server hang/restart (still a chance to lose ~45s of events)
    # for more info see man tcp(7).
    s.setsockopt(zmq.TCP_KEEPALIVE, 1)         # turn on keep-alive
    s.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)   # start when 30s inactive
    s.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 5)   # send keep-alive packet each 5s
    s.setsockopt(zmq.TCP_KEEPALIVE_CNT, 3)     # restart after 3 fails

    s.connect(ENDPOINT)

    for topic in TOPICS:
        s.setsockopt_string(zmq.SUBSCRIBE, topic)


def _event_info_from_message(data, base_url):
    """Pick the event_info_from_* parser matching ``data['topic']`` and return its result."""
    topic = data['topic']
    if re.match(r'^.*\.pull-request\.(new|rebased|updated)$', topic):
        return event_info_from_pr(data, base_url)
    if re.match(r'^.*\.pull-request\.comment\.added$', topic):
        return event_info_from_pr_comment(data, base_url)
    return event_info_from_push(data, base_url)


def _changed_files_for_event(event_info, base_url):
    """Download the change(s) of the event from Pagure and return the set of changed file names."""
    changed_files = set()

    raw_commit_url = base_url + event_info.project_url_path + '/raw/' + event_info.start_commit
    raw_commit_text = get_repeatedly(raw_commit_url)
    changed_files |= helpers.raw_commit_changes(raw_commit_text)

    if event_info.start_commit != event_info.end_commit:
        # we want to show changes in start_commit + diff
        # start_commit..end_commit
        change_html_url = '{base_url}{project}/c/{start}..{end}'.format(
            base_url=base_url,
            project=event_info.project_url_path,
            start=event_info.start_commit,
            end=event_info.end_commit)

        change_html_text = get_repeatedly(change_html_url)
        changed_files |= helpers.pagure_html_diff_changed(change_html_text)

    return changed_files


def _rebuild_if_affected(pkg, event_info, base_url, changed_files):
    """Submit a rebuild of candidate ``pkg`` when the event matches its repository, branch and subdirectory."""
    package = '{}/{}(id={})'.format(
        pkg.package.copr.full_name,
        pkg.package.name,
        pkg.package.id
    )
    log.info('Considering pkg package: {}, source_json: {}'
             .format(package, pkg.source_json_dict))

    affected = (
        git_compare_urls(pkg.clone_url, event_info.base_clone_url)
        and (not pkg.committish or event_info.branch_to.endswith(pkg.committish))
        and pkg.is_dir_in_commit(changed_files)
    )
    if not affected:
        log.info('\t -> skipping.')
        return

    log.info('\t -> accepted.')

    if event_info.object_type == 'pull-request':
        # Pull-requests are built into their own "<copr>:pr:<id>" directory.
        dirname = pkg.copr.name + ':pr:' + str(event_info.object_id)
        copr_dir = CoprDirsLogic.get_or_create(pkg.copr, dirname)
        update_callback = 'pagure_flag_pull_request'
        scm_object_url = os.path.join(base_url, event_info.project_url_path,
                                      'c', str(event_info.end_commit))
    else:
        copr_dir = pkg.copr.main_dir
        update_callback = 'pagure_flag_commit'
        scm_object_url = os.path.join(base_url, event_info.base_project_url_path,
                                      'c', str(event_info.object_id))

    # No callback unless the copr's scm_repo_url is the repository the event
    # belongs to.
    if not git_compare_urls(pkg.copr.scm_repo_url, event_info.base_clone_url):
        update_callback = ''

    source_dict_update = {
        'clone_url': event_info.clone_url,
        'committish': event_info.end_commit,
    }

    try:
        build = pkg.build(
            source_dict_update,
            copr_dir,
            update_callback,
            event_info.object_type,
            event_info.object_id,
            scm_object_url,
            "{}user/{}".format(base_url, event_info.agent),
        )
        if build:
            log.info('\t -> {}'.format(build.to_dict()))
    except Exception:
        # Keep the traceback in the log; one failing package must not stop
        # the remaining candidates from being processed.
        log.exception('Failed to submit build for package {}'.format(package))
        db.session.rollback()
    else:
        db.session.commit()


def build_on_fedmsg_loop():
    """
    Consume Pagure events from ENDPOINT and submit Copr rebuilds for them.

    Subscribes to every topic in TOPICS and then polls forever.  For each
    message the event is parsed, candidate packages are looked up by the
    event's base clone URL, and each candidate touched by the changed files
    is rebuilt.

    The function only returns by raising: any error outside of the
    per-package build submission propagates to the caller.  The zmq socket
    and context are closed on the way out so a caller that restarts the loop
    does not accumulate open connections.
    """
    log.debug("Setting up poller...")
    pp = pprint.PrettyPrinter(width=120)

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    try:
        _subscribe_to_topics(s)

        poller = zmq.Poller()
        poller.register(s, zmq.POLLIN)

        while True:
            log.debug('Polling...')
            evts = poller.poll(10000)
            if not evts:
                continue

            log.debug('Receiving...')
            _, msg_bytes = s.recv_multipart()
            msg = msg_bytes.decode('utf-8')

            log.debug('Parsing...')
            data = json.loads(msg)

            log.info('Got topic: {}'.format(data['topic']))
            base_url = TOPICS.get(data['topic'])
            if not base_url:
                log.error('Unknown topic {} received. Continuing.'.format(data['topic']))
                continue

            event_info = _event_info_from_message(data, base_url)
            log.info('event_info = {}'.format(pp.pformat(event_info)))

            if not event_info:
                log.info('Received event was discarded. Continuing.')
                continue

            candidates = ScmPackage.get_candidates_for_rebuild(event_info.base_clone_url)
            changed_files = set()

            # Only ask Pagure for the changes when somebody may care.
            if candidates:
                changed_files = _changed_files_for_event(event_info, base_url)

            log.info("changed files: {}".format(", ".join(changed_files)))

            for pkg in candidates:
                _rebuild_if_affected(pkg, event_info, base_url, changed_files)
    finally:
        # Close the socket first (dropping anything pending) so terminating
        # the context cannot block on it.
        s.close(linger=0)
        ctx.term()
if __name__ == '__main__':
    # Keep the consumer alive: any ordinary error is logged with its
    # traceback and the loop is started again.  Ctrl-C exits with status 1.
    # Only Exception is caught so that SystemExit and similar
    # interpreter-level signals still terminate the process.
    while True:
        try:
            build_on_fedmsg_loop()
        except KeyboardInterrupt:
            sys.exit(1)
        except Exception:
            log.exception('Error in fedmsg loop. Restarting it.')