無駄と文化

実用的ブログ

Python で日時計算 ~ 月初とか月末とか N ヵ月前とか

Python で日付計算と言えば datetime モジュールですね。まぁまぁ便利なんですが、標準では 月初月末 を求めることができません。
なのでちょっとしたユーティリティコードとして書いておきました。


N ヶ月後・N ヶ月前

add_month()N ヶ月後の同日同時間 を求める関数です。同様に sub_month()N ヶ月前の同日同時間 を求めます。
2月31日 のように計算結果が存在しない日付になってしまう場合は、同月の末日を返します。

本当は datetime.timedelta を使えれば良かったんですが、datetime.timedelta では N ヶ月差を表現することはできないようです。


月初・月末

現在持っている日付の 月初月末 を求める関数も書きました。
start_of_month() は月初を、end_of_month() は月末を返します。


コード

# -*- coding: utf-8 -*-

from datetime import datetime
from calendar import monthrange


def add_month(_date, month_delta):
    """{month_delta} ヵ月後の同日の日時を返す

    >>> add_month(datetime.datetime(2017, 1, 30, 5, 20, 15), month_delta=1)
    datetime.datetime(2017, 2, 28, 23, 59, 59)

    >>> add_month(datetime.datetime(2017, 1, 15, 5, 20, 15), month_delta=1)
    datetime.datetime(2017, 2, 15, 5, 20, 15)

    >>> add_month(datetime.datetime(2017, 1, 30, 5, 20, 15), month_delta=13)
    datetime.datetime(2018, 2, 28, 23, 59, 59)
    """

    year = _date.year
    month = _date.month
    day = _date.day
    hour = _date.hour
    minute = _date.minute
    second = _date.second
    microsecond = _date.microsecond

    alt_year = year
    alt_month = month + month_delta

    if alt_month > 12:
        month_overflow = alt_month

        alt_year += month_overflow // 12
        alt_month = month_overflow % 12
    elif alt_month <= 0:
        month_overflow = alt_month

        alt_year += (month_overflow - 1) // 12
        alt_month = (month_overflow - 1) % 12 + 1

    try:
        return datetime(alt_year, alt_month, day, hour, minute, second, microsecond)
    except ValueError:
        return end_of_month(datetime(alt_year, alt_month, 1, hour, minute, second, microsecond))

def sub_month(_date, month_delta):
    """{month_delta} ヵ月前の同日の日時を返す

    >>> sub_month(datetime.datetime(2017, 3, 30, 5, 20, 15), month_delta=1)
    datetime.datetime(2017, 2, 28, 23, 59, 59)

    >>> sub_month(datetime.datetime(2017, 3, 15, 5, 20, 15), month_delta=1)
    datetime.datetime(2017, 2, 15, 5, 20, 15)

    >>> sub_month(datetime.datetime(2017, 3, 20, 5, 20, 15), month_delta=5)
    datetime.datetime(2016, 10, 20, 5, 20, 15)
    """

    return add_month(_date, -month_delta)

def start_of_month(_date):
    """その月の月初の時を返す

    >>> start_of_month(datetime.datetime(2017, 3, 15))
    datetime.datetime(2017, 3, 1, 0, 0)

    >>> start_of_month(datetime.datetime(2017, 2, 28, 10, 10, 10))
    datetime.datetime(2017, 2, 1, 0, 0)
    """

    year = _date.year
    month = _date.month

    return datetime(year, month, 1, 0, 0, 0)

def end_of_month(_date):
    """その月の月末の日時を返す

    >>> end_of_month(datetime.datetime(2017, 3, 15))
    datetime.datetime(2017, 3, 31, 23, 59, 59)

    >>> end_of_month(datetime.datetime(2017, 2, 28, 10, 10, 10))
    datetime.datetime(2017, 2, 28, 23, 59, 59)
    """

    year = _date.year
    month = _date.month
    end_of_month = monthrange(year, month)[1]

    return datetime(year, month, end_of_month, 23, 59, 59)


ちなみに、これらの関数名は PHP の良く出来た日付計算ライブラリ Carbon を大いに参考にしています。

一応、Gist があります。

Python で日時計算 ~ 月初とか月末とか N ヵ月前とか · GitHub


私からは以上です。

一手間加えた INSERT - レコードが未登録のとき、登録済みのとき、

DB にレコードを INSERT するとき、一手間加えて 未登録の場合に限って登録登録済みなら一部フィールドだけ上書き などしたくなりますよね。
ここ最近、そのような SQL を書くことが多かったのでメモしておきます。

ちなみに MySQL の独自構文などもバリバリ使っているので、他のベンダーの DB に適用するときは部分的な書き換えが必要かもしれません。


レコードが存在していなかったら新規登録、存在していれば上書き

INSERT INTO `posts`(
    `id`
    ,`title`
    ,`body`
    ,`created_at`
    ,`updated_at`
)
VALUES (
    42
    ,'test title'
    ,'test body'
    ,'1970-01-01 00:00:00'
    ,'1970-01-01 00:00:00'
)
ON DUPLICATE KEY UPDATE
    `title`       = VALUES(`title`)
    ,`body`       = VALUES(`body`)
    ,`updated_at` = VALUES(`updated_at`)

