Fork me on GitHub

python模拟MQTTproducer

以下模拟100个客户端、订阅100个topic
需要1. locust,2.paho

以下是python代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from locust import User, TaskSet, events, task, between
import paho.mqtt.client as mqtt
import time
import os
import random
import string

broker_address="127.0.0.1" #服务器ip地址,实际压测要根据mqtt的实际ip进行修改
REQUEST_TYPE = 'MQTT'
PUBLISH_TIMEOUT = 10000 #超时时间
TOPIC_NUM = 100 # 每个客户端订阅的topic数量
MESSAGE_NUM = 15 # 每个topic每5秒发送的消息数量
CLIENT_NUM = 100 # 模拟的客户端数量

def fire_locust_success(**kwargs):
events.request.fire(
request_type=kwargs.get('request_type', 'GET'),
name=kwargs.get('name', 'unknown'),
response_time=kwargs.get('response_time', 0),
response_length=kwargs.get('response_length', 0),
exception=None,
context=None,
request_meta={}
)

def time_delta(t1, t2):
return int((t2 - t1)*1000)

class PublishTask(TaskSet):
def on_start(self):
self.start_time = time.time()
username = "****"
password = "****"
self.client.username_pw_set(username, password)
self.client.connect(host=broker_address, port=61613, keepalive=60)
self.client.loop_start()
self.topics = [f"/yace/device{random_string(4)}" for _ in range(TOPIC_NUM)]
for topic in self.topics:
self.client.subscribe(topic)
print(f"Connected and subscribed to {TOPIC_NUM} topics.")

def on_stop(self):
self.client.loop_stop()
self.client.disconnect()
print("Disconnected.")

@task(1)
def task_pub(self):
for topic in self.topics:
for _ in range(MESSAGE_NUM):
payload = "Message - " + str(self.client._client_id)
infot = self.client.publish(topic, payload, qos=0)
infot.wait_for_publish()

end_time = time.time()
total_time = time_delta(self.start_time, end_time)
fire_locust_success(
request_type=REQUEST_TYPE,
name=str(self.client._client_id),
response_time=total_time,
response_length=len(payload) * MESSAGE_NUM * TOPIC_NUM
)
time.sleep(5)

wait_time = between(1, 2)

class MQTTLocust(User):
tasks = [PublishTask]
count = 0

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

MQTTLocust.count += 1
if MQTTLocust.count > CLIENT_NUM:
return None # 超过100个客户端后不再创建
client_name = "Device - " + str(MQTTLocust.count)
self.client = mqtt.Client(client_name)

def on_disconnect(self, client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")

def random_string(length):
digits = string.digits # 包含所有数字字符的字符串
return ''.join(random.choice(digits) for _ in range(length))
if __name__ == '__main__':
os.system("locust -f yace_4.py -u 100 -r 1 --headless --host=127.0.0.1") # Start 100 Locust users

-------------The End-------------