import json
import requests
# Zabbix服务器信息
zabbix_url = "http://your-zabbix-server/zabbix/api_jsonrpc.php"
username = "Admin"
password = "zabbix"
# 获取认证token
def get_auth_token():
headers = {"Content-Type": "application/json-rpc"}
data = {
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"username": username,
"password": password
},
"id": 1,
"auth": None
}
response = requests.post(zabbix_url, headers=headers, data=json.dumps(data))
return response.json()["result"]
def create_hosts_batch(hosts_list, auth_token, group_ids, template_ids):
"""批量创建主机"""
headers = {"Content-Type": "application/json-rpc"}
# 分批处理,每批50个主机(避免请求过大)
batch_size = 50
created_hosts = []
for i in range(0, len(hosts_list), batch_size):
batch = hosts_list[i:i+batch_size]
host_data = []
for host in batch:
host_config = {
"host": host["hostname"],
"interfaces": [
{
"type": 1, # Agent接口
"main": 1,
"useip": 1,
"ip": host["ip"],
"dns": "",
"port": "10050"
}
],
"groups": [{"groupid": gid} for gid in group_ids],
"templates": [{"templateid": tid} for tid in template_ids],
"inventory_mode": 0,
"inventory": host.get("inventory", {}),
"tags": host.get("tags", []),
"macros": host.get("macros", [])
}
host_data.append(host_config)
data = {
"jsonrpc": "2.0",
"method": "host.create",
"params": host_data,
"auth": auth_token,
"id": i+1
}
try:
response = requests.post(zabbix_url, headers=headers, data=json.dumps(data, indent=2))
result = response.json()
if "result" in result:
created_hosts.extend(result["result"]["hostids"])
print(f"成功创建 {len(result['result']['hostids'])} 台主机")
else:
print(f"批量创建出错: {result.get('error', {})}")
except Exception as e:
print(f"请求失败: {e}")
return created_hosts
import csv
def import_hosts_from_csv(csv_file, auth_token):
"""从CSV文件导入主机"""
hosts = []
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
host = {
"hostname": row["hostname"],
"ip": row["ip"],
"groups": [{"groupid": row["groupid"]}] if "groupid" in row else [],
"templates": [{"templateid": row["templateid"]}] if "templateid" in row else [],
"inventory": {
"location": row.get("location", ""),
"site": row.get("site", ""),
"tag": row.get("tag", "")
}
}
hosts.append(host)
return hosts
#!/usr/bin/env python3
"""
Zabbix批量添加主机脚本
"""
import json
import requests
import csv
import time
from typing import List, Dict
class ZabbixBatchManager:
def __init__(self, url, username, password):
self.url = url
self.auth_token = self.login(username, password)
def login(self, username, password):
"""登录Zabbix获取认证token"""
data = {
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"username": username,
"password": password
},
"id": 1,
"auth": None
}
response = requests.post(self.url, json=data, timeout=30)
result = response.json()
if "result" in result:
print("登录成功")
return result["result"]
else:
raise Exception(f"登录失败: {result.get('error', {})}")
def get_group_id(self, group_name):
"""获取主机组ID"""
data = {
"jsonrpc": "2.0",
"method": "hostgroup.get",
"params": {
"output": ["groupid"],
"filter": {"name": group_name}
},
"auth": self.auth_token,
"id": 2
}
response = requests.post(self.url, json=data)
result = response.json()
if result.get("result"):
return result["result"][0]["groupid"]
return None
def get_template_id(self, template_name):
"""获取模板ID"""
data = {
"jsonrpc": "2.0",
"method": "template.get",
"params": {
"output": ["templateid"],
"filter": {"host": template_name}
},
"auth": self.auth_token,
"id": 3
}
response = requests.post(self.url, json=data)
result = response.json()
if result.get("result"):
return result["result"][0]["templateid"]
return None
def add_hosts_batch(self, hosts_data: List[Dict], batch_size=100):
"""批量添加主机"""
added_hosts = []
failed_hosts = []
for i in range(0, len(hosts_data), batch_size):
batch = hosts_data[i:i+batch_size]
print(f"正在处理第 {i//batch_size + 1} 批,共 {len(batch)} 台主机")
params = []
for host in batch:
host_config = {
"host": host["hostname"],
"interfaces": [
{
"type": 1,
"main": 1,
"useip": 1,
"ip": host["ip"],
"dns": "",
"port": host.get("port", "10050")
}
],
"groups": [{"groupid": gid} for gid in host.get("group_ids", [])],
"templates": [{"templateid": tid} for tid in host.get("template_ids", [])],
"macros": host.get("macros", []),
"tags": host.get("tags", []),
"description": host.get("description", "")
}
# 添加SNMP接口(可选)
if host.get("snmp_ip"):
host_config["interfaces"].append({
"type": 2, # SNMP接口
"main": 0,
"useip": 1,
"ip": host["snmp_ip"],
"dns": "",
"port": host.get("snmp_port", "161"),
"details": {
"version": host.get("snmp_version", 2),
"community": host.get("snmp_community", "{$SNMP_COMMUNITY}")
}
})
params.append(host_config)
data = {
"jsonrpc": "2.0",
"method": "host.create",
"params": params,
"auth": self.auth_token,
"id": i+100
}
try:
response = requests.post(self.url, json=data, timeout=60)
result = response.json()
if "result" in result:
added_hosts.extend(result["result"]["hostids"])
print(f"✓ 成功添加 {len(result['result']['hostids'])} 台主机")
else:
print(f"✗ 添加失败: {result.get('error', {})}")
failed_hosts.extend(batch)
# 避免请求过快
time.sleep(1)
except Exception as e:
print(f"请求异常: {e}")
failed_hosts.extend(batch)
return added_hosts, failed_hosts
def generate_hosts_from_network(self, network_prefix, start_ip, end_ip,
hostname_prefix, group_ids, template_ids):
"""根据IP段生成主机配置"""
hosts = []
# 将IP地址转换为数字(简化处理)
def ip_to_num(ip):
parts = list(map(int, ip.split('.')))
return parts[0] * 256**3 + parts[1] * 256**2 + parts[2] * 256 + parts[3]
def num_to_ip(num):
return f"{num >> 24 & 255}.{num >> 16 & 255}.{num >> 8 & 255}.{num & 255}"
start_num = ip_to_num(f"{network_prefix}.{start_ip}")
end_num = ip_to_num(f"{network_prefix}.{end_ip}")
for i in range(start_num, end_num + 1):
ip = num_to_ip(i)
hostname = f"{hostname_prefix}{ip.replace('.', '-')}"
hosts.append({
"hostname": hostname,
"ip": ip,
"group_ids": group_ids,
"template_ids": template_ids,
"description": f"Auto-added host - {ip}"
})
return hosts
def main():
# 配置信息
ZABBIX_URL = "http://your-zabbix-server/zabbix/api_jsonrpc.php"
USERNAME = "Admin"
PASSWORD = "zabbix"
# 初始化
zbx = ZabbixBatchManager(ZABBIX_URL, USERNAME, PASSWORD)
# 方法1:从CSV文件导入
# csv_file = "hosts.csv"
# hosts_data = []
# with open(csv_file, 'r') as f:
# reader = csv.DictReader(f)
# for row in reader:
# hosts_data.append(row)
# 方法2:自动生成IP段主机
group_id = zbx.get_group_id("Linux servers")
template_id = zbx.get_template_id("Template OS Linux")
# 生成192.168.1.1 - 192.168.1.100范围内的主机
hosts_data = zbx.generate_hosts_from_network(
network_prefix="192.168.1",
start_ip="1",
end_ip="100",
hostname_prefix="server-",
group_ids=[group_id] if group_id else [],
template_ids=[template_id] if template_id else []
)
# 批量添加主机
added, failed = zbx.add_hosts_batch(hosts_data, batch_size=50)
print(f"\n批量添加完成!")
print(f"成功添加: {len(added)} 台")
print(f"失败: {len(failed)} 台")
if failed:
print("\n失败的主机列表:")
for host in failed:
print(f" - {host.get('hostname')} ({host.get('ip')})")
if __name__ == "__main__":
main()
hostname,ip,groupid,templateid,location,site
server-01,192.168.1.101,2,10001,Beijing,DC1
server-02,192.168.1.102,2,10001,Shanghai,DC2
server-03,192.168.1.103,3,10002,Guangzhou,DC3
# 启用连接池
session = requests.Session()
# 设置超时时间
requests.post(url, timeout=(30, 60))
# 分批处理,控制并发
batch_size = 50 # 根据服务器性能调整
# 添加延时避免请求过载
time.sleep(0.5)
def safe_add_host(host_config, max_retries=3):
"""带重试机制的主机添加"""
for attempt in range(max_retries):
try:
response = requests.post(zabbix_url, json=host_config, timeout=30)
if response.status_code == 200:
return response.json()
except requests.exceptions.RequestException as e:
print(f"尝试 {attempt+1} 失败: {e}")
time.sleep(2 ** attempt) # 指数退避
return None
def check_host_existence(hostnames):
"""检查主机是否已存在"""
data = {
"jsonrpc": "2.0",
"method": "host.get",
"params": {
"output": ["host"],
"filter": {"host": hostnames}
},
"auth": auth_token,
"id": 100
}
response = requests.post(zabbix_url, json=data)
existing_hosts = [h["host"] for h in response.json().get("result", [])]
return existing_hosts
host_config = {
"host": "server-01",
"interfaces": [...],
"macros": [
{"macro": "{$SNMP_COMMUNITY}", "value": "public"},
{"macro": "{$CPU_THRESHOLD}", "value": "90"},
{"macro": "{$MEMORY_THRESHOLD}", "value": "85"}
]
}
def update_hosts_bulk(updates):
"""批量更新主机"""
data = {
"jsonrpc": "2.0",
"method": "host.update",
"params": updates,
"auth": auth_token,
"id": 1
}
response = requests.post(zabbix_url, json=data)
return response.json()
这个方案可以高效处理数百台主机的批量添加,建议先在测试环境验证后再在生产环境使用。