Source code for opentelemetry.exporter.otlp.proto.grpc._log_exporter

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Iterable, Sequence
from collections.abc import Sequence as TypingSequence
from os import environ
from typing import Literal

from grpc import ChannelCredentials, Compression, StatusCode
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
    OTLPExporterMixin,
    _get_credentials,
    environ_to_compression,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
    ExportLogsServiceRequest,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
    LogsServiceStub,
)
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs.export import (
    LogRecordExporter,
    LogRecordExportResult,
)
from opentelemetry.sdk.environment_variables import (
    _OTEL_PYTHON_EXPORTER_OTLP_GRPC_LOGS_CREDENTIAL_PROVIDER,
    OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
    OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
    OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY,
    OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
    OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
    OTEL_EXPORTER_OTLP_LOGS_HEADERS,
    OTEL_EXPORTER_OTLP_LOGS_INSECURE,
    OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
    OtelComponentTypeValues,
)


class OTLPLogExporter(
    LogRecordExporter,
    OTLPExporterMixin[
        Sequence[ReadableLogRecord],
        ExportLogsServiceRequest,
        LogRecordExportResult,
        LogsServiceStub,
    ],
):
    """OTLP/gRPC exporter for log records.

    The constructor resolves its settings from explicit arguments and the
    logs-specific ``OTEL_EXPORTER_OTLP_LOGS_*`` environment variables, then
    delegates all remaining setup, export and shutdown work to
    ``OTLPExporterMixin``.
    """

    def __init__(
        self,
        endpoint: str | None = None,
        insecure: bool | None = None,
        credentials: ChannelCredentials | None = None,
        headers: TypingSequence[tuple[str, str]]
        | dict[str, str]
        | str
        | None = None,
        timeout: float | None = None,
        compression: Compression | None = None,
        channel_options: tuple[tuple[str, str]] | None = None,
        retryable_error_codes: Iterable[StatusCode] | None = None,
        *,
        meter_provider: MeterProvider | None = None,
    ):
        """Resolve logs-specific configuration and initialise the mixin.

        Args:
            endpoint: Target endpoint. A falsy value falls back to the
                ``OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`` environment variable.
            insecure: When ``None`` and ``OTEL_EXPORTER_OTLP_LOGS_INSECURE``
                is set, it becomes ``True`` only if that variable equals
                ``"true"`` (case-insensitive).
            credentials: Channel credentials. Replaced by the result of
                ``_get_credentials`` when the connection is not insecure and
                ``OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE`` is set.
            headers: Request headers. A falsy value falls back to the
                ``OTEL_EXPORTER_OTLP_LOGS_HEADERS`` environment variable.
            timeout: Export timeout. A falsy value (``None`` or ``0``) falls
                back to ``OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`` parsed as a float.
            compression: When ``None``, taken from
                ``environ_to_compression(OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)``.
            channel_options: Forwarded unchanged to the mixin.
            retryable_error_codes: Forwarded unchanged to the mixin.
            meter_provider: Forwarded unchanged to the mixin.

        Raises:
            ValueError: If ``OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`` is set but
                cannot be converted with ``float``.
        """
        # An explicit ``insecure`` argument always wins over the environment.
        if insecure is None:
            env_insecure = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
            if env_insecure is not None:
                insecure = env_insecure.lower() == "true"

        # Logs-specific credentials are only built for secure connections
        # that have a logs certificate configured.
        logs_certificate = environ.get(OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE)
        if logs_certificate is not None and not insecure:
            credentials = _get_credentials(
                credentials,
                _OTEL_PYTHON_EXPORTER_OTLP_GRPC_LOGS_CREDENTIAL_PROVIDER,
                OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
                OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY,
                OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
            )

        # The environment timeout is parsed eagerly, even when an explicit
        # timeout was supplied, so a malformed value raises here.
        raw_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
        env_timeout = None if raw_timeout is None else float(raw_timeout)

        if compression is None:
            compression = environ_to_compression(
                OTEL_EXPORTER_OTLP_LOGS_COMPRESSION
            )

        # Falsy explicit values are treated as "not provided".
        if not endpoint:
            endpoint = environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
        if not headers:
            headers = environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS)
        if not timeout:
            timeout = env_timeout

        # Called on the mixin explicitly rather than through ``super()``.
        OTLPExporterMixin.__init__(
            self,
            endpoint=endpoint,
            insecure=insecure,
            credentials=credentials,
            headers=headers,
            timeout=timeout,
            compression=compression,
            stub=LogsServiceStub,
            result=LogRecordExportResult,
            channel_options=channel_options,
            retryable_error_codes=retryable_error_codes,
            component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER,
            signal="logs",
            meter_provider=meter_provider,
        )

    def _translate_data(
        self, data: Sequence[ReadableLogRecord]
    ) -> ExportLogsServiceRequest:
        """Encode ``data`` into an export request via ``encode_logs``."""
        request = encode_logs(data)
        return request

    def _count_data(self, data: Sequence[ReadableLogRecord]):
        """Return the number of log records in ``data``."""
        record_count = len(data)
        return record_count

    def export(  # type: ignore [reportIncompatibleMethodOverride]
        self,
        batch: Sequence[ReadableLogRecord],
    ) -> Literal[LogRecordExportResult.SUCCESS, LogRecordExportResult.FAILURE]:
        """Export ``batch`` by delegating to ``OTLPExporterMixin._export``."""
        outcome = OTLPExporterMixin._export(self, batch)
        return outcome

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shut the exporter down through ``OTLPExporterMixin.shutdown``.

        Extra keyword arguments are accepted but not forwarded.
        """
        OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Return ``True`` immediately; this exporter buffers nothing."""
        return True

    @property
    def _exporting(self) -> str:
        """Name of the signal handled by this exporter."""
        return "logs"