# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from collections.abc import Iterable
from collections.abc import Sequence as TypingSequence
from dataclasses import replace
from logging import getLogger
from os import environ
from grpc import ChannelCredentials, Compression, StatusCode
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
OTLPMetricExporterMixin,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
get_resource_data,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceStub,
)
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
InstrumentationScope,
)
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_METRICS_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
)
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.sdk.metrics.export import ( # noqa: F401
AggregationTemporality,
DataPointT,
Gauge,
Metric,
MetricExporter,
MetricExportResult,
MetricsData,
ResourceMetrics,
ScopeMetrics,
Sum,
)
from opentelemetry.sdk.metrics.export import ( # noqa: F401
ExponentialHistogram as ExponentialHistogramType,
)
from opentelemetry.sdk.metrics.export import ( # noqa: F401
Histogram as HistogramType,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
_logger = getLogger(__name__)
[docs]
class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[
MetricsData,
ExportMetricsServiceRequest,
MetricExportResult,
MetricsServiceStub,
],
OTLPMetricExporterMixin,
):
"""OTLP metric exporter
Args:
endpoint: Target URL to which the exporter is going to send metrics
max_export_batch_size: Maximum number of data points to export in a single request. This is to deal with
gRPC's 4MB message size limit. If not set there is no limit to the number of data points in a request.
If it is set and the number of data points exceeds the max, the request will be split.
"""
def __init__(
self,
endpoint: str | None = None,
insecure: bool | None = None,
credentials: ChannelCredentials | None = None,
headers: TypingSequence[tuple[str, str]]
| dict[str, str]
| str
| None = None,
timeout: float | None = None,
compression: Compression | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
preferred_aggregation: dict[type, Aggregation] | None = None,
max_export_batch_size: int | None = None,
channel_options: tuple[tuple[str, str]] | None = None,
retryable_error_codes: Iterable[StatusCode] | None = None,
*,
meter_provider: MeterProvider | None = None,
):
insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE)
if insecure is None and insecure_metrics is not None:
insecure = insecure_metrics.lower() == "true"
if (
not insecure
and environ.get(OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE) is not None
):
credentials = _get_credentials(
credentials,
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_METRICS_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
)
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = (
float(environ_timeout) if environ_timeout is not None else None
)
compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_METRICS_COMPRESSION)
if compression is None
else compression
)
self._common_configuration(
preferred_temporality, preferred_aggregation
)
OTLPExporterMixin.__init__(
self,
stub=MetricsServiceStub,
result=MetricExportResult,
endpoint=endpoint
or environ.get(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT),
insecure=insecure,
credentials=credentials,
headers=headers or environ.get(OTEL_EXPORTER_OTLP_METRICS_HEADERS),
timeout=timeout or environ_timeout,
compression=compression,
channel_options=channel_options,
retryable_error_codes=retryable_error_codes,
component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER,
signal="metrics",
meter_provider=meter_provider,
)
self._max_export_batch_size: int | None = max_export_batch_size
def _translate_data( # type: ignore [reportIncompatibleMethodOverride]
self, data: MetricsData
) -> ExportMetricsServiceRequest:
return encode_metrics(data)
def _count_data(self, data: MetricsData):
num_items = 0
for resource_metrics in data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
num_items += len(metric.data.data_points)
return num_items
[docs]
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
if self._max_export_batch_size is None:
return self._export(data=metrics_data)
export_result = MetricExportResult.SUCCESS
for split_metrics_data in self._split_metrics_data(metrics_data):
split_export_result = self._export(data=split_metrics_data)
if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE
return export_result
def _split_metrics_data(
self,
metrics_data: MetricsData,
) -> Iterable[MetricsData]:
assert self._max_export_batch_size is not None
batch_size: int = 0
split_resource_metrics: list[ResourceMetrics] = []
for resource_metrics in metrics_data.resource_metrics:
split_scope_metrics: list[ScopeMetrics] = []
split_resource_metrics.append(
replace(
resource_metrics,
scope_metrics=split_scope_metrics,
)
)
for scope_metrics in resource_metrics.scope_metrics:
split_metrics: list[Metric] = []
split_scope_metrics.append(
replace(
scope_metrics,
metrics=split_metrics,
)
)
for metric in scope_metrics.metrics:
split_data_points: list[DataPointT] = []
split_metrics.append(
replace(
metric,
data=replace(
metric.data,
data_points=split_data_points,
),
)
)
for data_point in metric.data.data_points:
split_data_points.append(data_point)
batch_size += 1
if batch_size >= self._max_export_batch_size:
yield MetricsData(
resource_metrics=split_resource_metrics
)
# Reset all the variables
batch_size = 0
split_data_points = []
split_metrics = [
replace(
metric,
data=replace(
metric.data,
data_points=split_data_points,
),
)
]
split_scope_metrics = [
replace(
scope_metrics,
metrics=split_metrics,
)
]
split_resource_metrics = [
replace(
resource_metrics,
scope_metrics=split_scope_metrics,
)
]
if not split_data_points:
# If data_points is empty remove the whole metric
split_metrics.pop()
if not split_metrics:
# If metrics is empty remove the whole scope_metrics
split_scope_metrics.pop()
if not split_scope_metrics:
# If scope_metrics is empty remove the whole resource_metrics
split_resource_metrics.pop()
if batch_size > 0:
yield MetricsData(resource_metrics=split_resource_metrics)
[docs]
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
[docs]
def set_meter_provider(self, meter_provider: MeterProvider):
return self._set_meter_provider(meter_provider)
@property
def _exporting(self) -> str:
return "metrics"
[docs]
def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True