UNIQUE インデックスまたは PRIMARY KEY を重複させるようなレコードを INSERT しようとしたときに、INSERT ではなく UPDATE が実行される。
その場合でも created_at は上書きされない。

参考


レコードが存在していなかったら新規登録、存在していれば何もしない

INSERT INTO `tags` (
    `id`
    ,`name`
)
SELECT
    42
    ,'Technology'
FROM dual
WHERE NOT EXISTS (
    SELECT `id` FROM `tags`
    WHERE `name` = 'Technology'
)

タグ一覧テーブルからタグ名が ‘Technorogy’ であるものを探して、存在しなかった場合に限り INSERT する。
8行目でテーブル名として使われている dual は実際には参照されることのないダミーのテーブル名。

参考

Scrapy+AWS LambdaでWeb定点観測のイレギュラーに立ち向かう

この記事は クローラー/Webスクレイピング Advent Calendar 2016 の10日目の記事です。
9日目は @hotu_ta さん、11日目は @TakesxiSximada さん でした。


Web スクレイピングはイレギュラーとの戦いです。特にそれが Web 定点観測のためのスクレイピングであれば難易度はさらに高まります。

  • スクレイピングしようとしたタイミングでサーバーが死んでいるかもしれない
  • クローラーを書いていたときには気づけなかったバグが遅れて発動するかもしれない
  • 知らぬ間にスクレイピングしたいページの URL が変更されるかもしれない

そんなイレギュラーに立ち向かうために、現在 私が試している方法をまとめてみます。


その前に「Web 定点観測」とは?

Web 定点観測 とは、一言でいうと「一つの URL を定期的にスクレイピングして経時的な変化を追っていく行為」のことです。RSS が提供されていないページに置いて更新を常に追っていくためのスクレイピングなどが一般的でしょうか。

あえて「Web 定点観測」と名前を付けているのは、一度実行してデータを取得したら終わりではないよという点を強調したいからです。


構成

それではさっそく構成を、

f:id:todays_mitsui:20161218205656p:plain

システムは前半の クローリングパート と、後半の スクレイピングパート に分けています。 ここでは『ページ内のリンクを探索しながらページを次々に辿っていく処理』をクローリング、『取得したページ(HTML)から必要なデータを適切な形式で抜き出す処理』をスクレイピングと呼び分けています。

前半のクローリングパートでは Scrapy で必要としているページのリンクを辿り S3 に HTML を保存すること だけ をやります。
後半のスクレイピングパートのメインは AWS Lambda です。S3 に HTML が保存されることがトリガーとなって Lambda が呼び出されるように設定しています。やっているのは HTML に記述されている情報を抜き出して、お好みで加工してデータベースに保存することです。

また、クローリングパート, スクレイピングパートそれぞれで発生したエラー情報は Rollbar に投げてログを取っています。


前半と後半に分ける理由

実際のところ Scrapy というフレームワークは充分にフルスタックで、クローリングとスクレイピングを同時にこなすのに充分な機能を持っています。では、なぜわざわざ処理を前半と後半に分けて Scrapy で HTML の保存だけをやるのでしょうか。
それはデータの取得に失敗する原因の多くがスクレイピングパートでのエラーによるからです。

さらに、そのエラーが全てのページで起こるもの(スクレイピングのロジックに起因するもの)であればクローリング自体中止したいし、そのページに特有のもの(特定のページ構造に起因するもの)であればクローリングを続行したいという事情も事態を複雑にします。
目的のデータが取得できなかったとき 何が原因で、誰のせいなのか をはっきりと把握しておく必要があるのです。そして、対処できるものに対しては対処し、そうでないものについてもイレギュラーが起こっていることだけは把握できる準備が必要です。


スクレイピングにおける「イレギュラー」を細分化すると以下のようになるかと思います。

  1. サーバー側でエラーが発生する(404エラー, 500エラー など)
  2. サーバーは正常だが目的のコンテンツが無い
  3. スクレイピング処理中にエラーが発生する
  4. スクレイピング処理は正常終了するが目的のデータが取得できていない

このうち①, ②はサーバー側の要因であり、こちら側で対処できないことが多いです。一方で③, ④はおそらくこちら側での ミス で、何かしらの対処が可能なものです。

処理を前半と後半に分ければ、クローリングパートでは①だけを気にすれば良くなります。スクレイピングパートで気にするべきは②, ③, ④です。


① サーバー側でのエラー

通常、Scrapy のクローリングエンジンはサーバーから 4xx 系や 5xx 系のエラーが返った場合にログにエラーを記録だけしてクローリングを続けます。
ですが、余りにも 5xx 系のエラーが多い場合にはサーバーが死んでいると見るのが自然なので何かしらの特別な処理をしたくなるでしょう。

