Crawler-related notes

An asynchronous crawler based on Tornado

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import logging
from datetime import timedelta
from tornado import httpclient, gen, ioloop, queues
import traceback
from bs4 import BeautifulSoup


def logged(class_):
    logging.basicConfig(level=logging.INFO)
    class_.logger = logging.getLogger(class_.__name__)
    return class_


@logged
class AsyncSpider(object):
    """A simple class of asynchronous spider."""
    def __init__(self, urls, concurrency=10, results=None, **kwargs):
        super(AsyncSpider, self).__init__(**kwargs)

        self.concurrency = concurrency
        self._q = queues.Queue()
        self._fetching = set()
        self._fetched = set()
        self.results = results if results is not None else []
        for url in urls:
            self._q.put(url)
        httpclient.AsyncHTTPClient.configure(
            "tornado.curl_httpclient.CurlAsyncHTTPClient"
        )

    def fetch(self, url, **kwargs):
        http_request = httpclient.HTTPRequest(url, **kwargs)
        return httpclient.AsyncHTTPClient().fetch(http_request,
                                                  raise_error=False)

    def handle_html(self, url, html):
        """处理html页面"""
        print(url)

    def handle_response(self, url, response):
        """处理http响应,对于200响应码直接处理html页面,
        否则按照需求处理不同响应码"""
        if response.code == 200:
            self.handle_html(url, response.body)

        elif response.code == 599:    # retry
            self._fetching.remove(url)
            self._q.put(url)

    @gen.coroutine
    def get_page(self, url):
        # yield gen.sleep(10)    # sleep here if throttling is needed
        try:
            response = yield self.fetch(url)
            self.logger.debug('######fetched %s' % url)
        except Exception as e:
            self.logger.debug('Exception: %s %s' % (e, url))
            raise gen.Return(e)
        raise gen.Return(response)    # py3 can just return response

    @gen.coroutine
    def _run(self):
        @gen.coroutine
        def fetch_url():
            current_url = yield self._q.get()
            try:
                if current_url in self._fetching:
                    return

                self.logger.debug('fetching****** %s' % current_url)
                self._fetching.add(current_url)

                response = yield self.get_page(current_url)
                self.handle_response(current_url, response)    # handle response

                self._fetched.add(current_url)

            finally:
                self._q.task_done()

        @gen.coroutine
        def worker():
            while True:
                yield fetch_url()

        # Start workers, then wait for the work queue to be empty.
        for _ in range(self.concurrency):
            worker()

        yield self._q.join(timeout=timedelta(seconds=300000))

        try:
            assert self._fetching == self._fetched
        except AssertionError:    # some HTTP errors were not handled
            print(self._fetching-self._fetched)
            print(self._fetched-self._fetching)

    def run(self):
        io_loop = ioloop.IOLoop.current()
        io_loop.run_sync(self._run)


class MySpider(AsyncSpider):

    def fetch(self, url, **kwargs):
        """重写父类fetch方法可以添加cookies,headers,timeout等信息"""
        cookies_str = "PHPSESSID=j1tt66a829idnms56ppb70jri4; pspt=%7B%22id%22%3A%2233153%22%2C%22pswd%22%3A%228835d2c1351d221b4ab016fbf9e8253f%22%2C%22_code%22%3A%22f779dcd011f4e2581c716d1e1b945861%22%7D; key=%E9%87%8D%E5%BA%86%E5%95%84%E6%9C%A8%E9%B8%9F%E7%BD%91%E7%BB%9C%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8; think_language=zh-cn; SERVERID=a66d7d08fa1c8b2e37dbdc6ffff82d9e|1444973193|1444967835; CNZZDATA1254842228=1433864393-1442810831-%7C1444972138"
        headers = {
            'User-Agent': 'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
            'cookie': cookies_str
        }
        return super(MySpider, self).fetch(
            url, headers=headers,
            #proxy_host="127.0.0.1", proxy_port=8787,    # for proxy
        )

    def handle_html(self, url, html):
        print(url)
        #print(BeautifulSoup(html, 'lxml').find('title'))


def main():
    st = time.time()
    urls = []
    n = 1000
    for page in range(1, n):
        urls.append('http://www.jb51.net/article/%s.htm' % page)
    s = MySpider(urls, 10)
    s.run()
    elapsed = time.time() - st
    print('%.2f seconds in total' % elapsed)
    print('%.1f pages per minute' % (len(urls) * 60.0 / elapsed))
    print('%.1f pages per second' % (len(urls) / elapsed))


if __name__ == '__main__':
    main()
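
The AsyncSpider above keeps a results list and already imports BeautifulSoup; a minimal sketch (assuming the class definitions above are available in the same module) of a subclass that collects page titles instead of just printing URLs:

class TitleSpider(AsyncSpider):
    """Store (url, title) pairs in self.results."""

    def handle_html(self, url, html):
        # html is the raw response body; BeautifulSoup handles decoding
        title = BeautifulSoup(html, 'lxml').find('title')
        self.results.append((url, title.get_text() if title else ''))

# usage: spider = TitleSpider(urls); spider.run(); print(spider.results)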

Some utility functions that come in handy when writing crawlers

#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
chrome有个功能,对于请求可以直接右键copy as curl,然后在命令行里边用curl
模拟发送请求。现在需要把此curl字符串处理成requests库可以传入的参数格式,
http://stackoverflow.com/questions/23118249/whats-the-difference-between-request-payload-vs-form-data-as-seen-in-chrome
"""

import re
from functools import wraps
import traceback
import requests


def encode_to_dict(encoded_str):
    """ 将encode后的数据拆成dict
    >>> encode_to_dict('name=foo')
    {'name': foo'}
    >>> encode_to_dict('name=foo&val=bar')
    {'name': 'foo', 'val': 'var'}
    """

    pair_list = encoded_str.split('&')
    d = {}
    for pair in pair_list:
        if pair:
            key, val = pair.split('=', 1)
            d[key] = val
    return d


def parse_curl_str(s):
    """convert chrome curl string to url, headers_dict and data"""
    pat = re.compile("'(.*?)'")
    str_list = [i.strip() for i in re.split(pat, s)]   # split the curl string on quoted segments

    url = ''
    headers_dict = {}
    data = ''

    for i in range(0, len(str_list)-1, 2):
        arg = str_list[i]
        string = str_list[i+1]

        if arg.startswith('curl'):
            url = string

        elif arg.startswith('-H'):
            header_key = string.split(':', 1)[0].strip()
            header_val = string.split(':', 1)[1].strip()
            headers_dict[header_key] = header_val

        elif arg.startswith('--data'):
            data = string

    return url, headers_dict, data
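
# Illustrative use of parse_curl_str together with encode_to_dict (the curl
# string below is a made-up example, not taken from a real request):
#
#   url, headers, data = parse_curl_str(
#       r"""curl 'http://httpbin.org/post' -H 'Accept: application/json' --data 'a=1&b=2'""")
#   requests.post(url, headers=headers, data=encode_to_dict(data))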


def retry(retries=3):
    """一个失败请求重试,或者使用下边这个功能强大的retrying
    pip install retrying
    https://github.com/rholder/retrying

    :param retries: number int of retry times.
    """
    def _retry(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            index = 0
            while index < retries:
                index += 1
                try:
                    response = func(*args, **kwargs)
                    if response.status_code == 404:
                        print(404)
                        break
                    elif response.status_code != 200:
                        print(response.status_code)
                        continue
                    else:
                        break
                except Exception as e:
                    traceback.print_exc()
                    response = None
            return response
        return _wrapper
    return _retry


_get = requests.get


@retry(5)
def get(*args, **kwds):
    if 'timeout' not in kwds:
        kwds['timeout'] = 10
    if 'headers' not in kwds:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36',
        }
        kwds['headers'] = headers

    return _get(*args, **kwds)

requests.get = get


def retry_get_html(*args, **kwargs):
    try:
        return get(*args, **kwargs).content
    except AttributeError:
        return ''


def lazy_property(fn):
    attr_name = '_lazy_' + fn.__name__

    @property
    def _lazy_property(self):
        if not hasattr(self, attr_name):
            setattr(self, attr_name, fn(self))
        return getattr(self, attr_name)
    return _lazy_property
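
# Example use of lazy_property: compute an attribute once on first access and
# cache it on the instance (the Page class is made up for illustration):
#
#   class Page(object):
#       def __init__(self, url):
#           self.url = url
#
#       @lazy_property
#       def html(self):
#           return retry_get_html(self.url)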


def my_ip():
    # url = 'https://api.ipify.org?format=json'
    url = 'http://httpbin.org/ip'
    return requests.get(url).text


def form_data_to_dict(s):
    """form_data_to_dict s是从chrome里边复制得到的form-data表单里的字符串,
    注意*必须*用原始字符串r""

    :param s: form-data string
    """
    arg_list = [line.strip() for line in s.split('\n')]
    d = {}
    for i in arg_list:
        if i:
            k = i.split(':', 1)[0].strip()
            v = ''.join(i.split(':', 1)[1:]).strip()
            d[k] = v
    return d

if __name__ == '__main__':
    import sys
    from pprint import pprint
    try:
        curl_str = sys.argv[1]   # wrap the curl string in triple quotes on the command line
        url, headers_dict, data = parse_curl_str(curl_str)
        print(url)
        pprint(headers_dict)
        print(data)
    except IndexError:
        exit(0)
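
The retry decorator above mentions the retrying package as a more capable alternative; a minimal sketch of the same idea using retrying (assuming it is installed via pip install retrying; get_html_with_retrying is an illustrative name):

import requests
from retrying import retry as retrying_retry


def _should_retry(response):
    # retry_on_result receives the wrapped function's return value;
    # returning True triggers another attempt
    return response.status_code != 200


@retrying_retry(stop_max_attempt_number=3, wait_fixed=2000,
                retry_on_result=_should_retry)
def get_html_with_retrying(url):
    return requests.get(url, timeout=10)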

How to use proxies

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# requests proxy demo
import requests

# install lantern first; this is lantern's local proxy address
proxies = {
    "http": "http://127.0.0.1:8787",
    "https": "http://127.0.0.1:8787",
}

url = 'http://httpbin.org/ip'
r = requests.get(url, proxies=proxies)
print(r.text)


# requests from version 2.10.0 support socks proxy
# pip install -U requests[socks]
proxies = {'http': "socks5://myproxy:9191"}
requests.get('http://example.org', proxies=proxies)

# tornado proxy demo
# sudo apt-get install libcurl-dev librtmp-dev
# pip install tornado pycurl

from tornado import httpclient, ioloop

config = {
    'proxy_host': 'YOUR_PROXY_HOSTNAME_OR_IP_ADDRESS',
    'proxy_port': 3128
}

httpclient.AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient")


def handle_request(response):
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
http_client.fetch("http://twitter.com/",
    handle_request, **config)
ioloop.IOLoop.instance().start()

Using a thread pool

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import concurrent.futures
import bs4
import requests


class ThreadPoolCrawler(object):
    def __init__(self, urls, concurrency=10, **kwargs):
        self.urls = urls
        self.concurrency = concurrency
        self.results = []

    def handle_response(self, url, response):
        pass

    def get(self, *args, **kwargs):
        return requests.get(*args, **kwargs)

    def run(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
            future_to_url = {
                executor.submit(self.get, url): url for url in self.urls
            }
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    response = future.result()
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                else:
                    self.handle_response(url, response)


class TestCrawler(ThreadPoolCrawler):
    def handle_response(self, url, response):
        soup = bs4.BeautifulSoup(response.text, 'lxml')
        title = soup.find('title')
        self.results.append({url: title})


def main():
    import time
    urls = ['http://localhost:8000'] * 300
    for nums in [2, 5, 10, 15, 20, 50, 70, 100]:
        beg = time.time()
        s = TestCrawler(urls, nums)
        s.run()
        print(nums, time.time()-beg)

if __name__ == '__main__':
    main()
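
Just as MySpider overrides fetch in the Tornado example, the get hook of ThreadPoolCrawler can be overridden to add a timeout, headers or proxies; a minimal sketch (the header value is illustrative):

class MyThreadCrawler(TestCrawler):
    def get(self, *args, **kwargs):
        # fill in defaults only if the caller did not pass them
        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('headers', {'User-Agent': 'Mozilla/5.0'})
        return requests.get(*args, **kwargs)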

Using Tor to rotate proxy IPs

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# http://ningning.today/2016/03/07/python/python-requests-tor-crawler/

import os
import requests
import requesocks

#url = 'https://api.ipify.org?format=json'
url = 'http://httpbin.org/ip'


def getip_requests(url):
    print("(+) Sending request with plain requests...")
    r = requests.get(url)
    print("(+) IP is: " + r.text.replace("\n", ""))


def getip_requesocks(url):
    print("(+) Sending request with requesocks...")
    session = requesocks.session()
    session.proxies = {'http': 'socks5://127.0.0.1:9050',
                       'https': 'socks5://127.0.0.1:9050'}
    r = session.get(url)
    print("(+) IP is: " + r.text.replace("\n", ""))


def tor_requests():
    proxies = {
        'http': 'socks5://127.0.0.1:9050',
        'https': 'socks5://127.0.0.1:9050',
    }
    r = requests.get(url, proxies=proxies)
    print(r.text)


def main():
    print("Running tests...")
    getip_requests(url)
    getip_requesocks(url)
    # ask the Tor control port for a new circuit, i.e. a new exit IP
    os.system("""(echo authenticate '"yourpassword"'; echo signal newnym; echo quit) | nc localhost 9051""")
    getip_requesocks(url)


if __name__ == "__main__":
    main()
    #tor_requests()