Python语言接入
概述
本文介绍Python语言的应用接入流程,点击“应用性能监控->应用列表->新建应用”进入到接入应用页面。

接入流程
点击Python标签页,按照流程接入后即可在“应用性能监控->应用列表”显示接入的应用。

OpenTelemetry
步骤 1:获取接入点和鉴权
注:该信息因地域和用户而异,可在控制台“接入应用”页面获取,下方为示例
- 接入点:
http://apm-collector.bj.baidubce.com - Authentication:
UFSpMM3VFr3q4lnFrVBqtPDK
步骤 2:安装所需的依赖包
1pip install opentelemetry-instrumentation-redis
2pip install opentelemetry-instrumentation-mysql
3pip install opentelemetry-distro opentelemetry-exporter-otlp
4pip install opentelemetry-instrumentation-system-metrics
5opentelemetry-bootstrap -a install
步骤 3:修改上报参数
OpenTelemetry Python 运行时指标默认 不包含 GC 耗时与 GC 次数,而这类指标对于分析应用内存压力与性能瓶颈非常重要。 因此,推荐在接入应用监控时,参考采集python GC指标文档,额外开启 GC 指标采集。
创建app.py文件并填写以下示例代码
1from flask import Flask
2import requests
3import time
4
5backend_addr = 'https://example.com/'
6
7app = Flask(__name__)
8
9# 访问外部站点
10@app.route('/')
11def index():
12 start = time.time()
13 r = requests.get(backend_addr)
14 return r.text
15
16# 访问数据库
17@app.route('/mysql')
18def func_rdb():
19 return "rdb res"
20
21# 访问Redis
22@app.route("/redis")
23def func_kvop():
24 return "kv res"
25
26# 开启GC指标采集
27# utils.collect_gc_metrics()
28app.run(host='0.0.0.0', port=8080)
按照如下方式启动 python 应用:
1opentelemetry-instrument \
2--traces_exporter otlp \
3--metrics_exporter otlp \
4--service_name <serviceName> \
5--exporter_otlp_endpoint <endpoint> \
6--exporter_otlp_protocol http/protobuf \
7--exporter_otlp_headers Authentication=<Authentication> \
8--resource_attributes host.name=<hostName> \
9python3 app.py
对应的字段说明如下:
| 参数 | 描述 | 示例 |
|---|---|---|
| <serviceName> | 应用名,多个使用相同应用名接入的进程,在 APM 中会表现为相同应用下的多个实例。对于 Spring Cloud 或 Dubbo 应用,应用名通常和服务名保持一致 | csm |
| <Authentication> | 步骤 1 中拿到业务系统 Authentication | UFSpMM***lnFrVBqtPDK |
| <endpoint> | 步骤 1 中拿到的接入点 | http://apm-collector.bj.baidubce.com |
| <hostName> | 该实例的主机名,是应用实例的唯一标识,通常情况下可以设置为应用实例的 IP 地址。 | my-instance-host |
步骤4:接入验证
启动 Python 应用后,通过 8080 端口访问对应的接口,例如 curl http://localhost:8081/mysql -H "Content-Type: application/json" 。在有正常流量的情况下,<应用性能监控>→<应用列表> 中将展示接入的应用。由于数据的处理存在一定延时,如果接入后在控制台没有查询到应用,请等待 30 秒左右。
Jaeger
步骤1: 获取接入点和Token
注:该信息因地域和用户而异,可在控制台“接入应用”页面获取,下方为示例
- 接入点:
http://apm-collector.bj.baidubce.com - Authentication:
UFSpMM3VFr3q4lnFrVBqtPDK
步骤2: 设置依赖库
- 配置 Jaeger Client 环境:以 Python v3.12 为例,需要安装以下 Python Package:
1flask==3.0.3
2opentracing==2.4.0
3jaeger-client==4.6.0
4blinker==1.8.2
- 编写 Python Demo 应用程序:创建一个 Python 文件 main.py,将下面的代码拷贝到文件中
1import logging
2import time
3import requests
4
5from jaeger_client import Config
6import opentracing
7
8
9# ================== 自定义 HTTP 上报 ==================
10class HTTPReporter:
11 def __init__(self, endpoint):
12 self.endpoint = endpoint
13
14 def report_span(self, span):
15 try:
16 requests.post(
17 self.endpoint,
18 data=str(span), # demo 用
19 headers={
20 "Authorization": "Bearer xxxxx", # 替换 token
21 "Content-Type": "application/json",
22 },
23 timeout=2,
24 )
25 except Exception as e:
26 print("report error:", e)
27
28 def close(self):
29 pass
30
31
32# ================== tracer 初始化 ==================
33def init_tracer():
34 config = Config(
35 config={
36 'sampler': {'type': 'const', 'param': 1},
37 'logging': True,
38 },
39 service_name="python-demo-service",
40 validate=True
41 )
42
43 tracer = config.new_tracer()
44
45 # 替换 reporter(关键)
46 tracer._reporter = HTTPReporter(
47 "http://apm-collector.bj.baidubce.com/api/traces"
48 )
49
50 opentracing.set_global_tracer(tracer)
51
52 return tracer
53
54def construct_span(tracer):
55 with tracer.start_span('TestSpan') as span:
56 span.log_kv({'event': 'test message', 'life': 42})
57 print("tracer.tags:", tracer.tags)
58
59 with tracer.start_span('TestChildSpan', child_of=span):
60 span.log_kv({'event': 'down below'})
61
62 return span
63
64
65# ================== main ==================
66if __name__ == "__main__":
67 logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
68
69 tracer = init_tracer()
70
71 construct_span(tracer)
72
73 print("waiting flush...")
74 time.sleep(2)
75
76 tracer.close()
- 启动 Python 应用程序以生成并上报 Trace 数据
1python main.py
根据需求配置其他可选参数,参数详情参见 Skywalking python 官方文档
步骤3: 重启验证
配置好后直接重启Python服务,即可开始向APM上传链路数据,在有正常流量的情况下,应用性能监控 > 应用列表 中将展示接入的应用。由于数据的处理存在一定延时,如果接入后在控制台没有查询到应用,请等待 30 秒左右。
Skywalking
Python 使用 SkyWalking 可以实现自动埋点上报,需 Python 3.7 及以上版本。
当前支持自动埋点的组件详情请见 官方插件支持说明
步骤 1:获取接入点和鉴权
注:该信息因地域和用户而异,可在控制台“接入应用”页面获取,下方为示例
- 接入点:
http://apm-collector.bj.baidubce.com - Authentication:
UFSpMM3VFr3q4lnFrVBqtPDK
步骤 2:安装 Skywalking Agent 相关依赖
- 打开 Skywalking 下载页面,选择 Python Agent(建议下载最新版本)。
- 在 Python 项目中引入 Skywalking-python Agent。
步骤 3:修改代码
用 SkyWalking 为 Python 应用自动埋点,可以通过以下两种方式配置参数:
- 方式 1:代码中配置参数
1from skywalking import config
2config.init(ConfigurationName=ConfigurationValue)
- 方式2 将参数配置到环境变量:
1export SW_AGENT_ConfigurationName=ConfigurationValue
配置示例
-
引入 Skywalking:
Python1from skywalking import config, agent -
配置接入点和 Token:
Python1config.init( 2 agent_collector_backend_services='endpoint', # 填写步骤 1 中获取的接入点 3 agent_authentication='token', # 填写步骤 1 中获取的 token 4) -
配置应用名作为应用标识:
Python1config.init( 2 agent_name='service_name', 3) -
选择数据上报协议(目前只支持 GRPC 上报):
Python1config.init( 2 agent_protocol='grpc', 3) - 其他可选参数: 根据需求配置其他参数,参数详情参见 Skywalking Python 官方文档。
步骤 4:重启验证
配置好后直接重启 Python 服务。开始向 APM 上传链路数据。在有正常流量的情况下,应用性能监控 > 应用列表 中将展示接入的应用。由于数据处理存在一定延时,如果接入后在控制台没有查询到应用,请等待 30 秒左右。
Demo 示例
以下是一个基于 Flask 的 Demo 示例:
1import argparse
2from flask import Flask, request, jsonify
3from skywalking import config, agent
4
5# 初始化 SkyWalking 配置
6config.init(
7 agent_name='demo_service',
8 agent_collector_backend_services='apm-collector.bj.baidubce.com:11800',
9 agent_protocol='grpc',
10 agent_authentication='token',
11 agent_meter_reporter_active=False,
12 agent_log_reporter_active=False
13)
14
15agent.start()
16
17app = Flask(__name__)
18
19# 模拟的数据库(内存字典)
20mock_db = {}
21next_id = [1] # 用列表包装,使其在函数中可变
22
23# 用户模型模拟
24class User:
25 def __init__(self, username, password):
26 self.id = next_id[0]
27 next_id[0] += 1
28 self.username = username
29 self.password = password
30
31 def to_dict(self):
32 return {'id': self.id, 'username': self.username, 'password': self.password}
33
34# API 逻辑
35@app.route('/user', methods=['GET'])
36def get_all_users():
37 return jsonify([user.to_dict() for user in mock_db.values()])
38
39@app.route('/user', methods=['POST'])
40def add_user():
41 data = request.get_json()
42 username = data.get('username')
43 password = data.get('password')
44 if username and password:
45 user = User(username, password)
46 mock_db[user.id] = user
47 return jsonify(user.to_dict()), 201
48 else:
49 return jsonify(error='Bad Request'), 400
50
51@app.route('/user/<int:user_id>', methods=['PUT'])
52def update_user(user_id):
53 user = mock_db.get(user_id)
54 if not user:
55 return jsonify(error='User not found'), 404
56 data = request.get_json()
57 username = data.get('username')
58 password = data.get('password')
59 if username:
60 user.username = username
61 if password:
62 user.password = password
63 return jsonify(user.to_dict())
64
65@app.route('/user/<int:user_id>', methods=['DELETE'])
66def delete_user(user_id):
67 if user_id in mock_db:
68 del mock_db[user_id]
69 return '', 204
70 else:
71 return jsonify(error='User not found'), 404
72
73@app.route('/test')
74def test():
75 return 'Hello, World! SkyWalking is tracing this.'
76
77if __name__ == '__main__':
78 parser = argparse.ArgumentParser()
79 parser.add_argument('--host', type=str, default='0.0.0.0')
80 parser.add_argument('--port', type=int, default=5000)
81 args = parser.parse_args()
82 app.run(debug=False, host=args.host, port=args.port)
评价此篇文章
