PythonでGoogle Photosを扱うスクリプトをまとめた

Created 2019年3月10日17:47
Updated 2019年3月10日17:49
Categories Python Google Photos API

旅行に行った時の写真が全てGoogle Photosにアップロードされていたのですが、数が多すぎるのでローカルに落としてじっくり選定したいと思っていました。

Google PhotosのAPIから画像をダウンロードする知見は少し前に得ていたので、この機会に簡単に使えるようにクラス化してまとめてみました。

事前準備

  • このページを参考に、Google Photos APIを有効にしてください
  • Google Developer Consoleの認証情報ページから認証情報の入ったJSONをダウンロードしてください

ダウンロードしたcredentials.jsonをスクリプトと同じディレクトリに配置してください。

コード

from pathlib import Path
from requests_oauthlib import OAuth2Session
import json
import logging
import time
from datetime import datetime, timedelta

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)


class GooglePhotos:
    # APIのURLやスコープ
    api_url = {
        "test": "https://photoslibrary.googleapis.com/v1/mediaItems",
        "searchItems": "https://photoslibrary.googleapis.com/v1/mediaItems:search",
        "mediaItem": "https://photoslibrary.googleapis.com/v1/mediaItems/{}"
    }
    scope = ["https://www.googleapis.com/auth/photoslibrary.readonly"]
    sleep_time = 10
    photo_size_format = "{base}=w{width}-h{height}"

    def __init__(self, token_path="token.json", credential_path="credentials.json"):
        self.token_path = token_path
        self.google_session, logged_in = self.login(credential_path)
        # ログイン処理が行われていたらトークンを保存
        # 本来自動保存だが動かないので追加
        if logged_in:
            self.save_token()
        # 有効期限の過ぎたトークンをリフレッシュ
        self.token_expires_at = datetime.fromtimestamp(self.google_session.token.get("expires_at"))
        self.check_and_refresh_token()

    # ログイン後に取得したトークンをtoken.jsonに保存
    def save_token(self):
        logger.debug("トークンを保存しています")
        Path(self.token_path).write_text(json.dumps(self.google_session.token))

    # token.jsonが存在したら読み込み
    def load_token(self):
        # 存在しない場合は期限切れのダミーを返す
        token = {
            "access_token": "",
            "refresh_token": "",
            "token_type": "",
            "expires_in": "-30",
            "expires_at": (datetime.now() - timedelta(hours=2)).timestamp()
        }
        path = Path(self.token_path)
        if path.exists():
            logger.debug("トークンをファイルから読み込んでいます")
            token = json.loads(path.read_text())
        return token

    def check_and_refresh_token(self):
        if datetime.now() + timedelta(minutes=10) > self.token_expires_at:
            logger.debug("トークンの期限切れが近いため、更新を行います")
            new_token = self.google_session.refresh_token(
                self.google_session.auto_refresh_url,
                **self.google_session.auto_refresh_kwargs
            )
            self.google_session.token = new_token
            self.token_expires_at = datetime.fromtimestamp(self.google_session.token.get("expires_at"))

    def get(self, *args, **kwargs):
        self.check_and_refresh_token()
        return self.google_session.get(*args, **kwargs)

    def post(self, *args, **kwargs):
        self.check_and_refresh_token()
        return self.google_session.post(*args, **kwargs)

    # ログインしてセッションオブジェクトを返す
    def login(self, credential_path):
        # 認証情報を読み込み
        auth_info = json.loads(Path(credential_path).read_text()).get("installed", None)
        assert auth_info is not None
        # トークン読み込み
        token = self.load_token()
        # トークン更新用の認証情報
        extras = {
            "client_id": auth_info.get("client_id"),
            "client_secret": auth_info.get("client_secret"),
        }
        # セッションオブジェクトを作成
        # TODO: token_updaterの引数がたぶん合わない
        google_session = OAuth2Session(
            auth_info.get("client_id"),
            scope=GooglePhotos.scope,
            token=token,
            auto_refresh_kwargs=extras,
            token_updater=self.save_token,
            auto_refresh_url=auth_info.get("token_uri"),
            redirect_uri=auth_info.get("redirect_uris")[0]
        )
        # ログインしていない場合ログインを行う
        logged_in = False
        if not google_session.authorized:
            logger.debug("ログインを行います")
            authorization_url, state = google_session.authorization_url(
                auth_info.get("auth_uri"),
                access_type="offline",
                prompt="select_account"
            )
            # 認証URLにアクセスしてコードをペースト
            print("Access {} and paste code.".format(authorization_url))
            access_code = input(">>> ")
            google_session.fetch_token(
                auth_info.get("token_uri"),
                client_secret=auth_info.get("client_secret"),
                code=access_code
            )
            assert google_session.authorized
            logged_in = True
        return google_session, logged_in

    def get_photo_list(self, page_num=10, page_size="100"):
        photo_list = []
        params = {
            "pageSize": str(page_size)
        }
        # リクエストボディ
        query_filter = {
            "filters": {
                "mediaTypeFilter": {
                    "mediaTypes": [
                        "PHOTO"
                    ]
                }
            }
        }
        for page_index in range(page_num):
            logger.debug("{}番目のページを取得します".format(page_index))
            # リクエスト送信
            api_url = GooglePhotos.api_url.get("searchItems")
            response = self.post(api_url, params=params, data=json.dumps(query_filter))
            assert response.status_code == 200, "Response is not 200"
            res_json = response.json()
            # 画像情報だけ抜き出し
            media_items = res_json.get("mediaItems")
            photo_list.extend(media_items)
            # 次ページのトークンを取得・設定
            if "nextPageToken" in res_json:
                params["pageToken"] = res_json.get("nextPageToken")
            else:
                break
            # 過負荷を避けるため間隔を開けてAPIを叩く
            time.sleep(GooglePhotos.sleep_time)
        return photo_list

    def download_photo(self, photo_id, save_dir="./", add_datetime_header=False, overwrite=False):
        logger.debug("Downloading: {}".format(photo_id))
        response = self.get(GooglePhotos.api_url.get("mediaItem").format(photo_id))
        assert response.status_code == 200
        media_item_latest = response.json()
        # MediaItemから各種情報を取得
        base_url = media_item_latest.get("baseUrl")
        metadata = media_item_latest.get("mediaMetadata")
        filename = media_item_latest.get("filename")
        # ダウンロードURLを構成
        download_url = GooglePhotos.photo_size_format.format(
            base=base_url,
            width=metadata["width"],
            height=metadata["height"]
        )
        # 保存ファイルの作成
        if add_datetime_header:
            creation_time = datetime.strptime(metadata.get("creationTime"), "%Y-%m-%dT%H:%M:%SZ")
            header = creation_time.strftime("%Y%m%d_%H%M%S_")
            filename = header + filename
        write_path = Path(save_dir) / filename
        logger.debug("Saving to {}".format(write_path))
        # ダウンロード実行
        if overwrite or not write_path.exists():
            response = self.get(download_url)
            assert response.status_code == 200
        # 保存
        write_path.write_bytes(response.content)


def test():
    photos = GooglePhotos()
    photo_list = photos.get_photo_list(page_num=5)
    for photo_info in photo_list:
        photos.download_photo(photo_info.get("id"), save_dir="./photos", add_datetime_header=True, overwrite=False)
        # 過負荷を避けるため間隔を開けてAPIを叩く
        time.sleep(GooglePhotos.sleep_time)


if __name__ == "__main__":
    test()

使い方

photos = GooglePhotos()を実行する事で、ログイン済みのphotosオブジェクトを作れます。

get_photo_list関数で画像一覧を取得して、download_photo関数で画像をダウンロードできます。

get_photo_listは取得する画像情報のページ数と1ページ当たりの枚数を指定でき、例えばページ数5で1ページ当たり100枚だと500枚取得できます(1ページ当たりの最大数はAPIの仕様上100となっています)。

他にも色々と変えられますが、もうちょっと自分で弄ってみた後に改良してしっかりと公開しようと思います。

コメントを投稿

コメント