Scrapy でそれをするには errback() を使います。
errback() はクローリング中に 200 以外のステータスコードが返されたときにだけ呼ばれるコールバック関数です。

import scrapy
from scrapy import Request


class ExampleSpider(scrapy.Spider):
    name = "index"
    allowed_domains = ["example.com"]

    start_urls = ["http://example.com/",]
    error_urls = []

    def parse(self, response):
        for i in range(10):
            yield Request(
                "http://example.com/{0}/".format(i),
                callback=self.callback,
                errback=selferrback,
            )

    def callback(self, response):
        return {
            "url": response.url,
            "title": response.xpath("//title/text()").extract_first(),
        }

    def errback(self, failure):
        if failure.check(HttpError):
            # 200 以外のステータスコードが返った場合
            response = failure.value.response
            self.error_urls.append(response.url)

        elif failure.check(DNSLookupError, TimeoutError, TCPTimedOutError):
            request = failure.request
            self.error_urls.append(request.url)

    def closed(reason):
        self.logger.debug("Errors: {0}".format(len(self.error_urls)))

error_urls という名前で空のリストを用意しておき、200 以外のステータスコードが返されたときにはそのページの URL を放り込んでいきます。
どれほどの数のサーバーエラーが検出されたとき異常と見なすかは一考の余地がありますが、エラーが多い場合には closed() の中で error_urls の中をチェックして然るべきエラー処理をします。現在はとりあえず Rollbar にエラーログを投げたりしてみています。


③ スクレイピング処理中に発生したエラー

ケアレスによるバグがクローリング中に起こるのを恐れてクローリングパートでは HTML を S3 に保存するという最小限の事だけをしています。
保存された HTML から目的のデータを抜き出すのは AWS Lambda のお仕事です。

import urllib
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

import boto3
from lxml import html

import rollbar
rollbar.init("POST_SERVER_ITEM_ACCESS_TOKEN")


def load(bucket, key):
    obj = s3.Object(bucket, key).get()

    doc = obj["Body"].read()
    metadata = obj["Metadata"]

    return doc, metadata

def parse(doc, base_url):
    root = html.fromstring(doc)

    title = root.xpath("//title/text()")[0].strip()
    if 0 == len(title):
        raise Exception("title not found.")

    return {
        "title": title,
    }

def lambda_handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"].encode("utf8"))
    logger.info("KEY: {0}".format(key))

    doc, metadata = load(bucket, key)

    try:
        data = parse(doc, base_url=metadata["src"])
    except Exception as e:
        logger.error(str(e))
        rollbar.report_exc_info()

    return data

具体例を示すほどの事でもないのですが、lxml で HTML を etree に変換して xpath でデータを指定し、抜き出し、加工しています。
クローリングパートよりも少々複雑な事をする場合もあり予期せぬエラーも起こりがちです。なるべく柔軟なコーディングを心がけますが、対象ページが知らぬ間にリニューアルされていて DOM 構造が大幅に変更されていることもあります。そのへんはスクレイピングという行為の性質上 仕方のないことかと思います。


スクレイピング処理中に起こったエラーは Rollbar に投げられます。
エラーに(私が)気づいたら元気よくコードを修正して Lambda を再び走らせます。対象のページは既に S3 に保存されているので、スクレイピングのトライ&エラーを何度繰り返そうとも元のサーバーに余計なトラフィックを発生させることはありません。


④ スクレイピング処理中に発生しなかったエラー、しかし目的のデータが得られていない場合

このパターンの方が厄介ですね。
スクレイピング処理中は要所要所で取得したデータをバリデーションチェックして、想定しない形式のデータが取れた場合には例外を投げるようにしています。そうやって投げられた例外は ③ で補足されて手動リトライされるわけですが、正しい結果が得られていないにも関わらず例外を投げ損なうと、異常の発見が遅れますね。


② 取得したページに目的のコンテンツが無い場合

これも厄介なパターンです。
そもそも何をもって 目的のコンテンツが無い とするのかも曖昧で、一概に判定できなかったりします。とはいえ、目的の情報が得られない場合にはスクレイピング処理中に例外が投げられるようなコーディングを心がけます。
異常があると分かったら一刻も早く停止させるべきなので。


このやり方でやってみて

メリット

  • クローリングパートとスクレイピングパートに分ける戦略は管理が楽
  • スクレイピング処理中に失敗しても原因究明とリトライがしやすい


デメリット

  • A/B テストに弱い
  • S3 への PUT が多くなるので場合によっては料金がかさむかも


一番のデメリットは A/Bテストに弱い ことです。
目的のデータが取得できていないことはスクレイピングパートで発覚しますが、そこからクローリングをリトライする綺麗な流れを今のところ考えられていません。


まとめ

Web 定点観測において現状やってみていることを紹介してみました。
『自分のことだからどうせしょうもないミスしてるって』という疑り深い精神と、リトライ時にもコンテンツ配信元のサーバーに迷惑を掛けないようにとの心づかいが働いている...、つもりです。


私からは以上です。