欢迎来真孝善网,为您提供真孝善正能量书籍故事!

网络爬虫代理应用及aiohttp代理实践探索

时间:11-14 名人轶事 提交错误

大家好,今天给各位分享网络爬虫代理应用及aiohttp代理实践探索的一些知识,其中也会对进行解释,文章篇幅可能偏长,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在就马上开始吧!

代理IP方案简述

在快速爬取网站时,我们必须应对每分钟IP访问次数有限甚至屏蔽IP的服务器。使用代理IP 可以帮助我们。 Kaito的爬虫代理服务解释清楚,推荐。 Github上也有很多现成的开源代理爬虫,例如:

qiyeboy/IPProxyPooljhao104/proxy_poolawolfly9/IPProxyToolfancoo/Proxyderekhe/mobike-crawler/modules/ProxyProvider.py思路也很清晰,从更多代理网站爬取免费代理,保存输入你的拥有自己的数据库并定期更新。获取代理IP后,验证代理的有效性。并提供了简单的API来获取代理IP。七爷的博客python开源IP代理池-IProxys详细解释了其代码。大多数代理网站的抓取都比较简单。上述开源代码包含了很多代理网站的爬取和解析。难点在于js反爬机制,通过使用selenium操作headless webkit或者js代码解析以及python的js代码执行库也已经解决了。另外,有趣的是,上面的开源代码中,是利用爬取得到的代理来访问代理网站的。要定期刷新代理,有自定义代理定时刷新模块,也可以使用celery定时任务。验证有效性的方式包括:直接访问百度。访问http://icanhazip.com/并获取返回的IP。访问http://httpbin.org/get?show_env=1 获取有关访问标头的详细信息并确定代理的匿名性。访问其他目标网站,如豆瓣等,可以用BaseHTTPServer扩展API,也可以使用简单的flask或Django加插件来提供restful api服务。对于免费代理IP,最重要的一个是量大。即使其中很大一部分无效,您仍然可以获得一些高质量的代理。一个网站可以有数千个未经过滤的代理,多个网站的数量可能很大,当然必须考虑到重复。然后就是代理IP筛选机制。很多开源库都添加了评分机制,这一点非常重要。例如,可以通过对累计超时次数和成功率进行加权来判断代理IP的质量。每次使用后评估代理IP刷新数据库,方便下次选择优质代理IP。如何拨打对代理IP进行评价,在成功和失败的各种情况中如何奖惩,筛选出最优质的代理IP非常重要。另外,每个代理IP的使用还应该考虑是否设置一定的使用间隔,避免因使用过于频繁而导致失败。

尝试

当然,首先要做的就是从免费代理网站获取大量的代理IP。我选择了最方便的66ip。界面非常简单。一次访问您可以获得大约3,000 个代理。当然,频繁的访问会造成js的反爬机制,那就干脆使用selenium+phantomJs吧。 url=("http://m.66ip.cn/mo.php?tqsl={proxy_number}")

url=url.format(proxy_number=10000)

html=requests.get(url, headers=headers).content

html=html.decode(chardet.detect(html)["编码"])

模式=r"d{1,3}.d{1,3}.d{1,3}.d{1,3}:d{1,5}"

all_ip=re.findall(pattern, html) 然后设置代理IP的奖惩机制。我参考了摩拜爬虫源码以及解析使用类。看起来简单明了。每个代理IP 对象都有自己的IP 和分数,@ 属性使它们可以通过点运算符进行访问。初始分数为100分,这里的分数均四舍五入为整数。代理成功,根据延迟大小,即调用代理对应的方式,给予奖励,最高10分,超时扣10分,连接错误扣30分,其他错误扣50分(可酌情修改)。为了便于代理IP 之间的比较,修改了__lt__ 方法。类Proxy:

def __init__(自身, ip):

self._url="http://" + ip

self._score=100

@财产

def url(自身):

返回self._url

@财产

def 分数(自己):

返回self._score

def __lt__(自己,其他):

"""

由于优先级队列返回最小,因此这里得分高的代理是优秀的

所以比较时情况正好相反

"""

