Created 2019年3月10日17:47
Updated 2019年3月10日17:49
Categories
Python
Google Photos API
旅行に行った時の写真が全てGoogle Photosにアップロードされていたのですが、数が多すぎるのでローカルに落としてじっくり選定したいと思っていました。
Google PhotosのAPIから画像をダウンロードする知見は少し前に得ていたので、この機会に簡単に使えるようにクラス化してまとめてみました。
ダウンロードしたcredentials.jsonをスクリプトと同じディレクトリに配置してください。
from pathlib import Path
from requests_oauthlib import OAuth2Session
import json
import logging
import time
from datetime import datetime, timedelta
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
class GooglePhotos:
# APIのURLやスコープ
api_url = {
"test": "https://photoslibrary.googleapis.com/v1/mediaItems",
"searchItems": "https://photoslibrary.googleapis.com/v1/mediaItems:search",
"mediaItem": "https://photoslibrary.googleapis.com/v1/mediaItems/{}"
}
scope = ["https://www.googleapis.com/auth/photoslibrary.readonly"]
sleep_time = 10
photo_size_format = "{base}=w{width}-h{height}"
def __init__(self, token_path="token.json", credential_path="credentials.json"):
self.token_path = token_path
self.google_session, logged_in = self.login(credential_path)
# ログイン処理が行われていたらトークンを保存
# 本来自動保存だが動かないので追加
if logged_in:
self.save_token()
# 有効期限の過ぎたトークンをリフレッシュ
self.token_expires_at = datetime.fromtimestamp(self.google_session.token.get("expires_at"))
self.check_and_refresh_token()
# ログイン後に取得したトークンをtoken.jsonに保存
def save_token(self):
logger.debug("トークンを保存しています")
Path(self.token_path).write_text(json.dumps(self.google_session.token))
# token.jsonが存在したら読み込み
def load_token(self):
# 存在しない場合は期限切れのダミーを返す
token = {
"access_token": "",
"refresh_token": "",
"token_type": "",
"expires_in": "-30",
"expires_at": (datetime.now() - timedelta(hours=2)).timestamp()
}
path = Path(self.token_path)
if path.exists():
logger.debug("トークンをファイルから読み込んでいます")
token = json.loads(path.read_text())
return token
def check_and_refresh_token(self):
if datetime.now() + timedelta(minutes=10) > self.token_expires_at:
logger.debug("トークンの期限切れが近いため、更新を行います")
new_token = self.google_session.refresh_token(
self.google_session.auto_refresh_url,
**self.google_session.auto_refresh_kwargs
)
self.google_session.token = new_token
self.token_expires_at = datetime.fromtimestamp(self.google_session.token.get("expires_at"))
def get(self, *args, **kwargs):
self.check_and_refresh_token()
return self.google_session.get(*args, **kwargs)
def post(self, *args, **kwargs):
self.check_and_refresh_token()
return self.google_session.post(*args, **kwargs)
# ログインしてセッションオブジェクトを返す
def login(self, credential_path):
# 認証情報を読み込み
auth_info = json.loads(Path(credential_path).read_text()).get("installed", None)
assert auth_info is not None
# トークン読み込み
token = self.load_token()
# トークン更新用の認証情報
extras = {
"client_id": auth_info.get("client_id"),
"client_secret": auth_info.get("client_secret"),
}
# セッションオブジェクトを作成
# TODO: token_updaterの引数がたぶん合わない
google_session = OAuth2Session(
auth_info.get("client_id"),
scope=GooglePhotos.scope,
token=token,
auto_refresh_kwargs=extras,
token_updater=self.save_token,
auto_refresh_url=auth_info.get("token_uri"),
redirect_uri=auth_info.get("redirect_uris")[0]
)
# ログインしていない場合ログインを行う
logged_in = False
if not google_session.authorized:
logger.debug("ログインを行います")
authorization_url, state = google_session.authorization_url(
auth_info.get("auth_uri"),
access_type="offline",
prompt="select_account"
)
# 認証URLにアクセスしてコードをペースト
print("Access {} and paste code.".format(authorization_url))
access_code = input(">>> ")
google_session.fetch_token(
auth_info.get("token_uri"),
client_secret=auth_info.get("client_secret"),
code=access_code
)
assert google_session.authorized
logged_in = True
return google_session, logged_in
def get_photo_list(self, page_num=10, page_size="100"):
photo_list = []
params = {
"pageSize": str(page_size)
}
# リクエストボディ
query_filter = {
"filters": {
"mediaTypeFilter": {
"mediaTypes": [
"PHOTO"
]
}
}
}
for page_index in range(page_num):
logger.debug("{}番目のページを取得します".format(page_index))
# リクエスト送信
api_url = GooglePhotos.api_url.get("searchItems")
response = self.post(api_url, params=params, data=json.dumps(query_filter))
assert response.status_code == 200, "Response is not 200"
res_json = response.json()
# 画像情報だけ抜き出し
media_items = res_json.get("mediaItems")
photo_list.extend(media_items)
# 次ページのトークンを取得・設定
if "nextPageToken" in res_json:
params["pageToken"] = res_json.get("nextPageToken")
else:
break
# 過負荷を避けるため間隔を開けてAPIを叩く
time.sleep(GooglePhotos.sleep_time)
return photo_list
def download_photo(self, photo_id, save_dir="./", add_datetime_header=False, overwrite=False):
logger.debug("Downloading: {}".format(photo_id))
response = self.get(GooglePhotos.api_url.get("mediaItem").format(photo_id))
assert response.status_code == 200
media_item_latest = response.json()
# MediaItemから各種情報を取得
base_url = media_item_latest.get("baseUrl")
metadata = media_item_latest.get("mediaMetadata")
filename = media_item_latest.get("filename")
# ダウンロードURLを構成
download_url = GooglePhotos.photo_size_format.format(
base=base_url,
width=metadata["width"],
height=metadata["height"]
)
# 保存ファイルの作成
if add_datetime_header:
creation_time = datetime.strptime(metadata.get("creationTime"), "%Y-%m-%dT%H:%M:%SZ")
header = creation_time.strftime("%Y%m%d_%H%M%S_")
filename = header + filename
write_path = Path(save_dir) / filename
logger.debug("Saving to {}".format(write_path))
# ダウンロード実行
if overwrite or not write_path.exists():
response = self.get(download_url)
assert response.status_code == 200
# 保存
write_path.write_bytes(response.content)
def test():
photos = GooglePhotos()
photo_list = photos.get_photo_list(page_num=5)
for photo_info in photo_list:
photos.download_photo(photo_info.get("id"), save_dir="./photos", add_datetime_header=True, overwrite=False)
# 過負荷を避けるため間隔を開けてAPIを叩く
time.sleep(GooglePhotos.sleep_time)
if __name__ == "__main__":
test()
photos = GooglePhotos()
を実行する事で、ログイン済みのphotosオブジェクトを作れます。
get_photo_list
関数で画像一覧を取得して、download_photo
関数で画像をダウンロードできます。
get_photo_list
は取得する画像情報のページ数と1ページ当たりの枚数を指定でき、例えばページ数5で1ページ当たり100枚だと500枚取得できます(1ページ当たりの最大数はAPIの仕様上100となっています)。
他にも色々と変えられますが、もうちょっと自分で弄ってみた後に改良してしっかりと公開しようと思います。