銀月の符号

Python 使い見習いの日記・雑記

md5sum の模倣

Python で md5sum 。車輪の再発明遊び、兼 C コードリーディング。参考にしたのは GNU coreutils-8.4.tar.gz の md5sum.c 。

先日の「md5 のチェックを Python で - 銀月の符号」なんかと違って、ちゃんと各種オプションもある。

得られたもの

  • optparse モジュールのコールバックオプションの使い方
  • (作成過程における)自己満足(完成品に需要がないのは明らか。先日のは即席でもつくれそうなほど簡単なところがメリットになりえたが、これは無理。)

TODO

  • テスト(これはひどい。予定に入っていちゃだめだろ、終わってないと)
  • docstring, コメント(同上)
  • stdin からの入力

以下着手するかどうか未定。

  • リファクタリング(読みやすいコード、テストしやすいコードに。元コードの構造に引きずられて、 Python らしからぬ状態になっているのかも。)
  • 複数のアルゴリズムへの対応(md5sum.c は、これひとつで md5sum, sha1sum, sha256sum, sha224sum, sha512sum, sha384sum のビルドができるようだ。プリプロセッサで振り分けている。 Python 流だとどう料理すべきか?)
  • 多言語対応のための下地作り(たしか _("standard input") の _ は gettext というものだったような。詳しく知らないけれど実行環境のロケールに合った .mo ファイルをもとに一部の表示用テキストを置き換えるものだったっけ。 Python にも gettext モジュールがあることだし、これの使い方の勉強もできる?)

md5sum.py

# coding: utf-8

u"""GNU md5sum の模倣
"""

import sys
import re
import hashlib
import optparse

def split3(line):
    mo = re.match(r'([a-fA-F0-9]{32}) ([ \*])(.*)', line)
    if not mo:
        return None
    digest, is_bin, name = mo.groups()
    if u'\\' in name:
        L = []
        append = L.append
        it = iter(name)
        for c in it:
            if c != u'\\':
                append(c)
            else:
                try:
                    c = next(it)
                except StopIteration:
                    return None
                if c == u'\\':
                    append(u'\\')
                elif c == u'n':
                    append(u'\n')
                else:
                    return None
        name = u''.join(L)
    return digest, is_bin, name

def digest_stream(f, buf_size=2097152):
    ha = hashlib.md5()
    while True:
        data = f.read(buf_size)
        if not data:
            break
        ha.update(data)
    return ha

def digest_file(filename, binary):
    is_stdin = filename == '-'

    if is_stdin:
        # TODO: 標準入力からの入力をなんとかする
        raise Exception(u'Reading from stdin is not supported yet.')
    else:
        with open(filename, 'rb' if binary else 'r') as f:
            ha = digest_stream(f)
        return ha

def digest_check(filename, warn=False):
    is_stdin = filename == '-'

    if is_stdin:
        # TODO: 標準入力からの入力をなんとかする
        raise Exception(u'Reading from stdin is not supported yet.')
    else:
        with open(filename, 'r') as f:
            for line_number, line in enumerate(f, 1):
                if line[0] == '#':
                    continue
                if line[-1] == '\n':
                    line = line[:-1]
                try:
                    digest, is_bin, name = split3(line)
                except TypeError:
                    if warn:
                        print >> sys.stderr, u'%s: %d: improperly formatted MD5 checksum line' % (filename, line_number)
                    continue

                mode = 'rb' if is_bin == '*' else 'r'
                try:
                    with open(name, mode) as g:
                        he = digest_stream(g)
                    result = digest.lower() == he.hexdigest().lower()
                    result_s = u'OK' if result else u'FAILED'
                except IOError, e:
                    print >> sys.stderr, e
                    result = False
                    result_s = u'FAILED open or read'
                yield name, result, result_s

usage = '''%prog [OPTION]... [FILE]...
Print or check MD5(128-bit) checksums.
With no File, or when FILE is -, read standard input.'''

# TODO: version および説明文を考え直すこと
version = '''%prog 0.1
Written by fgshun'''

def status_callback(option, opt, value, parser, *args, **kwargs):
    parser.values.status_only = True
    parser.values.warn = False
    parser.values.quiet = True

def quiet_callback(option, opt, value, parser, *args, **kwargs):
    parser.values.status_only = False
    parser.values.warn = False
    parser.values.quiet = True

def warn_callback(option, opt, value, parser, *args, **kwargs):
    parser.values.status_only = False
    parser.values.warn = True
    parser.values.quiet = False

def main():
    parser = optparse.OptionParser(usage=usage, version=version)
    parser.add_option('-b', '--binary',
            action='store_true', dest='binary',
            help='read in binary mode')
    parser.add_option('-c', '--check',
            action='store_true', dest='do_check',
            help='read MD5 sums from the FILEs and check them')
    parser.add_option('-t', '--text',
            action='store_false', dest='binary',
            help='read in text mode (default)')
    parser.add_option('--quiet',
            action='callback', callback=quiet_callback,
            help="don't print OK for each successfully verified file")
    parser.add_option('--status',
            action='callback', callback=status_callback,
            help="don't output anything, status code shows success")
    parser.add_option('-w', '--warn',
            action='callback', callback=warn_callback,
            help='warn about improperly formatted checksum lines')
    parser.set_defaults(
            binary=None,
            do_check=False,
            status_only=False,
            warn=False,
            quiet=False)

    options, args = parser.parse_args()

    if options.binary is not None and options.do_check:
        parser.error('the --binary and --text options are meaningless when'
                     'verifying checksums')
    if options.status_only and not options.do_check:
        parser.error('the --status option is meaningful only'
                     'when verifying checksums')
    if options.warn and not options.do_check:
        parser.error('the --warn option is meaningful only'
                     'when verifying checksums')
    if options.quiet and not options.do_check:
        parser.error('the --quiet option is meaningful only'
                     'when verifying checksums')

    if not args:
        args.append('-')

    ok = True

    if options.do_check:
        lines = 0
        computed = 0
        failures = 0
        mismatched = 0
        for arg in args:
            for name, result, result_s in digest_check(arg, options.warn):
                lines += 1
                if result:
                    computed += 1
                else:
                    if result_s == u'FAILED':
                        computed += 1
                        mismatched += 1
                    elif result_s == u'FAILED open or read':
                        failures += 1
                    else:
                        assert True
                    ok = False
                if not (options.status_only or (result and options.quiet)):
                    print u'%s: %s' % (name, result_s)
        if failures and not options.status_only:
            print >> sys.stderr, 'WARNING: %d of %d listed files could not be read' % (failures, lines)
        if mismatched and not options.status_only:
            print >> sys.stderr, 'WARNING: %d of %d computed checksums did Not match' % (mismatched, computed)
    else:
        for arg in args:
            try:
                ha = digest_file(arg, options.binary)
                digest = ha.hexdigest()
                parg = arg.replace(u'\\', u'\\\\')
                parg = parg.replace(u'\n', u'\\n')
                print u'%s %c%s' % (
                        digest, u'*' if options.binary else u' ', parg)
            except IOError, e:
                print >> sys.stderr, e
                ok = False

    sys.exit(0 if ok else 1)

if __name__ == '__main__':
    main()