返回self._score other._score

def 成功(自身,时间):

self._score +=int(10/int(时间+ 1))

def 超时错误(自身):

self._score -=10

def connectError(self):

self._score -=30

def otherError(self):

self._score -=50 看来最好的验证方式就是直接访问目标网站。除了根本无法使用的部分外,代理IP对于不同的目标网站的有效性是不同的。在我的尝试中,豆瓣对代理IP的响应明显好于摩拜。这里的代码是访问摩拜单车。第一轮筛选时,每个代理IP都会访问目标网站两次,超时时间为10秒,视情况给予奖励和惩罚。分数大于50 的将被保留。总共花了大约22秒的时间排除了少量根本无法使用的代理,并且还初步更新了所有代理的分数。 async def douban(代理, 会话):

# 使用代理访问目标网站,根据情况对代理进行奖励和惩罚

尝试:

开始=时间.time()

与session.post 异步(mobike_url,

数据=数据,

代理=代理.url,

headers=headers, # 可以引用外部头

超时=10)作为resp:

结束=时间.time()

# print(resp.status)

如果resp.status==200:

proxy.success(结束-开始)

print("%6.3d" % proxy._score, "已用时间--", end - start, "s")

否则:

proxy.otherError()

print("*****", resp.status, "*****")

除了TimeoutError 为te:

print("%6.3d" % proxy._score, "timeoutError")

proxy.timeoutError()

除了ClientConnectionError 为ce:

print("%6.3d" % proxy._score, "connectError")

proxy.connectError()

除了异常e:

print("%6.3d" % proxy._score, "otherError-", e)

proxy.otherError()

#ClientHttpProxyError

#TCPConnector 维护一个连接池并限制并行连接总数。当池满时,请求将退出并添加新请求。 500和100没有太大区别。

# ClientSession调用TCPConnector构造连接,Session可以共享

# 信号量限制同时请求构建连接的数量。当有足够的Semaphore时,总时间与超时时间大致相同。

async def initDouban():

conn=aiohttp.TCPConnector(verify_ssl=False,

limit=100, # windows下连接池不能太大

use_dns_cache=真)

任务=[]

与aiohttp.ClientSession(loop=loop,connector=conn) 异步作为session:

对于proxy: 中的p

任务=asyncio.ensure_future(豆瓣(p, 会话))

任务.append(任务)

响应=asyncio.gather(*任务)

等待回复

conn.close()

deffirstFilter():

对于范围(2): 内的i

s=时间.time()

未来=asyncio.ensure_future(initDouban())

循环.run_until_complete(未来)

e=时间.time()

print("----- 初始化时间%s-----n" % i, e - s, "s")

数量=0

pq=优先队列()

对于proxy: 中的代理

如果代理._score 50:

pq.put_nowait(代理)

数字+=1

print("原IP号:%s" % len(all_ip), ";过滤后:%s" % num)

return pq 接下来就是正式访问了。这里我使用基于堆的asyncio优先级队列(非线程安全)。通过asyncio.Semaphore限制并发请求连接数,不断从队列中取出最好的代理IP,访问完成后放回队列中。结果是多个连接不会同时使用代理IP,如果代理成功,会很快被放回队列中并再次使用。 (如果需要设置成功代理的使用间隔,可以改为访问成功后释放连接和信号量,然后使用asyncio.sleep(x)等待一段时间再放入优先级如果在genDouban函数中实现,可以设置一个范围(并发量)大于Semaphore(并发量)到一定程度)奖励和惩罚就会继续。一开始会有一个筛选过程。稳定输出如下:

pq=第一个过滤器()

异步def genDouban(sem, session):

# 带信号量的Getter 函数。

而True:

与sem: 异步

代理=等待pq.get()

等待豆瓣(代理,会话)

等待pq.put(代理)

async defdynamicRunDouban(concurrency):

"""

TCPConnector维护一个连接池并限制并行连接的总数。当池已满时,请求退出并添加新请求。

ClientSession调用TCPConnector构造连接,Session可以共享

信号量限制同时请求构建连接的数量。当有足够的Semaphore时,总时间与超时类似。

"""

