$ pyhton strees.py 10

def send_request():  
  url = "http://127.0.0.1:11434/api/generate"  
    data = {       
     "model": "deepseek-r1:32b",   
         "prompt": "解释一下量子计算",     
            "stream": False   
         }  
      response = requests.post(url, json=data, timeout=None)


import requests
import threading
import sys
import time
def send_request():   
 url = "http://127.0.1.1:11434/api/generate"  
   data = {      
     "model": "deepseek-r1:32b",  
     "prompt": "解释一下量子计算",        
     "stream": False    
     }    
     while True:        
     try:            
     response = requests.post(url, json=data, timeout=None)            
     print(f"Status Code: {response.status_code}, Time: {response.elapsed.total_seconds()}s")        
     except Exception as e:           
      print(f"Request failed: {e}")
try:    
concurrent_workers = int(sys.argv[1])
    # 启动工作线程    
    for _ in range(concurrent_workers):       
     t = threading.Thread(target=send_request)       
     t.daemon = True        
     t.start()
    # 添加线程数监控    
    while True:        
    # 计算实际工作线程数(总线程数 - 主线程)        
    current_workers = threading.active_count() - 1        
    print(f"\n[监控] 目标线程数: {concurrent_workers} | 实际运行线程数: {current_workers}")        
    time.sleep(3)  # 每3秒输出一次监控信息
except KeyboardInterrupt:    
     print("\n[!] 已终止持续并发请求")本文链接:https://www.kinber.cn/post/4983.html 转载需授权!
推荐本站淘宝优惠价购买喜欢的宝贝:

 支付宝微信扫一扫,打赏作者吧~
支付宝微信扫一扫,打赏作者吧~

 
        