Source code for opencensus.ext.zipkin.trace_exporter

# Copyright 2017, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Export the spans data to Zipkin Collector."""

import json
import logging

import requests

from opencensus.common.transports import sync
from opencensus.common.utils import check_str_length, timestamp_to_microseconds
from opencensus.trace import base_exporter

DEFAULT_ENDPOINT = '/api/v2/spans'
DEFAULT_HOST_NAME = 'localhost'
DEFAULT_PORT = 9411
DEFAULT_PROTOCOL = 'http'
ZIPKIN_HEADERS = {'Content-Type': 'application/json'}

SPAN_KIND_MAP = {
    0: None,  # span kind unspecified
    1: "SERVER",
    2: "CLIENT",
}

SUCCESS_STATUS_CODE = (200, 202)


[docs]class ZipkinExporter(base_exporter.Exporter): """Export the spans to Zipkin. See: http://zipkin.io/zipkin-api/# :type service_name: str :param service_name: Service that logged an annotation in a trace. Classifier when query for spans. :type host_name: str :param host_name: (Optional) The host name of the Zipkin server. :type port: int :param port: (Optional) The port of the Zipkin server. :type end_point: str :param end_point: (Optional) The path for the span exporting endpoint. :type protocol: str :param protocol: (Optional) The protocol used for the request. :type transport: :class:`type` :param transport: Class for creating new transport objects. It should extend from the base_exporter :class:`.Transport` type and implement :meth:`.Transport.export`. Defaults to :class:`.SyncTransport`. The other option is :class:`.AsyncTransport`. """ def __init__( self, service_name='my_service', host_name=DEFAULT_HOST_NAME, port=DEFAULT_PORT, endpoint=DEFAULT_ENDPOINT, protocol=DEFAULT_PROTOCOL, transport=sync.SyncTransport, ipv4=None, ipv6=None): self.service_name = service_name self.host_name = host_name self.port = port self.endpoint = endpoint self.protocol = protocol self.url = self.get_url self.transport = transport(self) self.ipv4 = ipv4 self.ipv6 = ipv6 @property def get_url(self): return '{}://{}:{}{}'.format( self.protocol, self.host_name, self.port, self.endpoint)
[docs] def emit(self, span_datas): """Send SpanData tuples to Zipkin server, default using the v2 API. :type span_datas: list of :class: `~opencensus.trace.span_data.SpanData` :param list of opencensus.trace.span_data.SpanData span_datas: SpanData tuples to emit """ try: zipkin_spans = self.translate_to_zipkin(span_datas) result = requests.post( url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS) if result.status_code not in SUCCESS_STATUS_CODE: logging.error( "Failed to send spans to Zipkin server! Spans are {}" .format(zipkin_spans)) except Exception as e: # pragma: NO COVER logging.error(getattr(e, 'message', e))
[docs] def export(self, span_datas): self.transport.export(span_datas)
[docs] def translate_to_zipkin(self, span_datas): """Translate the opencensus spans to zipkin spans. :type span_datas: list of :class: `~opencensus.trace.span_data.SpanData` :param span_datas: SpanData tuples to emit :rtype: list :returns: List of zipkin format spans. """ local_endpoint = { 'serviceName': self.service_name, 'port': self.port, } if self.ipv4 is not None: local_endpoint['ipv4'] = self.ipv4 if self.ipv6 is not None: local_endpoint['ipv6'] = self.ipv6 zipkin_spans = [] for span in span_datas: # Timestamp in zipkin spans is int of microseconds. start_timestamp_mus = timestamp_to_microseconds(span.start_time) end_timestamp_mus = timestamp_to_microseconds(span.end_time) duration_mus = end_timestamp_mus - start_timestamp_mus zipkin_span = { 'traceId': span.context.trace_id, 'id': str(span.span_id), 'name': span.name, 'timestamp': int(round(start_timestamp_mus)), 'duration': int(round(duration_mus)), 'localEndpoint': local_endpoint, 'tags': _extract_tags_from_span(span.attributes), 'annotations': _extract_annotations_from_span(span), } span_kind = span.span_kind parent_span_id = span.parent_span_id if span_kind is not None: kind = SPAN_KIND_MAP.get(span_kind) # Zipkin API for span kind only accept # enum(CLIENT|SERVER|PRODUCER|CONSUMER|Absent) if kind is not None: zipkin_span['kind'] = kind if parent_span_id is not None: zipkin_span['parentId'] = str(parent_span_id) zipkin_spans.append(zipkin_span) return zipkin_spans
def _extract_tags_from_span(attr): if attr is None: return {} tags = {} for attribute_key, attribute_value in attr.items(): if isinstance(attribute_value, (int, bool, float)): value = str(attribute_value) elif isinstance(attribute_value, str): res, _ = check_str_length(str_to_check=attribute_value) value = res else: logging.warning('Could not serialize tag %s', attribute_key) continue tags[attribute_key] = value return tags def _extract_annotations_from_span(span): """Extract and convert time event annotations to zipkin annotations""" if span.annotations is None: return [] annotations = [] for annotation in span.annotations: event_timestamp_mus = timestamp_to_microseconds(annotation.timestamp) annotations.append({'timestamp': int(round(event_timestamp_mus)), 'value': annotation.description}) return annotations