conn=aiohttp.TCPConnector(verify_ssl=False,

限制=并发数,

use_dns_cache=真)

任务=[]

sem=asyncio.Semaphore(并发)

与aiohttp.ClientSession(loop=loop,connector=conn) 异步作为session:

尝试:

对于我在范围(并发):

任务=asyncio.ensure_future(genDouban(sem, 会话))

任务.append(任务)

响应=asyncio.gather(*任务)

等待回复

除了键盘中断:

print("-----完成-----n")

对于tasks:中的任务

任务.取消()

如果没有conn.close:

conn.close()

未来=asyncio.ensure_future(dynamicRunDouban(200))

Loop.run_until_complete(future) 最后,我们中断程序并检查代理IP的分数:scores=[p.score for p in proxies]

分数.sort(反向=True)

print("最受欢迎的IPs:n ------------n", 分数[:50],

[i 代表i 的分数,如果i 100])

loop.is_close()

其它方案概览:

Scrapy 官方文档提到Tor - 洋葱路由:使用轮换IP 池。例如,freeTor 投影仪付费服务如ProxyMesh。一个开源替代方案是scrapoxy,这是一个超级代理,您可以将自己的代理附加到其上。

如何在知乎中使用python爬虫IP池?答案中提到了Squid以及修改x-forward-for标签的方法:利用Squid的cache_peer机制,将这些代理按照一定的格式写入到配置文件中(具体格式参考文档),配置Squid端口,那么Squid可以帮助你调度代理,它也可以丢弃失败的代理。在访问的http请求中添加x-forward-for标签。客户端随机生成它并声明自己是透明代理服务器。如何突破豆瓣爬虫频率限制?提及于:使用带bid的cookie(可伪造)访问-github

其他资料

使用python-aiohttpAsyncIO 为工作Python 开发人员发出100 万个请求如何在40 小时内抓取2.5 亿个网页

代码

from selenium import webdriver

导入时间

导入aiohttp

从aiohttp.client_exceptions 导入ClientConnectionError

从aiohttp.client_exceptions 导入TimeoutError

导入异步

从asyncio.queues 导入PriorityQueue

导入夏代

进口重新

导入请求

从requests.packages.urllib3.exceptions 导入InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

headers={"User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "

"AppleWebKit/537.36(KHTML,如Gecko)")}

循环=asyncio.get_event_loop()

类Proxy:

def __init__(自身, ip):

self._url="http://" + ip

self._score=100

@财产

def url(自身):

返回self._url

@财产

def 分数(自己):

返回self._score

def __lt__(自己,其他):

"""

由于优先级队列返回最小,因此这里得分高的代理是优秀的

所以比较时情况正好相反

"""

返回self._score other._score

def 成功(自身,时间):

self._score +=int(10/int(时间+ 1))

def 超时错误(自身):

self._score -=10

def connectError(self):

self._score -=30

def otherError(self):

self._score -=50

def getProxies():

url=("http://m.66ip.cn/mo.php?tqsl={proxy_number}")

url=url.format(proxy_number=10000)

html=requests.get(url, headers=headers).content

html=html.decode(chardet.detect(html)["编码"])

模式=r"d{1,3}.d{1,3}.d{1,3}.d{1,3}:d{1,5}"

all_ip=re.findall(模式, html)

如果len(all_ip)==0:

驱动程序=webdriver.PhantomJS(

executable_path=r"D:/phantomjs/bin/phantomjs.exe")

驱动程序.get(url)

time.sleep(12) # js等待5秒

html=驱动程序.page_source

驱动程序.quit()

all_ip=re.findall(模式, html)

使用open("66ip_" + str(time.time()), "w", 编码="utf-8") 作为f:

f.write(html)

返回所有IP

all_ip=设置(getProxies()) |设置(getProxies())

proxies=[all_ip 中代理的代理(proxy)]

mobike_url="https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"

data={ # 请求参数: 纬度、经度!

"纬度": "33.2",

"经度": "113.4",

}

