はじめに
皆さんこんにちは!
仕事やプライベートで、Google Driveを活用されている方は多いのではないでしょうか?ファイルの保存や共有にとても便利ですよね。
でも、手作業でのファイル操作って、結構面倒だったりしませんか?特に、大量のファイルを扱ったり、定期的に同じ作業を繰り返したりする場合は、うんざりしてしまうこともあるでしょう。
私もデータアナリスト時代に大量のファイルを扱う必要があり、何とかならないものかとずっと考えていました。
そんな悩みを解決してくれるのが、PythonとPyDrive2ライブラリです!これらを組み合わせれば、Google Driveの操作を自動化して、面倒な作業から解放されちゃいましょう。
この記事では、私が実際に業務効率化のために作成したGoogleDriveAPI
クラスを題材に、PyDrive2の使い方を徹底解説します!
「APIってなんだか難しそう…」と不安に思っている方も、大丈夫!この記事を読めば、きっとPythonでGoogle Driveを自在に操れるようになりますよ。
さあ、一緒にGoogle Drive自動化の世界へ飛び込みましょう。
この記事で学べること
- PyDrive2ライブラリの概要とインストール方法
- Google Cloud Consoleでの設定 (OAuth同意画面など)
- Google Drive APIの認証方法
GoogleDriveAPI
クラスを使った、Google Driveの基本的な操作- フォルダ/ファイルIDの取得
- ファイルのアップロード/ダウンロード
- ファイルの削除
- フォルダ内のファイル一覧取得
- 特定のフォルダ内の最新ファイルの取得
- ファイル情報からpandas DataFrameを生成
GoogleDriveAPI
クラスを別のPythonファイルから呼び出して使う方法GoogleDriveAPI
クラスに新しいメソッドを追加する方法
GoogleDriveAPI
クラスって?
GoogleDriveAPI
クラスは、私がGoogle Drive APIを簡単に操作できるように作った、オリジナルのPythonクラスです。このクラスを使えば、Google Driveの様々な操作を、シンプルで分かりやすいコードで実行できます。
例えば、フォルダ内の最新のファイルをダウンロードしたい場合、GoogleDriveAPI
クラスを使えば、たった数行のコードで実現できます。
from Google_Drive_api import GoogleDriveAPI
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# フォルダIDを指定 (例: マイドライブのID)
folder_id = drive.get_item_id("マイドライブ", is_folder=True)
# 作成日が一番新しいファイルのIDを取得
file_id = drive.get_latest_file_id(folder_id)
# ファイルをダウンロード
if file_id:
drive.download_file(file_id=file_id)
else:
print("ファイルが見つかりませんでした。")
すごく簡単ですよね?
このブログ記事では、GoogleDriveAPI
クラスのコードを詳しく解説しながら、PyDrive2の使い方をマスターしていきます。
事前準備:PyDrive2をインストールしよう
まずは、PyDrive2ライブラリをインストールしましょう。pip
コマンドを使えば、簡単にインストールできます。
ターミナルまたはコマンドプロンプトを開いて、以下のコマンドを実行してください。
pip install pydrive2 pandas
このコマンドで、PyDrive2
とpandas
(ファイル一覧をデータフレームとして取得するために必要)がインストールされます。
事前準備:Google Cloud ConsoleでAPIを有効化しよう
次に、Google Cloud Consoleで、Google Drive APIを有効化し、認証情報を作成する必要があります。
ちょっと手順が複雑なので、ここでは概要だけ説明します。詳しくは、PyDrive2の公式ドキュメント や、「PythonでGoogle Drive APIを使う」 などの記事を参考にしてください。
- Google Cloud Consoleにアクセス: Google Cloud Console にアクセスし、Googleアカウントでログインします。
- プロジェクトの作成または選択: 画面上部の「プロジェクトを選択」から、新しいプロジェクトを作成するか、既存のプロジェクトを選択します。
- Google Drive APIの有効化: 左側のナビゲーションメニューから「APIとサービス」>「ライブラリ」を選択し、「Google Drive API」を検索して、「有効にする」をクリックします。
- OAuth同意画面の設定: 左側のメニューから「APIとサービス」>「OAuth同意画面」を選択し、必要な情報を入力して、同意画面を設定します。
- User Typeは用途に応じて
外部
か内部
を選択してください。個人利用の場合は通常外部
を選択すれば問題ありません。 - アプリケーション名は任意の名前(例:
PyDrive2 App
)を入力してください。 - 「承認済みドメイン」には、ご自身のドメイン(無い場合は適当なドメイン(例:
example.com
))を入力してください。 - 「保存して次へ」をクリックしてください。
- 次の「スコープ」は何も入力せずに「保存して次へ」をクリックしてください。
- 次の「テストユーザー」はご自身のGoogleアカウントのメールアドレスを入力して、「保存して次へ」をクリックしてください。
- User Typeは用途に応じて
- 認証情報の作成: 左側のメニューから「APIとサービス」>「認証情報」を選択し、「認証情報を作成」>「OAuthクライアントID」を選択します。
- 「アプリケーションの種類」で「デスクトップアプリ」を選択します。
- 「名前」は任意の名前(例:
PyDrive2 Desktop Client
)を入力してください。 - 「作成」をクリックします。
- 認証情報のダウンロード: 作成した認証情報の右側にあるダウンロードアイコンをクリックし、JSONファイルをダウンロードします。ダウンロードしたファイルを、
client_secrets.json
という名前で保存します。 settings.yaml
の作成: 以下の内容のsettings.yaml
という名前のファイルを作成し、client_secrets.json
と同じディレクトリに保存します。
client_config_backend: file
client_config_file: client_secrets.json
save_credentials: True
save_credentials_backend: file
save_credentials_file: saved_credentials.json
get_refresh_token: True
oauth_scope:
- https://www.googleapis.com/auth/drive
- https://www.googleapis.com/auth/drive.install
GoogleDriveAPI
クラスのコード解説
さあ、準備が整ったところで、いよいよGoogleDriveAPI
クラスのコードを見ていきましょう!
ディレクトリ構成
.
├── main.py
├── Google_Drive_api.py
├── client_secrets.json
├── saved_credentials.json
└── settings.yaml
main.py
が実行するスクリプトで、Google_Drive_api.py
がGoogleDriveAPI
クラスを定義しているモジュールです。 client_secrets.json
, saved_credentials.json
, settings.yaml
は上で説明した設定ファイルです。 これらはすべて同じディレクトリに配置してください。
Google_Drive_api.py
のコード
import os
import pandas as pd
from pydrive2.drive import GoogleDrive
from pydrive2.auth import GoogleAuth
from typing import List, Dict, Optional
class GoogleDriveAPI:
FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
CSV_MIME_TYPE = 'text/csv'
def __init__(self) -> None:
"""
GoogleDriveAPIクラスのコンストラクタ
"""
self.gauth = GoogleAuth()
# 保存された認証情報を読み込む
self.gauth.LoadCredentialsFile("saved_credentials.json")
# 認証情報が存在しない、または無効な場合は新たに認証を行う
if self.gauth.credentials is None:
self.gauth.LocalWebserverAuth()
elif self.gauth.access_token_expired:
self.gauth.Refresh()
else:
self.gauth.Authorize()
# 認証情報を保存
self.gauth.SaveCredentialsFile("saved_credentials.json")
self.drive = GoogleDrive(self.gauth)
def get_item_id(self, name: str, parent_id: Optional[str] = None, is_folder: bool = False) -> Optional[str]:
"""
ファイルまたはフォルダのIDを取得
Args:
name (str): 取得したいファイルまたはフォルダの名前。
parent_id (str, optional): 親フォルダのID。指定しない場合はルートフォルダから検索。
is_folder (bool, optional): フォルダを取得する場合はTrue、ファイルを取得する場合はFalse。デフォルトはFalse。
Returns:
str or None: 見つかったファイルまたはフォルダのID。見つからない場合はNone。
"""
query = f"trashed = false and title='{name}'"
if is_folder:
query += f" and mimeType='{self.FOLDER_MIME_TYPE}'"
if parent_id:
query += f" and '{parent_id}' in parents"
item_list = self.drive.ListFile({'q': query}).GetList()
if not item_list:
return None
elif len(item_list) > 1:
print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。")
return item_list[0]['id']
def get_latest_file_id(self, folder_id: str) -> Optional[str]:
"""
フォルダ内の最新ファイルのIDを取得
Args:
folder_id (str): フォルダのID。
Returns:
str or None: 最新ファイルのID。ファイルが存在しない場合はNone。
"""
query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'"
file_list = self.drive.ListFile({'q': query}).GetList()
if not file_list:
return None
return max(file_list, key=lambda f: f['createdDate'])['id']
def _get_files_info_from_folder(self, folder_id: Optional[str]) -> List[Dict]:
"""
フォルダ内のファイル情報を取得するヘルパー関数
Args:
folder_id (str): フォルダのID
Returns:
List[Dict]: ファイル情報のリスト
"""
if not folder_id:
return []
return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
def get_all_file_ids(self, folder_name: str) -> List[str]:
"""
フォルダ内のすべてのファイルIDを取得
Args:
folder_name (str): フォルダ名。
Returns:
List[str]: ファイルIDのリスト。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return [file['id'] for file in file_list] if file_list else []
def get_file_df(self, folder_name: str) -> pd.DataFrame:
"""
フォルダ内のファイル情報をDataFrameとして取得
Args:
folder_name (str): フォルダ名。
Returns:
pd.DataFrame: ファイル情報を含むDataFrame。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return pd.DataFrame([(file['title'], file['createdDate'], file['id']) for file in file_list],
columns=['title', 'createdDate', 'id']) if file_list else pd.DataFrame()
def get_created_at(self, folder_name: str) -> Dict[str, str]:
"""
ファイルIDと作成日時の辞書を取得
Args:
folder_name (str): フォルダ名。
Returns:
Dict[str, str]: ファイルIDをキー、作成日時を値とする辞書。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return {file['id']: file['createdDate'] for file in file_list} if file_list else {}
def upload_file(self, folder_name: str, content_file: str, file_name: str = 'no_name.csv') -> None:
"""
ファイルをGoogle Driveにアップロード
Args:
folder_name (str): アップロード先のフォルダ名。
content_file (str): アップロードするファイルのパス。
file_name (str, optional): アップロードするファイルの名前。デフォルトは 'no_name.csv'。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
metadata = {
'parents': [{"id": folder_id}],
'title': file_name,
'mimeType': self.CSV_MIME_TYPE
}
f = self.drive.CreateFile(metadata=metadata)
f.SetContentFile(content_file)
try:
f.Upload()
print(f"ファイル '{file_name}' が正常にアップロードされました")
except Exception as e:
print(f"ファイルのアップロード中にエラーが発生しました: {e}")
finally:
f.content.close() # ファイルを確実に閉じる
def download_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None, target_dir: str = "") -> None:
"""
IDまたは名前でファイルをダウンロード
Args:
file_id (str, optional): ダウンロードするファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
file_name (str, optional): ダウンロードするファイル名。file_idが指定されていない場合に必要。
target_dir (str, optional): ダウンロードしたファイルを保存するディレクトリ。デフォルトはカレントディレクトリ。
"""
if not file_id:
if not folder_name or not file_name:
print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
return
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
file_id = self.get_item_id(file_name, parent_id=folder_id)
if not file_id:
print(f"ファイルが見つかりません")
return
try:
f = self.drive.CreateFile({'id': file_id})
f.FetchMetadata()
if f.metadata.get('trashed', False):
print("ファイルはゴミ箱にあります")
return
output_name = file_name or f['title']
f.GetContentFile(os.path.join(target_dir, output_name))
print(f"ファイル '{output_name}' が正常にダウンロードされました")
except Exception as e:
print(f"ファイルのダウンロード中にエラーが発生しました: {e}")
def delete_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None) -> None:
"""
IDまたは名前でファイルを削除
Args:
file_id (str, optional): 削除するファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
file_name (str, optional): 削除するファイル名。file_idが指定されていない場合に必要。
"""
if not file_id:
if not folder_name or not file_name:
print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
return
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
file_id = self.get_item_id(file_name, parent_id=folder_id)
if not file_id:
print(f"ファイルが見つかりません")
return
try:
f = self.drive.CreateFile({'id': file_id})
f.Delete()
print(f"ID '{file_id}' のファイルが正常に削除されました")
except Exception as e:
print(f"エラーが発生しました: {e}")
def list_files_in_folder(self, folder_name: str) -> None:
"""
フォルダ内のすべてのファイルを一覧表示
Args:
フォルダ内のすべてのファイルを一覧表示
Args:
folder_name (str): フォルダ名。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"指定されたフォルダ'{folder_name}'が見つかりません")
return
file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
if not file_list:
print("フォルダ内にファイルが見つかりません")
return
for file in file_list:
print(f"タイトル: {file['title']}, ID: {file['id']}")
def create_folder(self, folder_name: str, parent_folder_id: str = 'root') -> None:
"""
Google Drive上にフォルダを作成する
Args:
folder_name (str): 作成するフォルダ名
parent_folder_id (str): 親フォルダのID。デフォルトは'root'(ルートフォルダ)
"""
folder_metadata = {
'title': folder_name,
'mimeType': self.FOLDER_MIME_TYPE,
'parents': [{'id': parent_folder_id}]
}
folder = self.drive.CreateFile(folder_metadata)
folder.Upload()
print(f"フォルダ '{folder_name}' (ID: {folder['id']}) が作成されました")
コードの解説
ここからはコードを解説します。
chatGPTも使っているので、ちょっと日本語が変なところがありますが、気にしないでください。
クラス変数
FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
CSV_MIME_TYPE = 'text/csv'
FOLDER_MIME_TYPE
: フォルダのMIMEタイプを定義しています。Google Drive APIではフォルダを特定するためにこのMIMEタイプを使用します。CSV_MIME_TYPE
: CSVファイルのMIMEタイプを定義しています。アップロード時にファイルの種別を指定するために使用します。
__init__
メソッド (コンストラクタ)
def __init__(self) -> None:
"""
GoogleDriveAPIクラスのコンストラクタ
"""
self.gauth = GoogleAuth()
# 保存された認証情報を読み込む
self.gauth.LoadCredentialsFile("saved_credentials.json")
# 認証情報が存在しない、または無効な場合は新たに認証を行う
if self.gauth.credentials is None:
self.gauth.LocalWebserverAuth()
elif self.gauth.access_token_expired:
self.gauth.Refresh()
else:
self.gauth.Authorize()
# 認証情報を保存
self.gauth.SaveCredentialsFile("saved_credentials.json")
self.drive = GoogleDrive(self.gauth)
self.gauth = GoogleAuth()
:GoogleAuth
オブジェクトを生成します。このオブジェクトを使って認証処理を行います。self.gauth.LoadCredentialsFile("saved_credentials.json")
: 以前に認証した際に保存された認証情報をsaved_credentials.json
ファイルから読み込みます。- 認証情報の確認と更新:
if self.gauth.credentials is None:
: 認証情報が全く存在しない場合、self.gauth.LocalWebserverAuth()
を実行して、ブラウザ経由での認証プロセスを開始します。elif self.gauth.access_token_expired:
: 認証情報は存在するが、アクセストークンの有効期限が切れている場合、self.gauth.Refresh()
を実行してトークンを更新します。else:
: 認証情報が存在し、アクセストークンも有効な場合、self.gauth.Authorize()
で認証を完了します。
self.gauth.SaveCredentialsFile("saved_credentials.json")
: 更新された認証情報をsaved_credentials.json
に保存します。self.drive = GoogleDrive(self.gauth)
: 認証済みのGoogleAuth
オブジェクトを使ってGoogleDrive
オブジェクトを生成します。このオブジェクトを使ってGoogle Drive APIを操作します。
get_item_id
メソッド
def get_item_id(self, name: str, parent_id: Optional[str] = None, is_folder: bool = False) -> Optional[str]:
"""
ファイルまたはフォルダのIDを取得
Args:
name (str): 取得したいファイルまたはフォルダの名前。
parent_id (str, optional): 親フォルダのID。指定しない場合はルートフォルダから検索。
is_folder (bool, optional): フォルダを取得する場合はTrue、ファイルを取得する場合はFalse。デフォルトはFalse。
Returns:
str or None: 見つかったファイルまたはフォルダのID。見つからない場合はNone。
"""
query = f"trashed = false and title='{name}'"
if is_folder:
query += f" and mimeType='{self.FOLDER_MIME_TYPE}'"
if parent_id:
query += f" and '{parent_id}' in parents"
item_list = self.drive.ListFile({'q': query}).GetList()
if not item_list:
return None
elif len(item_list) > 1:
print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。")
return item_list[0]['id']
query = f"trashed = false and title='{name}'"
: Google Drive APIに送信する検索クエリのベース部分を作成します。trashed = false
: ゴミ箱にないアイテムを対象とします。title='{name}'
: 指定された名前と一致するアイテムを検索します。
if is_folder: query += f" and mimeType='{self.FOLDER_MIME_TYPE}'"
:is_folder
がTrue
の場合、フォルダのみを対象とするようにクエリを拡張します。if parent_id: query += f" and '{parent_id}' in parents"
:parent_id
が指定されている場合、指定された親フォルダ内に限定するようにクエリを拡張します。item_list = self.drive.ListFile({'q': query}).GetList()
: 構築したクエリを使ってListFile
メソッドを実行し、条件に一致するアイテムのリストを取得します。if not item_list: return None
: 一致するアイテムが見つからない場合はNone
を返します。elif len(item_list) > 1: print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。")
: 一致するアイテムが複数見つかった場合は、警告メッセージを表示します。return item_list[0]['id']
: 最初のアイテムのIDを返します。
get_latest_file_id
メソッド
def get_latest_file_id(self, folder_id: str) -> Optional[str]:
"""
フォルダ内の最新ファイルのIDを取得
Args:
folder_id (str): フォルダのID。
Returns:
str or None: 最新ファイルのID。ファイルが存在しない場合はNone。
"""
query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'"
file_list = self.drive.ListFile({'q': query}).GetList()
if not file_list:
return None
return max(file_list, key=lambda f: f['createdDate'])['id']
query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'"
: 検索クエリを作成します。trashed = false
: ゴミ箱にないアイテムを対象とします。'{folder_id}' in parents
: 指定されたフォルダ内のアイテムを対象とします。mimeType != '{self.FOLDER_MIME_TYPE}'
: フォルダを除外し、ファイルのみを対象とします。
file_list = self.drive.ListFile({'q': query}).GetList()
: クエリを実行して、条件に一致するファイルのリストを取得します。if not file_list: return None
: ファイルが見つからない場合はNone
を返します。return max(file_list, key=lambda f: f['createdDate'])['id']
:createdDate
(作成日時)をキーとして、最も新しいファイルのIDを返します。
_get_files_info_from_folder
メソッド
def _get_files_info_from_folder(self, folder_id: Optional[str]) -> List[Dict]:
"""
フォルダ内のファイル情報を取得するヘルパー関数
Args:
folder_id (str): フォルダのID
Returns:
List[Dict]: ファイル情報のリスト
"""
if not folder_id:
return []
return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
if not folder_id: return []
:folder_id
がNone
または空文字列の場合は、空のリストを返します。return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
: 指定されたフォルダ内のすべてのファイル情報をリストとして返します。
get_all_file_ids
メソッド
def get_all_file_ids(self, folder_name: str) -> List[str]:
"""
フォルダ内のすべてのファイルIDを取得
Args:
folder_name (str): フォルダ名。
Returns:
List[str]: ファイルIDのリスト。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return [file['id'] for file in file_list] if file_list else []
folder_id = self.get_item_id(folder_name, is_folder=True)
:get_item_id
メソッドを使って、指定されたフォルダ名のフォルダIDを取得します。file_list = self._get_files_info_from_folder(folder_id)
:_get_files_info_from_folder
ヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。return [file['id'] for file in file_list] if file_list else []
: ファイル情報リストからファイルIDを抽出し、リストとして返します。ファイルが存在しない場合は空のリストを返します。
get_file_df
メソッド
def get_file_df(self, folder_name: str) -> pd.DataFrame:
"""
フォルダ内のファイル情報をDataFrameとして取得
Args:
folder_name (str): フォルダ名。
Returns:
pd.DataFrame: ファイル情報を含むDataFrame。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return pd.DataFrame([(file['title'], file['createdDate'], file['id']) for file in file_list],
columns=['title', 'createdDate', 'id']) if file_list else pd.DataFrame()
folder_id = self.get_item_id(folder_name, is_folder=True)
:get_item_id
メソッドを使って、指定されたフォルダ名のフォルダIDを取得します。file_list = self._get_files_info_from_folder(folder_id)
:_get_files_info_from_folder
ヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。return pd.DataFrame(...) if file_list else pd.DataFrame()
: ファイル情報リストから、ファイル名、作成日時、IDを抽出してpandas
のDataFrame
オブジェクトを作成して返します。ファイルが存在しない場合は空のDataFrame
を返します。
get_created_at
メソッド
def get_created_at(self, folder_name: str) -> Dict[str, str]:
"""
ファイルIDと作成日時の辞書を取得
Args:
folder_name (str): フォルダ名。
Returns:
Dict[str, str]: ファイルIDをキー、作成日時を値とする辞書。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
file_list = self._get_files_info_from_folder(folder_id)
return {file['id']: file['createdDate'] for file in file_list} if file_list else {}
folder_id = self.get_item_id(folder_name, is_folder=True)
:get_item_id
メソッドを使って、指定されたフォルダ名のフォルダIDを取得します。file_list = self._get_files_info_from_folder(folder_id)
:_get_files_info_from_folder
ヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。return {file['id']: file['createdDate'] for file in file_list} if file_list else {}
: ファイル情報リストから、ファイルIDと作成日時を抽出して辞書として返します。ファイルが存在しない場合は空の辞書を返します。
upload_file
メソッド
def upload_file(self, folder_name: str, content_file: str, file_name: str = 'no_name.csv') -> None:
"""
ファイルをGoogle Driveにアップロード
Args:
folder_name (str): アップロード先のフォルダ名。
content_file (str): アップロードするファイルのパス。
file_name (str, optional): アップロードするファイルの名前。デフォルトは 'no_name.csv'。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
metadata = {
'parents': [{"id": folder_id}],
'title': file_name,
'mimeType': self.CSV_MIME_TYPE
}
f = self.drive.CreateFile(metadata=metadata)
f.SetContentFile(content_file)
try:
f.Upload()
print(f"ファイル '{file_name}' が正常にアップロードされました")
except Exception as e:
print(f"ファイルのアップロード中にエラーが発生しました: {e}")
finally:
f.content.close() # ファイルを確実に閉じる
folder_id = self.get_item_id(folder_name, is_folder=True)
:get_item_id
メソッドを使って、アップロード先のフォルダIDを取得します。if not folder_id: ...
: フォルダIDが取得できない場合、エラーメッセージを表示して関数を終了します。metadata = {...}
: アップロードするファイルのメタデータを設定します。parents
: アップロード先のフォルダIDを指定します。title
: アップロードするファイルの名前を指定します。mimeType
: ファイルのMIMEタイプをCSV_MIME_TYPE
に設定します。
f = self.drive.CreateFile(metadata=metadata)
: メタデータを使ってGoogleDriveFile
オブジェクトを作成します。f.SetContentFile(content_file)
: アップロードするローカルファイルのパスを指定します。try...except...finally
: ファイルのアップロード処理中にエラーが発生した場合の例外処理と、正常終了/エラー発生に関わらずファイルを閉じる処理を記述しています。f.Upload()
: ファイルをアップロードします。print(...)
: アップロード成功またはエラー発生のメッセージを表示します。f.content.close()
: ローカルファイルをクローズします。
download_file
メソッド
def download_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None, target_dir: str = "") -> None:
"""
IDまたは名前でファイルをダウンロード
Args:
file_id (str, optional): ダウンロードするファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
file_name (str, optional): ダウンロードするファイル名。file_idが指定されていない場合に必要。
target_dir (str, optional): ダウンロードしたファイルを保存するディレクトリ。デフォルトはカレントディレクトリ。
"""
if not file_id:
if not folder_name or not file_name:
print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
return
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
file_id = self.get_item_id(file_name, parent_id=folder_id)
if not file_id:
print(f"ファイルが見つかりません")
return
try:
f = self.drive.CreateFile({'id': file_id})
f.FetchMetadata()
if f.metadata.get('trashed', False):
print("ファイルはゴミ箱にあります")
return
output_name = file_name or f['title']
f.GetContentFile(os.path.join(target_dir, output_name))
print(f"ファイル '{output_name}' が正常にダウンロードされました")
except Exception as e:
print(f"ファイルのダウンロード中にエラーが発生しました: {e}")
file_id
,folder_name
,file_name
: ダウンロードするファイルを特定するための引数です。file_id
を直接指定するか、folder_name
とfile_name
を組み合わせて指定します。target_dir
: ダウンロードしたファイルを保存するローカルディレクトリを指定します。デフォルトはカレントディレクトリです。if not file_id:
:file_id
が指定されていない場合は、folder_name
とfile_name
からget_item_id
メソッドを使ってファイルIDを取得します。if not folder_name or not file_name: ...
: 必要な情報が不足している場合はエラーメッセージを表示して関数を終了します。folder_id = self.get_item_id(...)
: フォルダIDを取得します。if not folder_id: ...
: フォルダが見つからない場合はエラーメッセージを表示して関数を終了します。file_id = self.get_item_id(...)
: ファイルIDを取得します。
if not file_id: ...
: ファイルが見つからない場合はエラーメッセージを表示して関数を終了します。try...except
: ファイルのダウンロード処理中にエラーが発生した場合の例外処理を記述しています。f = self.drive.CreateFile({'id': file_id})
: 指定されたファイルIDを使ってGoogleDriveFile
オブジェクトを作成します。f.FetchMetadata()
: ファイルのメタデータを取得します。if f.metadata.get('trashed', False): ...
: ファイルがゴミ箱にあるかどうかを確認し、ゴミ箱にある場合はエラーメッセージを表示して関数を終了します。output_name = file_name or f['title']
: ダウンロードするファイルの名前を決定します。file_name
が指定されている場合はその名前を使い、そうでない場合はGoogle Drive上のファイル名を使います。f.GetContentFile(os.path.join(target_dir, output_name))
: ファイルをダウンロードし、指定されたパスに保存します。print(...)
: ダウンロード成功またはエラー発生のメッセージを表示します。
delete_file
メソッド
def delete_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None) -> None:
"""
IDまたは名前でファイルを削除
Args:
file_id (str, optional): 削除するファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
file_name (str, optional): 削除するファイル名。file_idが指定されていない場合に必要。
"""
if not file_id:
if not folder_name or not file_name:
print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
return
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"フォルダ '{folder_name}' が見つかりません")
return
file_id = self.get_item_id(file_name, parent_id=folder_id)
if not file_id:
print(f"ファイルが見つかりません")
return
try:
f = self.drive.CreateFile({'id': file_id})
f.Delete()
print(f"ID '{file_id}' のファイルが正常に削除されました")
except Exception as e:
print(f"エラーが発生しました: {e}")
file_id
,folder_name
,file_name
: 削除するファイルを特定するための引数です。file_id
を直接指定するか、folder_name
とfile_name
を組み合わせて指定します。if not file_id:
:file_id
が指定されていない場合は、folder_name
とfile_name
からget_item_id
メソッドを使ってファイルIDを取得します。if not folder_name or not file_name: ...
: 必要な情報が不足している場合はエラーメッセージを表示して関数を終了します。folder_id = self.get_item_id(...)
: フォルダIDを取得します。if not folder_id: ...
: フォルダが見つからない場合はエラーメッセージを表示して関数を終了します。file_id = self.get_item_id(...)
: ファイルIDを取得します。
if not file_id: ...
: ファイルが見つからない場合はエラーメッセージを表示して関数を終了します。try...except
: ファイルの削除処理中にエラーが発生した場合の例外処理を記述しています。f = self.drive.CreateFile({'id': file_id})
: 指定されたファイルIDを使ってGoogleDriveFile
オブジェクトを作成します。f.Delete()
: ファイルを削除します。print(...)
: 削除成功またはエラー発生のメッセージを表示します。
list_files_in_folder
メソッド
def list_files_in_folder(self, folder_name: str) -> None:
"""
フォルダ内のすべてのファイルを一覧表示
Args:
folder_name (str): フォルダ名。
"""
folder_id = self.get_item_id(folder_name, is_folder=True)
if not folder_id:
print(f"指定されたフォルダ'{folder_name}'が見つかりません")
return
file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
if not file_list:
print("フォルダ内にファイルが見つかりません")
return
for file in file_list:
print(f"タイトル: {file['title']}, ID: {file['id']}")
folder_id = self.get_item_id(folder_name, is_folder=True)
:get_item_id
メソッドを使って、指定されたフォルダ名のフォルダIDを取得します。if not folder_id: ...
: フォルダIDが取得できない場合、エラーメッセージを表示して関数を終了します。file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
: 指定されたフォルダIDを親に持つすべてのファイル情報を取得します。if not file_list: ...
: ファイルリストが空の場合、メッセージを表示して関数を終了します。for file in file_list: ...
: 取得したファイルリストをループ処理し、各ファイルのタイトルとIDを表示します。
create_folder
メソッド
def create_folder(self, folder_name: str, parent_folder_id: str = 'root') -> None:
"""
Google Drive上にフォルダを作成する
Args:
folder_name (str): 作成するフォルダ名
parent_folder_id (str): 親フォルダのID。デフォルトは'root'(ルートフォルダ)
"""
folder_metadata = {
'title': folder_name,
'mimeType': self.FOLDER_MIME_TYPE,
'parents': [{'id': parent_folder_id}]
}
folder = self.drive.CreateFile(folder_metadata)
folder.Upload()
print(f"フォルダ '{folder_name}' (ID: {folder['id']}) が作成されました")
folder_metadata = {...}
: 作成するフォルダのメタデータを設定します。title
: フォルダ名を指定します。mimeType
: フォルダのMIMEタイプを指定します。parents
: 親フォルダのIDを指定します。
folder = self.drive.CreateFile(folder_metadata)
: メタデータを使ってGoogleDriveFile
オブジェクトを作成します。folder.Upload()
: フォルダをGoogle Driveにアップロード(作成)します。print(...)
: フォルダ作成成功のメッセージを表示します。
GoogleDriveAPI
クラスの使用例
それでは、実際にGoogleDriveAPI
クラスを使って、Google Driveを操作してみましょう!
モジュールとして使う
まず、Google_Drive_api.py
ファイルと同じディレクトリに、新しいPythonファイル(例えば main.py
)を作成します。
main.py
に、以下のようにコードを記述します。
from Google_Drive_api import GoogleDriveAPI
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# フォルダ名を指定 (例: マイドライブ直下)
folder_name = "マイドライブ"
# マイドライブのフォルダIDを取得
folder_id = drive.get_item_id(folder_name, is_folder=True)
# マイドライブ内のファイル一覧を表示
if folder_id:
drive.list_files_in_folder(folder_name)
# マイドライブ内のファイル情報をDataFrameとして取得
df = drive.get_file_df(folder_name)
print(df)
else:
print(f"フォルダ '{folder_name}' が見つかりませんでした。")
このコードを実行すると、マイドライブ
フォルダ内のファイル一覧と、ファイル情報を格納した pandas の DataFrame が表示されます。
以下、main.py
で実行できる他の操作の例を紹介します。
例1: フォルダ内の最新のファイルをダウンロードする
from Google_Drive_api import GoogleDriveAPI
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# フォルダ名を指定
folder_name = "MyFolder"
# フォルダIDを取得
folder_id = drive.get_item_id(folder_name, is_folder=True)
# 最新のファイルIDを取得
if folder_id:
latest_file_id = drive.get_latest_file_id(folder_id)
# ファイルをダウンロード
if latest_file_id:
drive.download_file(file_id=latest_file_id)
else:
print(f"フォルダ '{folder_name}' にファイルが見つかりません")
else:
print(f"フォルダ '{folder_name}' が見つかりませんでした。")
このコードを実行すると、MyFolder
フォルダ内の最新のファイルが、main.py
と同じディレクトリにダウンロードされます。
例2: ファイルをアップロードする
from Google_Drive_api import GoogleDriveAPI
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# フォルダ名を指定
folder_name = "MyFolder"
# アップロードするファイルのパスを指定
file_path = "upload.txt"
# ファイルをアップロード (ファイル名を指定)
drive.upload_file(folder_name, file_path, file_name="uploaded_file.txt")
このコードを実行すると、upload.txt
というファイルが、uploaded_file.txt
という名前でMyFolder
フォルダにアップロードされます。upload.txt
はmain.py
と同じディレクトリに置いてください。
使用例3: 特定フォルダ内のCSVファイルを読み込んでデータ処理、結果を別フォルダへアップロードする
使用例2までは単純なケースだったので、もう少し実践的なケースを見てみます。
業務でよくあるケースとして、Google Drive上の特定のフォルダに格納されたCSVファイルのデータを読み込み、何らかの処理を施した後、結果を別のCSVファイルとして指定のフォルダにアップロードする、というシナリオを想定してみましょう。
例えば、InputData
フォルダ内に複数のCSVファイル(例:data_20231026.csv
, data_20231027.csv
)が存在し、これらのファイルには売上データが格納されているとします。
このデータを読み込んで、日別の売上合計を計算し、OutputData
フォルダに daily_sales.csv
というファイル名でアップロードする処理を考えます。
from Google_Drive_api import GoogleDriveAPI
import pandas as pd
import os
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# 入力フォルダと出力フォルダの名前を指定
input_folder_name = "InputData"
output_folder_name = "OutputData"
# 入力フォルダのIDを取得
input_folder_id = drive.get_item_id(input_folder_name, is_folder=True)
# 入力フォルダ内のCSVファイル一覧を取得
file_list = drive.drive.ListFile({'q': f"'{input_folder_id}' in parents and trashed=false and mimeType='text/csv'"}).GetList()
# 各CSVファイルを読み込んでデータを結合
all_data = pd.DataFrame()
for file in file_list:
# ファイルをダウンロードして、pandas DataFrameとして読み込む
file_content = drive.drive.CreateFile({'id': file['id']})
file_content.GetContentFile(file_content['title'], mimetype='text/csv')
df = pd.read_csv(file_content['title'])
all_data = pd.concat([all_data, df])
# ダウンロードしたファイルを削除
os.remove(file_content['title'])
# 日別の売上合計を計算 (例として、'Date'列と'Sales'列があると仮定)
all_data['Date'] = pd.to_datetime(all_data['Date'])
daily_sales = all_data.groupby('Date')['Sales'].sum().reset_index()
# 結果をCSVファイルとして出力
output_file_name = 'daily_sales.csv'
output_file_path = f'./{output_file_name}' # 一時的にローカルに保存
daily_sales.to_csv(output_file_path, index=False)
# 結果をGoogle Driveの出力フォルダにアップロード
drive.upload_file(output_folder_name, output_file_path, file_name=output_file_name)
# 一時的に保存したローカルのファイルを削除
os.remove(output_file_path)
print(f"日別売上データを {output_folder_name} フォルダに {output_file_name} としてアップロードしました。")
コードの解説
- フォルダとファイル名の指定:
input_folder_name
とoutput_folder_name
で処理対象のフォルダ名を指定します。 - 入力フォルダ内のCSVファイル一覧取得:
drive.drive.ListFile()
とクエリを使って、指定フォルダ内のCSVファイルのみを取得します。'q': f"'{input_folder_id}' in parents and trashed=false and mimeType='text/csv'"
は、input_folder_id
を親に持ち、ゴミ箱になく、MIMEタイプがtext/csv
のファイルのみを対象とするクエリです。
- ファイルのダウンロードとデータ結合:
for
ループで各CSVファイルを処理します。file_content.GetContentFile(file_content['title'], mimetype='text/csv')
でファイルをダウンロードします。ダウンロードしたファイルはfile_content['title']
にローカルパスが保存されています。pd.read_csv()
でCSVファイルをpandas
DataFrame として読み込みます。pd.concat()
で全てのDataFrameを結合し、all_data
に格納します。os.remove(file_content['title'])
で、ダウンロードしたファイルを削除します。
- データ処理:
all_data['Date'] = pd.to_datetime(all_data['Date'])
でDate
列を日付型に変換します。daily_sales = all_data.groupby('Date')['Sales'].sum().reset_index()
で、日付ごとのSales
列の合計を計算し、新しいDataFramedaily_sales
に格納します。
- 結果のアップロード:
daily_sales.to_csv(output_file_path, index=False)
で処理結果のデータフレームをローカルファイルに保存します。drive.upload_file()
でローカルに保存されたファイルを Google Drive のOutputData
フォルダにアップロードします。os.remove(output_file_path)
で、一時的に作成したローカルファイルを削除します。
この例では、Google Drive上のファイルを直接操作するのではなく、一度ローカルにダウンロードしてから処理し、再度アップロードしています。 これは、pandas
のようなデータ処理ライブラリがGoogle Drive上のファイルを直接扱えないためです。
使用例4: 定期レポートの自動生成と共有フォルダへのアップロード
特定のフォルダに格納されているデータを使って定期的にレポートを生成し、それを特定の共有フォルダに自動的にアップロードする例を紹介します。
例えば、Reports
フォルダに各月の売上データ(例:sales_202310.csv
, sales_202311.csv
)が保存されているとします。
from Google_Drive_api import GoogleDriveAPI
import pandas as pd
import os
import calendar
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# レポート元データフォルダと共有フォルダの名前を指定
reports_folder_name = "Reports"
shared_folder_name = "Shared Reports"
# 各フォルダのIDを取得
reports_folder_id = drive.get_item_id(reports_folder_name, is_folder=True)
shared_folder_id = drive.get_item_id(shared_folder_name, is_folder=True)
# レポート元データフォルダ内のCSVファイル一覧を取得
file_list = drive.drive.ListFile({'q': f"'{reports_folder_id}' in parents and trashed=false and mimeType='text/csv'"}).GetList()
# 各月のレポートを生成
for file in file_list:
# ファイル名から対象年月を取得 (例: sales_202310.csv -> 2023, 10)
year, month = map(int, file['title'].split('_')[1].split('.')[0][:6])
month_name = calendar.month_name[month]
# ファイルをダウンロードして、pandas DataFrameとして読み込む
file_content = drive.drive.CreateFile({'id': file['id']})
file_content.GetContentFile(file_content['title'], mimetype='text/csv')
df = pd.read_csv(file_content['title'])
# ダウンロードしたファイルを削除
os.remove(file_content['title'])
# 月次レポートを生成 (例として、商品カテゴリ別の売上合計を計算)
# 実際のレポート内容は必要に応じて変更してください
category_sales = df.groupby('Category')['Sales'].sum().reset_index()
# レポート内容を文字列として作成 (例として、簡単なテキスト形式)
report_content = f"Monthly Sales Report - {month_name} {year}\n\n"
report_content += category_sales.to_string(index=False)
report_content += "\n\n"
# レポートをテキストファイルとして一時的にローカルに保存
report_file_name = f"report_{year}{month:02}.txt"
report_file_path = f"./{report_file_name}"
with open(report_file_path, 'w') as f:
f.write(report_content)
# レポートファイルを共有フォルダにアップロード
drive.upload_file(shared_folder_name, report_file_path, file_name=report_file_name)
# 一時的に保存したローカルのファイルを削除
os.remove(report_file_path)
print(f"{month_name} {year} のレポートを {shared_folder_name} フォルダにアップロードしました。")
コードの解説
- フォルダ名の指定とIDの取得:
reports_folder_name
とshared_folder_name
で処理対象のフォルダ名を指定します。drive.get_item_id()
で各フォルダのIDを取得します。
- ファイル一覧の取得:
drive.drive.ListFile()
でReports
フォルダ内のCSVファイル一覧を取得します。
- 月次レポートの生成:
for
ループで各CSVファイルを処理します。- ファイル名から対象年月を抽出し、
calendar.month_name
で月名を取得します。 - ファイルをダウンロードし、
pandas
DataFrame として読み込みます。 os.remove(file_content['title'])
で、ダウンロードしたファイルを削除します。df.groupby('Category')['Sales'].sum().reset_index()
で商品カテゴリ別の売上合計を計算します。- レポート内容を文字列として作成します。
with open(report_file_path, 'w') as f:
で一時的にローカルにテキストファイルとしてレポートを保存します。
- レポートのアップロード:
drive.upload_file()
でレポートファイルをShared Reports
フォルダにアップロードします。os.remove(report_file_path)
で一時ファイルを削除します。
この例を応用することで、例えば以下のようなタスクも自動化できます。
- データベースからデータを取得し、CSVファイルとしてGoogle Driveにアップロードする。
- Google Drive上のファイルを定期的にバックアップする。
- フォームの回答データをスプレッドシートから読み込み、レポートを生成する。
これらの例を参考に、ご自身の業務に合わせた自動化処理を実装してみてください。
新しいメソッドを追加するには?
GoogleDriveAPI
クラスに新しい機能を追加したい場合は、クラス定義に新しいメソッドを追加するだけです!
例えば、Google_Drive_api.py
のGoogleDriveAPI
クラスに以下のようなメソッドを追加することで、特定のフォルダ内のファイルをすべて別フォルダにコピーする機能を実現できます。
def copy_all_files(self, source_folder_name: str, destination_folder_name: str) -> None:
"""
特定のフォルダ内のすべてのファイルを別フォルダにコピーする
Args:
source_folder_name (str): コピー元のフォルダ名
destination_folder_name (str): コピー先のフォルダ名
"""
source_folder_id = self.get_item_id(source_folder_name, is_folder=True)
if not source_folder_id:
print(f"コピー元のフォルダ '{source_folder_name}' が見つかりません")
return
destination_folder_id = self.get_item_id(destination_folder_name, is_folder=True)
if not destination_folder_id:
print(f"コピー先のフォルダ '{destination_folder_name}' が見つかりません")
return
file_list = self.drive.ListFile({'q': f"'{source_folder_id}' in parents and trashed=false"}).GetList()
for file in file_list:
# コピー先のメタデータを作成(タイトルと親フォルダを変更)
copied_file_metadata = {
'title': file['title'],
'parents': [{'id': destination_folder_id}]
}
# ファイルをコピー
copied_file = self.drive.auth.service.files().copy(fileId=file['id'], body=copied_file_metadata).execute()
print(f"ファイル '{file['title']}' を '{destination_folder_name}' にコピーしました (ID: {copied_file['id']})")
コードの解説
copy_all_files(self, source_folder_name: str, destination_folder_name: str) -> None
source_folder_name
: コピー元のフォルダ名を指定します。destination_folder_name
: コピー先のフォルダ名を指定します。
source_folder_id = self.get_item_id(source_folder_name, is_folder=True)
: コピー元のフォルダIDを取得します。destination_folder_id = self.get_item_id(destination_folder_name, is_folder=True)
: コピー先のフォルダIDを取得します。file_list = self.drive.ListFile({'q': f"'{source_folder_id}' in parents and trashed=false"}).GetList()
: コピー元のフォルダ内のファイル一覧を取得します。for file in file_list:
: 各ファイルに対してループ処理を行います。copied_file_metadata = {...}
: コピー後のファイルのメタデータを設定します。title
は元のファイル名、parents
はコピー先のフォルダIDを指定します。copied_file = self.drive.auth.service.files().copy(fileId=file['id'], body=copied_file_metadata).execute()
: Google Drive APIのfiles().copy()
メソッドを使ってファイルをコピーします。fileId
に元のファイルID、body
に新しいメタデータを指定します。print(...)
: コピー完了のメッセージを表示します。
このメソッドをGoogleDriveAPI
クラスに追加したら、以下のようにして使えます。
from Google_Drive_api import GoogleDriveAPI
# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()
# コピー元とコピー先のフォルダ名を指定
source_folder_name = "SourceFolder"
destination_folder_name = "DestinationFolder"
# フォルダ内のファイルをすべてコピー
drive.copy_all_files(source_folder_name, destination_folder_name)
このように、GoogleDriveAPI
クラスを拡張することで、Google Driveをさらに便利に使いこなすことができます。
まとめ
この記事では、PythonのPyDrive2
ライブラリを使って、Google Driveを操作する方法を解説しました。GoogleDriveAPI
クラスを活用すれば、Google Drive APIの複雑な処理を簡略化し、効率的にファイル操作を行うことができます。
今回紹介したコードを参考に、ぜひ皆さんもPythonでGoogle Driveを自由自在に操ってみてください!
このブログ記事が、皆さんのGoogle Driveライフをより快適にする一助となれば幸いです。
おまけ:テスト用のコード
皆さんの手元でGoogle_Drive_api.pyを作成したときに、きちんと作動するか以下のコードでテストしてみてください。
何か間違いがあればエラーメッセージが出ますので、それを見て修正してみてください。
import unittest
import os
import pandas as pd
from Google_Drive_api import GoogleDriveAPI
import uuid
import io
import sys
import time
class TestGoogleDriveAPIIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
# クラス全体のセットアップ。認証を行い、テスト用フォルダを準備
cls.api = GoogleDriveAPI()
cls.test_folder_name = f"TestFolder_{uuid.uuid4().hex}"
cls.test_file_name = "test_file.csv"
cls.test_file_path = "test_file.csv"
cls.non_existent_folder_name = "NonExistentFolder"
cls.non_existent_file_name = "non_existent_file.csv"
# テスト用フォルダの作成
folder = cls.api.drive.CreateFile({'title': cls.test_folder_name, 'mimeType': 'application/vnd.google-apps.folder'})
folder.Upload()
cls.test_folder_id = folder['id']
# テスト用ファイルの作成
with open(cls.test_file_path, "w") as f:
f.write("col1,col2\n1,2\n3,4")
# テスト用ファイルのアップロード
cls.api.upload_file(cls.test_folder_name, cls.test_file_path, cls.test_file_name)
print("-" * 70)
print(f" テストフォルダ '{cls.test_folder_name}' を作成しました")
print("-" * 70)
@classmethod
def tearDownClass(cls):
# クラス全体のクリーンアップ。テスト用フォルダとファイルを削除
if os.path.exists(cls.test_file_path):
os.remove(cls.test_file_path)
try:
folder = cls.api.drive.CreateFile({'id': cls.test_folder_id})
folder.Delete()
print("-" * 70)
print(f" テストフォルダ '{cls.test_folder_name}' を削除しました")
print("-" * 70)
except Exception as e:
print(f"クリーンアップ中にエラーが発生しました: {e}")
def setUp(self):
# 各テストメソッドの前処理。特に何もしていません。
self.captured_output = io.StringIO()
sys.stdout = self.captured_output
def tearDown(self):
# 各テストメソッドの後処理。キャプチャした出力をリセット
sys.stdout = sys.__stdout__
if self.captured_output.getvalue() and not self._outcome.success:
print(f"\n詳細情報 (テスト失敗時のみ):\n{self.captured_output.getvalue()}")
def print_test_result(self, result, test_name):
print("-" * 70)
print(f" {test_name}: {'成功' if result else '失敗'}")
print("-" * 70)
def test_01_list_files_in_folder(self):
test_name = "test_01_list_files_in_folder"
print(f"テストフォルダ: {self.test_folder_name} の内容を確認")
self.api.list_files_in_folder(self.test_folder_name)
output = self.captured_output.getvalue()
result = self.test_file_name in output and "ID:" in output
self.print_test_result(result, test_name)
self.assertTrue(result, "アップロードしたファイルが一覧に表示されていない、またはファイルIDが一覧に表示されていません")
def test_02_get_file_df(self):
test_name = "test_02_get_file_df"
print(f"テストフォルダ: {self.test_folder_name} のファイル情報をDataFrameで取得")
df = self.api.get_file_df(self.test_folder_name)
# DataFrame の内容を見やすく表示
print(" DataFrame:")
if not df.empty:
print(df.to_string(index=False)) # インデックスを非表示にして表示
else:
print(" (空のDataFrame)")
result = not df.empty and all(col in df.columns for col in ['title', 'createdDate', 'id']) and df['title'][0] == self.test_file_name
self.print_test_result(result, test_name)
self.assertTrue(result, "DataFrameが空、必要なカラムがない、またはアップロードしたファイル名が含まれていません")
def test_03_download_file(self):
test_name = "test_03_download_file"
# ダウンロードしたファイルを一時保存するためのディレクトリを作成
download_dir = "downloaded_files"
os.makedirs(download_dir, exist_ok=True)
# テスト用に、ファイルをもう一つアップロード
self.api.upload_file(self.test_folder_name, self.test_file_path, "download.csv")
self.api.download_file(folder_name=self.test_folder_name, file_name="download.csv", target_dir=download_dir)
downloaded_file_path = os.path.join(download_dir, "download.csv")
result = os.path.exists(downloaded_file_path)
self.print_test_result(result, test_name)
self.assertTrue(result, "ファイルがダウンロードされていません")
# ダウンロードしたファイルを削除
if os.path.exists(downloaded_file_path):
os.remove(downloaded_file_path)
# ダウンロード用ディレクトリを削除
os.rmdir(download_dir)
def test_04_delete_file(self):
test_name = "test_04_delete_file"
# テスト用に、ファイルをもう一つアップロード
self.api.upload_file(self.test_folder_name, self.test_file_path, "delete.csv")
file_id = self.api.get_item_id("delete.csv", parent_id=self.test_folder_id)
self.api.delete_file(file_id=file_id)
# 削除されたか確認
deleted_file_id = self.api.get_item_id("delete.csv", parent_id=self.test_folder_id)
result = deleted_file_id is None
self.print_test_result(result, test_name)
self.assertTrue(result, "ファイルの削除に失敗しました")
def test_05_get_item_id_folder_exists(self):
test_name = "test_05_get_item_id_folder_exists"
folder_id = self.api.get_item_id(self.test_folder_name, is_folder=True)
result = folder_id is not None
self.print_test_result(result, test_name)
self.assertTrue(result, f"フォルダ '{self.test_folder_name}' のIDが取得できませんでした")
def test_06_get_item_id_folder_not_exists(self):
test_name = "test_06_get_item_id_folder_not_exists"
folder_id = self.api.get_item_id(self.non_existent_folder_name, is_folder=True)
result = folder_id is None
self.print_test_result(result, test_name)
self.assertTrue(result, f"存在しないフォルダ '{self.non_existent_folder_name}' のIDが取得されました")
def test_07_get_item_id_file_exists(self):
test_name = "test_07_get_item_id_file_exists"
file_id = self.api.get_item_id(self.test_file_name, parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
result = file_id is not None
self.print_test_result(result, test_name)
self.assertTrue(result, f"ファイル '{self.test_file_name}' のIDが取得できませんでした")
def test_08_get_item_id_file_not_exists(self):
test_name = "test_08_get_item_id_file_not_exists"
file_id = self.api.get_item_id(self.non_existent_file_name, parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
result = file_id is None
self.print_test_result(result, test_name)
self.assertTrue(result, f"存在しないファイル '{self.non_existent_file_name}' のIDが取得されました")
def test_09_get_latest_file_id(self):
test_name = "test_09_get_latest_file_id"
# 名前を変えてもう一つアップロード
time.sleep(1) # アップロード間隔を空ける
self.api.upload_file(self.test_folder_name, self.test_file_path, self.test_file_name + "2")
latest_file_id = self.api.get_latest_file_id(self.test_folder_id)
result = latest_file_id is not None
self.print_test_result(result, test_name)
self.assertTrue(result, "最新のファイルIDが取得できませんでした")
def test_10_get_latest_file_id_empty_folder(self):
test_name = "test_10_get_latest_file_id_empty_folder"
# テスト用の空のフォルダを作成するために、一意なフォルダ名を使用します
unique_folder_name = f"EmptyTestFolder_{uuid.uuid4().hex}"
# 空のフォルダを作成
empty_folder = self.api.drive.CreateFile({'title': unique_folder_name, 'mimeType': 'application/vnd.google-apps.folder'})
empty_folder.Upload()
empty_folder_id = empty_folder['id']
latest_file_id = self.api.get_latest_file_id(empty_folder_id)
# テストで使用した空のフォルダを削除
empty_folder.Delete()
result = latest_file_id is None
self.print_test_result(result, test_name)
self.assertTrue(result, "空のフォルダでファイルIDが取得されました")
def test_11_get_all_file_ids(self):
test_name = "test_11_get_all_file_ids"
file_ids = self.api.get_all_file_ids(self.test_folder_name)
result = len(file_ids) >= 1
self.print_test_result(result, test_name)
self.assertTrue(result, "ファイルIDのリストが空です")
def test_12_get_all_file_ids_empty_folder(self):
test_name = "test_12_get_all_file_ids_empty_folder"
file_ids = self.api.get_all_file_ids(self.non_existent_folder_name)
result = len(file_ids) == 0
self.print_test_result(result, test_name)
self.assertTrue(result, "存在しないフォルダでファイルIDが取得されました")
def test_13_get_created_at(self):
test_name = "test_13_get_created_at"
created_at_dict = self.api.get_created_at(self.test_folder_name)
result = len(created_at_dict) > 0
self.print_test_result(result, test_name)
self.assertTrue(result, "作成日時の辞書が空です")
def test_14_get_created_at_empty_folder(self):
test_name = "test_14_get_created_at_empty_folder"
created_at_dict = self.api.get_created_at(self.non_existent_folder_name)
result = len(created_at_dict) == 0
self.print_test_result(result, test_name)
self.assertTrue(result, "存在しないフォルダで作成日時が取得されました")
def test_15_upload_file(self):
test_name = "test_15_upload_file"
# テスト用ファイルをアップロード
self.api.upload_file(self.test_folder_name, self.test_file_path, "upload.csv")
file_id = self.api.get_item_id("upload.csv", parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
result = file_id is not None
self.print_test_result(result, test_name)
self.assertTrue(result, "ファイルのアップロードに失敗しました")
def test_16_upload_file_to_nonexistent_folder(self):
test_name = "test_16_upload_file_to_nonexistent_folder"
# 存在しないフォルダにアップロードを試みる
result = self.api.upload_file(self.non_existent_folder_name, self.test_file_path, self.test_file_name)
# メソッドが正常に失敗したことを確認するために、結果がNoneであることを期待する
self.print_test_result(result is None, test_name)
self.assertIsNone(result)
def test_17_download_nonexistent_file(self):
test_name = "test_17_download_nonexistent_file"
self.api.download_file(folder_name=self.test_folder_name, file_name=self.non_existent_file_name)
# 存在しないファイルのダウンロードを試みた際にエラーメッセージが表示されることを確認
# このテストは、エラーが適切に処理されることを検証するために、コンソール出力をキャプチャする必要があります。
# ただし、unittestの標準的な方法では直接キャプチャできないため、ここではスキップします。
# 代わりに、手動でテストを実行し、コンソール出力を確認してください。
result = "ファイルが見つかりません" in self.captured_output.getvalue()
self.print_test_result(result, test_name)
self.assertTrue(result, "存在しないファイルをダウンロードしようとした際に、エラーメッセージが表示されませんでした")
def test_18_delete_nonexistent_file(self):
test_name = "test_18_delete_nonexistent_file"
result = self.api.delete_file(folder_name=self.test_folder_name, file_name=self.non_existent_file_name)
# 存在しないファイルを削除しようとしたときにエラーが発生しないことを確認する。
# メソッドが正常に失敗したことを確認するために、結果がNoneであることを期待する
self.print_test_result(result is None, test_name)
self.assertIsNone(result)
def test_19_list_files_in_nonexistent_folder(self):
test_name = "test_19_list_files_in_nonexistent_folder"
self.api.list_files_in_folder(self.non_existent_folder_name)
result = f"指定されたフォルダ'{self.non_existent_folder_name}'が見つかりません" in self.captured_output.getvalue()
self.print_test_result(result, test_name)
self.assertTrue(result, "存在しないフォルダに対してlist_files_in_folderを実行した際に、エラーメッセージが表示されませんでした")
if __name__ == '__main__':
unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)
コメント