# coding=utf-8
# !/usr/bin/env python
import json
import threading
import time
import requests as rq
# 设置请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate, br"
}
# 测试链接
testUrl = 'https://ipinfo.ipidea.io'
# 核心业务
def testPost(host, port):
# 配置获取到的ip,port
proxies = {
# host api获取到的代理服务器地址
# port api获取到的端口
'http': 'http://{}:{}'.format(host, port),
'https': 'http://{}:{}'.format(host, port),
}
while True:
try:
# 配置代理后测试
res = rq.get(testUrl, proxies=proxies, timeout=5)
# print(res.status_code)
# 打印请求结果
print(res.status_code, "***", res.text)
break
except Exception as e:
print(e)
break
return
class ThreadFactory(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
self.host = host
self.port = port
def run(self):
testPost(self.host, self.port)
# 提取代理的链接 json类型的返回值
tiqu = '提取链接'
while 1 == 1:
# 每次提取10个,放入线程中
resp = rq.get(url=tiqu, timeout=5)
try:
if resp.status_code == 200:
dataBean = json.loads(resp.text)
else:
print("获取失败")
time.sleep(1)
continue
except ValueError:
print("获取失败")
time.sleep(1)
continue
else:
# 解析json数组,获取ip和port
print("code=", dataBean["code"])
code = dataBean["code"]
if code == 0:
threads = []
for proxy in dataBean["data"]:
threads.append(ThreadFactory(proxy["ip"], proxy["port"]))
for t in threads: # 开启线程
t.start()
time.sleep(0.01)
for t in threads: # 阻塞线程
t.join()
# break
time.sleep(1)
'''
导入thread,time,request包,
实现多线程控制,等待,http请求
'''
import _thread
import time
import requests
# 设置请求头
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_3 like Mac OS X) AppleWebKit/603.3.8 (KHTML, like Gecko) Mobile/14G60 MicroMessenger
/6.5.19 NetType/4G Language/zh_TW",
}
# 测试地址
mainUrl = 'https://ipinfo.ipidea.io'
def testUrl():
# 设置帐密代理
proxy = {
'http': 'http://认证账户名:认证账户密码@代理服务器地址:代理服务器端口',
'https': 'http://认证账户名:认证账户密码@代理服务器地址:代理服务器端口',
}
try:
res = requests.get(mainUrl, headers=headers, proxies=proxy, timeout=10)
print(res.status_code, res.text)
except Exception as e:
print("访问失败", e)
pass
# 开启10个线程进行测试
for i in range(0, 10):
_thread.start_new_thread(testUrl, ())
time.sleep(0.1)
time.sleep(10)