标题={

"referer": "https://servicewechat.com/",

}

async def douban(代理, 会话):

尝试:

开始=时间.time()

与session.post 异步(mobike_url,

数据=数据,

代理=代理.url,

headers=headers, # 可以引用外部头

超时=10)作为resp:

结束=时间.time()

# print(resp.status)

如果resp.status==200:

proxy.success(结束-开始)

print("%6.3d" % proxy._score, "已用时间--", end - start, "s")

否则:

proxy.otherError()

print("*****", resp.status, "*****")

except TimeoutError as te: print("%6.3d" % proxy._score, "timeoutError") proxy.timeoutError() except ClientConnectionError as ce: print("%6.3d" % proxy._score, "connectError") proxy.connectError() except Exception as e: print("%6.3d" % proxy._score, "otherError->", e) proxy.otherError() # ClientHttpProxyError # TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求,500和100相差不大 # ClientSession调用TCPConnector构造连接,Session可以共用 # Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多 async def initDouban(): conn = aiohttp.TCPConnector(verify_ssl=False, limit=100, # 连接池在windows下不能太大,<500 use_dns_cache=True) tasks = [] async with aiohttp.ClientSession(loop=loop, connector=conn) as session: for p in proxies: task = asyncio.ensure_future(douban(p, session)) tasks.append(task) responses = asyncio.gather(*tasks) await responses conn.close() def firstFilter(): for i in range(2): s = time.time() future = asyncio.ensure_future(initDouban()) loop.run_until_complete(future) e = time.time() print("----- init time %s-----n" % i, e - s, "s") num = 0

pq = PriorityQueue() for proxy in proxies: if proxy._score >50: pq.put_nowait(proxy) num += 1 print("原始ip数:%s" % len(all_ip), "; 筛选后:%s" % num) return pq pq = firstFilter() async def genDouban(sem, session): # Getter function with semaphore. while True: async with sem: proxy = await pq.get() await douban(proxy, session) await pq.put(proxy) async def dynamicRunDouban(concurrency): """ TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求 ClientSession调用TCPConnector构造连接,Session可以共用 Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多 """ conn = aiohttp.TCPConnector(verify_ssl=False, limit=concurrency, use_dns_cache=True) tasks = [] sem = asyncio.Semaphore(concurrency) async with aiohttp.ClientSession(loop=loop, connector=conn) as session: try: for i in range(concurrency): task = asyncio.ensure_future(genDouban(sem, session)) tasks.append(task) responses = asyncio.gather(*tasks) await responses except KeyboardInterrupt: print("-----finishing-----n") for task in tasks: task.cancel() if not conn.closed: conn.close() future = asyncio.ensure_future(dynamicRunDouban(200)) loop.run_until_complete(future) scores = [p.score for p in proxies] scores.sort(reverse=True) print("Most popular IPs:n ------------n", scores[:50], [i for i in scores if i >100]) loop.is_closed()访问百度async def baidu(proxy): """ 验证是否可以访问百度 """ async with aiohttp.ClientSession(loop=loop) as session: async with session.get("http://baidu.com", proxy="http://" + proxy, timeout=5) as resp: text = await resp.text() if "baidu.com" not in text: print(proxy, "n----nis bad for baidu.comn") return False return True访问icanhazipasync def testProxy(proxy): """ http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientSession.request """ async with aiohttp.ClientSession(loop=loop) as session: async with session.get("http://icanhazip.com", proxy="http://" + proxy, timeout=5) as resp: text = await resp.text() if len(text) >20: return else: if await baidu(proxy): firstFilteredProxies.append(proxy) # print("原始:", proxy, "; 结果:", text)访问HttpBinasync def httpbin(proxy): """ 访问httpbin获取headers详情, 注意访问https 代理仍为http 参考资料: https://imququ.com/post/x-forwarded-for-header-in-http.html http://www.cnblogs.com/wenthink/p/HTTTP_Proxy_TCP_Http_Headers_Check.html """ async with aiohttp.ClientSession(loop=loop) as session: async with session.get("https://httpbin.org/get?show_env=1", proxy="http://" + proxy, timeout=4) as resp: json_ = await resp.json() origin_ip = json_["origin"] proxy_ip = json_["headers"]["X-Forwarded-For"] via = json_["headers"].get("Via", None) print("原始IP:", origin_ip, "; 代理IP:", proxy_ip, "---Via:", via) if proxy_ip != my_ip and origin_ip == proxy_ip: annoy_proxies.append(proxy)访问豆瓣APIasync def douban(proxy): async with aiohttp.ClientSession(loop=loop) as session: try: async with session.get(("https://api.douban.com/v2/movie/top250" "?count=10"), proxy="http://" + proxy, headers=headers, timeout=4) as resp: print(resp.status) except TimeoutError as te: print(proxy, te, "timeoutError") except ClientProxyConnectionError as pce: print(proxy, pce, "proxyError") except ClientConnectionError as ce: print(proxy, ce, "connectError")循环访问豆瓣导致暂时被封IPheaders = {"User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) ")} while True: r = requests.get("http://douban.com", headers=headers) print(r.status_code) r = requests.get("https://movie.douban.com/j/search_subjects?" "type=movie&tag=%E8%B1%86%E7%93%A3%E9%AB%9",

