Python语言如何接入代码Demo

以下分别展示API模式、账密认证模式下的Python语言demo示例:

一、API教程demo示例

# coding=utf-8
# !/usr/bin/env python
import json
import threading
import time
import requests as rq

# 设置请求头
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br"
}
# 测试链接
testUrl = 'https://ipinfo.ipidea.io'


# 核心业务
def testPost(host, port):
    # 配置获取到的ip,port
    proxies = {
        # host  api获取到的代理服务器地址
        # port  api获取到的端口
        'http': 'http://{}:{}'.format(host, port),
        'https': 'http://{}:{}'.format(host, port),
    }
    while True:
        try:
            # 配置代理后测试
            res = rq.get(testUrl, proxies=proxies, timeout=5)
            # print(res.status_code)
            # 打印请求结果
            print(res.status_code, "***", res.text)
            break
        except Exception as e:
            print(e)
            break
    return


class ThreadFactory(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port

    def run(self):
        testPost(self.host, self.port)


# 提取代理的链接  json类型的返回值
tiqu = '提取链接'

while 1 == 1:
    # 每次提取10个,放入线程中
    resp = rq.get(url=tiqu, timeout=5)
    try:
        if resp.status_code == 200:
            dataBean = json.loads(resp.text)
        else:
            print("获取失败")
            time.sleep(1)
            continue
    except ValueError:
        print("获取失败")
        time.sleep(1)
        continue
    else:
        # 解析json数组,获取ip和port
        print("code=", dataBean["code"])
        code = dataBean["code"]
        if code == 0:
            threads = []
            for proxy in dataBean["data"]:
                threads.append(ThreadFactory(proxy["ip"], proxy["port"]))
            for t in threads:  # 开启线程
                t.start()
                time.sleep(0.01)
            for t in threads:  # 阻塞线程
                t.join()
    # break
    time.sleep(1)

     

二、账密认证教程demo示例

           
'''
导入thread,time,request包,
实现多线程控制,等待,http请求
'''
import _thread
import time
import requests

# 设置请求头
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_3 like Mac OS X) AppleWebKit/603.3.8 (KHTML, like Gecko) Mobile/14G60 MicroMessenger
    /6.5.19 NetType/4G Language/zh_TW",
}

# 测试地址
mainUrl = 'https://ipinfo.ipidea.io'

def testUrl():
    # 设置帐密代理
    proxy = {
        'http': 'http://认证账户名:认证账户密码@代理服务器地址:代理服务器端口',
        'https': 'http://认证账户名:认证账户密码@代理服务器地址:代理服务器端口',
    }
    try:
        res = requests.get(mainUrl, headers=headers, proxies=proxy, timeout=10)
        print(res.status_code, res.text)
    except Exception as e:
        print("访问失败", e)
        pass

# 开启10个线程进行测试
for i in range(0, 10):
    _thread.start_new_thread(testUrl, ())
    time.sleep(0.1)

time.sleep(10)