1
2
3 import json
4 import pprint
5 import zmq
6 import sys
7 import os
8 import logging
9 import requests
10 import re
11 import munch
12
13 sys.path.append(
14 os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
15 )
16
17 from coprs import db, app, models
18 from coprs.logic.coprs_logic import CoprDirsLogic
19 from coprs.logic.builds_logic import BuildsLogic
20 from coprs.logic.complex_logic import ComplexLogic
21 from coprs.logic.packages_logic import PackagesLogic
22 from coprs import helpers
23
24 from urllib.parse import urlparse
25
26 SCM_SOURCE_TYPE = helpers.BuildSourceEnum("scm")
27
28 logging.basicConfig(
29 filename='{0}/pagure-events.log'.format(app.config.get('LOG_DIR')),
30 format='[%(asctime)s][%(levelname)6s]: %(message)s',
31 level=logging.DEBUG)
32
33 log = logging.getLogger(__name__)
34 log.addHandler(logging.StreamHandler(sys.stdout))
35
36 if os.getenv('PAGURE_EVENTS_TESTONLY'):
37 ENDPOINT = 'tcp://stg.pagure.io:9940'
38 else:
39 ENDPOINT = 'tcp://hub.fedoraproject.org:9940'
40
41 log.info("ENDPOINT = {}".format(ENDPOINT))
42
43 pagure_instances = {
44 'https://pagure.io/': 'io.pagure.prod.pagure',
45 'https://src.fedoraproject.org/': 'org.fedoraproject.prod.pagure',
46 'https://stg.pagure.io/': 'io.pagure.stg.pagure',
47 }
48
49 topics = [
50 'git.receive',
51 'pull-request.new',
52 'pull-request.rebased',
53 'pull-request.updated',
54 'pull-request.comment.added',
55 ]
56
57 TOPICS = {}
58 for url, fedmsg_prefix in pagure_instances.items():
59 for topic in topics:
60 TOPICS['{0}.{1}'.format(fedmsg_prefix, topic)] = url
63 log.info("getting url {}".format(url))
64 for attempt in range(1, 4):
65 r = requests.get(url)
66 if r.status_code == requests.codes.ok:
67 return r.text
68 else:
69 log.error('Bad http status {0} from url {1}, attempt {2}'.format(
70 r.status_code, url, attempt))
71
72 return ""
73
83
84 - def build(self, source_dict_update, copr_dir, update_callback,
85 scm_object_type, scm_object_id, scm_object_url, agent_url):
96
97 @classmethod
99 if db.engine.url.drivername == 'sqlite':
100 placeholder = '?'
101 true = '1'
102 else:
103 placeholder = '%s'
104 true = 'true'
105
106 rows = db.engine.execute(
107 """
108 SELECT package.id AS package_id, package.source_json AS source_json, package.copr_id AS copr_id
109 FROM package JOIN copr_dir ON package.copr_dir_id = copr_dir.id
110 WHERE package.source_type = {0} AND
111 package.webhook_rebuild = {1} AND
112 copr_dir.main = {2} AND
113 package.source_json ILIKE {placeholder}
114 """.format(SCM_SOURCE_TYPE, true, true, placeholder=placeholder), '%'+clone_url+'%'
115 )
116 return [ScmPackage(row) for row in rows]
117
118
120 if not changed_files:
121 return False
122
123 sm = helpers.SubdirMatch(self.subdirectory)
124 for filename in changed_files:
125 if sm.match(filename):
126 return True
127
128 return False
129
168
171 """
172 Message handler for new pull-request opened in pagure.
173 Topic: ``*.pagure.pull-request.new``
174 """
175 return munch.Munch({
176 'object_id': data['msg']['pullrequest']['id'],
177 'object_type': 'pull-request',
178 'base_project_url_path': data['msg']['pullrequest']['project']['url_path'],
179 'base_clone_url_path': data['msg']['pullrequest']['project']['fullname'],
180 'base_clone_url': base_url + data['msg']['pullrequest']['project']['fullname'],
181 'project_url_path': data['msg']['pullrequest']['repo_from']['url_path'],
182 'clone_url_path': data['msg']['pullrequest']['repo_from']['fullname'],
183 'clone_url': base_url + data['msg']['pullrequest']['repo_from']['fullname'],
184 'branch_from': data['msg']['pullrequest']['branch_from'],
185 'branch_to': data['msg']['pullrequest']['branch'],
186 'start_commit': data['msg']['pullrequest']['commit_start'],
187 'end_commit': data['msg']['pullrequest']['commit_stop'],
188 'agent': data['msg']['agent'],
189 })
190
193 """
194 Message handler for push event in pagure.
195 Topic: ``*.pagure.git.receive``
196 """
197 return munch.Munch({
198 'object_id': data['msg']['end_commit'],
199 'object_type': 'commit',
200 'base_project_url_path': data['msg']['repo']['url_path'],
201 'base_clone_url_path': data['msg']['repo']['fullname'],
202 'base_clone_url': base_url + data['msg']['repo']['fullname'],
203 'project_url_path': data['msg']['repo']['url_path'],
204 'clone_url_path': data['msg']['repo']['fullname'],
205 'clone_url': base_url + data['msg']['repo']['fullname'],
206 'branch_from': data['msg']['branch'],
207 'branch_to': data['msg']['branch'],
208 'start_commit': data['msg']['start_commit'],
209 'end_commit': data['msg']['end_commit'],
210 'agent': data['msg']['agent'],
211 })
212
215 url1 = re.sub(r'(\.git)?/*$', '', str(url1))
216 url2 = re.sub(r'(\.git)?/*$', '', str(url2))
217 o1 = urlparse(url1)
218 o2 = urlparse(url2)
219 return (o1.netloc == o2.netloc and o1.path == o2.path)
220
223 log.debug("Setting up poller...")
224 pp = pprint.PrettyPrinter(width=120)
225
226 ctx = zmq.Context()
227 s = ctx.socket(zmq.SUB)
228
229
230
231 s.setsockopt(zmq.TCP_KEEPALIVE, 1)
232 s.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)
233 s.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 5)
234 s.setsockopt(zmq.TCP_KEEPALIVE_CNT, 3)
235
236 s.connect(ENDPOINT)
237
238 for topic in TOPICS:
239 s.setsockopt_string(zmq.SUBSCRIBE, topic)
240
241 poller = zmq.Poller()
242 poller.register(s, zmq.POLLIN)
243
244 while True:
245 log.debug('Polling...')
246 evts = poller.poll(10000)
247 if not evts:
248 continue
249
250 log.debug('Receiving...')
251 _, msg_bytes = s.recv_multipart()
252 msg = msg_bytes.decode('utf-8')
253
254 log.debug('Parsing...')
255 data = json.loads(msg)
256
257 log.info('Got topic: {}'.format(data['topic']))
258 base_url = TOPICS.get(data['topic'])
259 if not base_url:
260 log.error('Unknown topic {} received. Continuing.')
261 continue
262
263 if re.match(r'^.*.pull-request.(new|rebased|updated)$', data['topic']):
264 event_info = event_info_from_pr(data, base_url)
265 elif re.match(r'^.*.pull-request.comment.added$', data['topic']):
266 event_info = event_info_from_pr_comment(data, base_url)
267 else:
268 event_info = event_info_from_push(data, base_url)
269
270 log.info('event_info = {}'.format(pp.pformat(event_info)))
271
272 if not event_info:
273 log.info('Received event was discarded. Continuing.')
274 continue
275
276 candidates = ScmPackage.get_candidates_for_rebuild(event_info.base_clone_url)
277 changed_files = set()
278
279 if candidates:
280 raw_commit_url = base_url + event_info.project_url_path + '/raw/' + event_info.start_commit
281 raw_commit_text = get_repeatedly(raw_commit_url)
282 changed_files |= helpers.raw_commit_changes(raw_commit_text)
283
284 if event_info.start_commit != event_info.end_commit:
285
286
287 change_html_url = '{base_url}{project}/c/{start}..{end}'.format(
288 base_url=base_url,
289 project=event_info.project_url_path,
290 start=event_info.start_commit,
291 end=event_info.end_commit)
292
293 change_html_text = get_repeatedly(change_html_url)
294 changed_files |= helpers.pagure_html_diff_changed(change_html_text)
295
296 log.info("changed files: {}".format(", ".join(changed_files)))
297
298 for pkg in candidates:
299 package = '{}/{}(id={})'.format(
300 pkg.package.copr.full_name,
301 pkg.package.name,
302 pkg.package.id
303 )
304 log.info('Considering pkg package: {}, source_json: {}'
305 .format(package, pkg.source_json_dict))
306
307 if (git_compare_urls(pkg.clone_url, event_info.base_clone_url)
308 and (not pkg.committish or event_info.branch_to.endswith(pkg.committish))
309 and pkg.is_dir_in_commit(changed_files)):
310
311 log.info('\t -> accepted.')
312
313 if event_info.object_type == 'pull-request':
314 dirname = pkg.copr.name + ':pr:' + str(event_info.object_id)
315 copr_dir = CoprDirsLogic.get_or_create(pkg.copr, dirname)
316 update_callback = 'pagure_flag_pull_request'
317 scm_object_url = os.path.join(base_url, event_info.project_url_path,
318 'c', str(event_info.end_commit))
319 else:
320 copr_dir = pkg.copr.main_dir
321 update_callback = 'pagure_flag_commit'
322 scm_object_url = os.path.join(base_url, event_info.base_project_url_path,
323 'c', str(event_info.object_id))
324
325 if not git_compare_urls(pkg.copr.scm_repo_url, event_info.base_clone_url):
326 update_callback = ''
327
328 source_dict_update = {
329 'clone_url': event_info.clone_url,
330 'committish': event_info.end_commit,
331 }
332
333 try:
334 build = pkg.build(
335 source_dict_update,
336 copr_dir,
337 update_callback,
338 event_info.object_type,
339 event_info.object_id,
340 scm_object_url,
341 "{}user/{}".format(base_url, event_info.agent),
342 )
343 if build:
344 log.info('\t -> {}'.format(build.to_dict()))
345 except Exception as e:
346 log.error(str(e))
347 db.session.rollback()
348 else:
349 db.session.commit()
350 else:
351 log.info('\t -> skipping.')
352
353
354 if __name__ == '__main__':
355 while True:
356 try:
357 build_on_fedmsg_loop()
358 except KeyboardInterrupt:
359 sys.exit(1)
360 except:
361 log.exception('Error in fedmsg loop. Restarting it.')
362