如果你还想了解更多这方面的信息,记得收藏关注本站。

用户评论

淡写薰衣草的香

终于有人跟我一样研究爬虫代理了!

    有10位网友表示赞同!

致命伤

我一直想学习使用aiohttp,这篇文章可能挺有用的。

    有12位网友表示赞同!

寒山远黛

爬虫代理真是个神器,网速太慢的卡顿问题可以轻松解决。

    有8位网友表示赞同!

▼遗忘那段似水年华

分享一下你的代理经验吧,有哪些好用推荐?

    有8位网友表示赞同!

♂你那刺眼的温柔

文章提到了哪些种类的爬虫代理,我也想试试!

    有19位网友表示赞同!

眉黛如画

自己搭建代理服务是不是更安全些?

    有18位网友表示赞同!

棃海

aiohttp 是高性能的库吗?用来爬取大规模数据效果怎么样?

    有9位网友表示赞同!

轨迹!

爬虫代理用不完浪费资源,真希望能找到循环利用的方式。

    有12位网友表示赞同!

軨倾词

写篇文章总结一下使用aiohttp代理爬取数据的步骤吧!

    有6位网友表示赞同!

纯真ブ已不复存在

学习 Python 写爬虫一直是我的目标,这篇文章真是正好!

    有18位网友表示赞同!

夜晟洛

爬虫代理怎么选择合适类型的?有推荐吗?

    有14位网友表示赞同!

泡泡龙

感觉爬虫代理费用都会比较高昂吧?

    有19位网友表示赞同!

蹂躏少女

使用官方代理还是自己去找第三方代理更靠谱?

    有10位网友表示赞同!

麝香味

还有哪些优秀的 Python 库用来辅助爬虫呢?

    有16位网友表示赞同!

冷青裳

这篇文章对新手来说会不会太深了?

    有7位网友表示赞同!

未来未必来

爬虫代理是不是会违反一些网站的协议?

    有12位网友表示赞同!

执妄

在使用爬虫代理的时候需要注意些什么安全问题吗?

    有16位网友表示赞同!

没过试用期的爱~

文章里面提到的案例能不能简单解释一下?

    有5位网友表示赞同!

七夏i

爬虫代理和 VPN 是同一个概念吗?

    有20位网友表示赞同!

【网络爬虫代理应用及aiohttp代理实践探索】相关文章:

1.蛤蟆讨媳妇【哈尼族民间故事】

2.米颠拜石

3.王羲之临池学书

4.清代敢于创新的“浓墨宰相”——刘墉

5.“巧取豪夺”的由来--米芾逸事

6.荒唐洁癖 惜砚如身(米芾逸事)

7.拜石为兄--米芾逸事

8.郑板桥轶事十则

9.王献之被公主抢亲后的悲惨人生

10.史上真实张三丰:在棺材中竟神奇复活