PythonでGoogle Drive APIを使いこなす!PyDrive2を使った自動化術

目次

はじめに

皆さんこんにちは!

仕事やプライベートで、Google Driveを活用されている方は多いのではないでしょうか?ファイルの保存や共有にとても便利ですよね。

でも、手作業でのファイル操作って、結構面倒だったりしませんか?特に、大量のファイルを扱ったり、定期的に同じ作業を繰り返したりする場合は、うんざりしてしまうこともあるでしょう。
私もデータアナリスト時代に大量のファイルを扱う必要があり、何とかならないものかとずっと考えていました。

そんな悩みを解決してくれるのが、PythonPyDrive2ライブラリです!これらを組み合わせれば、Google Driveの操作を自動化して、面倒な作業から解放されちゃいましょう。

この記事では、私が実際に業務効率化のために作成したGoogleDriveAPIクラスを題材に、PyDrive2の使い方を徹底解説します!

「APIってなんだか難しそう…」と不安に思っている方も、大丈夫!この記事を読めば、きっとPythonでGoogle Driveを自在に操れるようになりますよ。

さあ、一緒にGoogle Drive自動化の世界へ飛び込みましょう。

この記事で学べること

  • PyDrive2ライブラリの概要とインストール方法
  • Google Cloud Consoleでの設定 (OAuth同意画面など)
  • Google Drive APIの認証方法
  • GoogleDriveAPIクラスを使った、Google Driveの基本的な操作
    • フォルダ/ファイルIDの取得
    • ファイルのアップロード/ダウンロード
    • ファイルの削除
    • フォルダ内のファイル一覧取得
    • 特定のフォルダ内の最新ファイルの取得
    • ファイル情報からpandas DataFrameを生成
  • GoogleDriveAPIクラスを別のPythonファイルから呼び出して使う方法
  • GoogleDriveAPIクラスに新しいメソッドを追加する方法

GoogleDriveAPI クラスって?

GoogleDriveAPI クラスは、私がGoogle Drive APIを簡単に操作できるように作った、オリジナルのPythonクラスです。このクラスを使えば、Google Driveの様々な操作を、シンプルで分かりやすいコードで実行できます。

例えば、フォルダ内の最新のファイルをダウンロードしたい場合、GoogleDriveAPIクラスを使えば、たった数行のコードで実現できます。

from Google_Drive_api import GoogleDriveAPI

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# フォルダIDを指定 (例: マイドライブのID)
folder_id = drive.get_item_id("マイドライブ", is_folder=True)

# 作成日が一番新しいファイルのIDを取得
file_id = drive.get_latest_file_id(folder_id)

# ファイルをダウンロード
if file_id:
    drive.download_file(file_id=file_id)
else:
    print("ファイルが見つかりませんでした。")

すごく簡単ですよね?

このブログ記事では、GoogleDriveAPIクラスのコードを詳しく解説しながら、PyDrive2の使い方をマスターしていきます。

事前準備:PyDrive2をインストールしよう

まずは、PyDrive2ライブラリをインストールしましょう。pipコマンドを使えば、簡単にインストールできます。

ターミナルまたはコマンドプロンプトを開いて、以下のコマンドを実行してください。

pip install pydrive2 pandas

このコマンドで、PyDrive2pandas(ファイル一覧をデータフレームとして取得するために必要)がインストールされます。

事前準備:Google Cloud ConsoleでAPIを有効化しよう

次に、Google Cloud Consoleで、Google Drive APIを有効化し、認証情報を作成する必要があります。

ちょっと手順が複雑なので、ここでは概要だけ説明します。詳しくは、PyDrive2の公式ドキュメント や、「PythonでGoogle Drive APIを使う」 などの記事を参考にしてください。

  1. Google Cloud Consoleにアクセス: Google Cloud Console にアクセスし、Googleアカウントでログインします。
  2. プロジェクトの作成または選択: 画面上部の「プロジェクトを選択」から、新しいプロジェクトを作成するか、既存のプロジェクトを選択します。
  3. Google Drive APIの有効化: 左側のナビゲーションメニューから「APIとサービス」>「ライブラリ」を選択し、「Google Drive API」を検索して、「有効にする」をクリックします。
  4. OAuth同意画面の設定: 左側のメニューから「APIとサービス」>「OAuth同意画面」を選択し、必要な情報を入力して、同意画面を設定します。
    • User Typeは用途に応じて外部内部を選択してください。個人利用の場合は通常外部を選択すれば問題ありません。
    • アプリケーション名は任意の名前(例:PyDrive2 App)を入力してください。
    • 「承認済みドメイン」には、ご自身のドメイン(無い場合は適当なドメイン(例:example.com))を入力してください。
    • 「保存して次へ」をクリックしてください。
    • 次の「スコープ」は何も入力せずに「保存して次へ」をクリックしてください。
    • 次の「テストユーザー」はご自身のGoogleアカウントのメールアドレスを入力して、「保存して次へ」をクリックしてください。
  5. 認証情報の作成: 左側のメニューから「APIとサービス」>「認証情報」を選択し、「認証情報を作成」>「OAuthクライアントID」を選択します。
    • 「アプリケーションの種類」で「デスクトップアプリ」を選択します。
    • 「名前」は任意の名前(例:PyDrive2 Desktop Client)を入力してください。
    • 「作成」をクリックします。
  6. 認証情報のダウンロード: 作成した認証情報の右側にあるダウンロードアイコンをクリックし、JSONファイルをダウンロードします。ダウンロードしたファイルを、client_secrets.jsonという名前で保存します。
  7. settings.yamlの作成: 以下の内容のsettings.yamlという名前のファイルを作成し、client_secrets.jsonと同じディレクトリに保存します。
client_config_backend: file
client_config_file: client_secrets.json
save_credentials: True
save_credentials_backend: file
save_credentials_file: saved_credentials.json
get_refresh_token: True
oauth_scope:
  - https://www.googleapis.com/auth/drive
  - https://www.googleapis.com/auth/drive.install

重要: client_secrets.jsonsettings.yamlには、機密情報が含まれています。これらのファイルを誤って公開したり、他人と共有したりしないように、厳重に管理してください。

GoogleDriveAPI クラスのコード解説

さあ、準備が整ったところで、いよいよGoogleDriveAPIクラスのコードを見ていきましょう!

ディレクトリ構成

.
├── main.py
├── Google_Drive_api.py
├── client_secrets.json
├── saved_credentials.json
└── settings.yaml

main.pyが実行するスクリプトで、Google_Drive_api.pyGoogleDriveAPIクラスを定義しているモジュールです。 client_secrets.json, saved_credentials.json, settings.yamlは上で説明した設定ファイルです。 これらはすべて同じディレクトリに配置してください。

Google_Drive_api.pyのコード

import os
import pandas as pd
from pydrive2.drive import GoogleDrive
from pydrive2.auth import GoogleAuth
from typing import List, Dict, Optional

class GoogleDriveAPI:
    FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
    CSV_MIME_TYPE = 'text/csv'

    def __init__(self) -> None:
        """
        GoogleDriveAPIクラスのコンストラクタ
        """

        self.gauth = GoogleAuth()

        # 保存された認証情報を読み込む
        self.gauth.LoadCredentialsFile("saved_credentials.json")

        # 認証情報が存在しない、または無効な場合は新たに認証を行う
        if self.gauth.credentials is None:
            self.gauth.LocalWebserverAuth()
        elif self.gauth.access_token_expired:
            self.gauth.Refresh()
        else:
            self.gauth.Authorize()

        # 認証情報を保存
        self.gauth.SaveCredentialsFile("saved_credentials.json")

        self.drive = GoogleDrive(self.gauth)

    def get_item_id(self, name: str, parent_id: Optional[str] = None, is_folder: bool = False) -> Optional[str]:
        """
        ファイルまたはフォルダのIDを取得

        Args:
            name (str): 取得したいファイルまたはフォルダの名前。
            parent_id (str, optional): 親フォルダのID。指定しない場合はルートフォルダから検索。
            is_folder (bool, optional): フォルダを取得する場合はTrue、ファイルを取得する場合はFalse。デフォルトはFalse。

        Returns:
            str or None: 見つかったファイルまたはフォルダのID。見つからない場合はNone。
        """
        query = f"trashed = false and title='{name}'"
        if is_folder:
            query += f" and mimeType='{self.FOLDER_MIME_TYPE}'"
        if parent_id:
            query += f" and '{parent_id}' in parents"

        item_list = self.drive.ListFile({'q': query}).GetList()
        if not item_list:
            return None
        elif len(item_list) > 1:
            print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。")
        return item_list[0]['id']

    def get_latest_file_id(self, folder_id: str) -> Optional[str]:
        """
        フォルダ内の最新ファイルのIDを取得

        Args:
            folder_id (str): フォルダのID。

        Returns:
            str or None: 最新ファイルのID。ファイルが存在しない場合はNone。
        """
        query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'"
        file_list = self.drive.ListFile({'q': query}).GetList()
        if not file_list:
            return None
        return max(file_list, key=lambda f: f['createdDate'])['id']

    def _get_files_info_from_folder(self, folder_id: Optional[str]) -> List[Dict]:
        """
        フォルダ内のファイル情報を取得するヘルパー関数

        Args:
            folder_id (str): フォルダのID

        Returns:
            List[Dict]: ファイル情報のリスト
        """
        if not folder_id:
            return []
        return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()

    def get_all_file_ids(self, folder_name: str) -> List[str]:
        """
        フォルダ内のすべてのファイルIDを取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            List[str]: ファイルIDのリスト。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return [file['id'] for file in file_list] if file_list else []

    def get_file_df(self, folder_name: str) -> pd.DataFrame:
        """
        フォルダ内のファイル情報をDataFrameとして取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            pd.DataFrame: ファイル情報を含むDataFrame。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return pd.DataFrame([(file['title'], file['createdDate'], file['id']) for file in file_list],
                            columns=['title', 'createdDate', 'id']) if file_list else pd.DataFrame()

    def get_created_at(self, folder_name: str) -> Dict[str, str]:
        """
        ファイルIDと作成日時の辞書を取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            Dict[str, str]: ファイルIDをキー、作成日時を値とする辞書。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return {file['id']: file['createdDate'] for file in file_list} if file_list else {}

    def upload_file(self, folder_name: str, content_file: str, file_name: str = 'no_name.csv') -> None:
        """
        ファイルをGoogle Driveにアップロード

        Args:
            folder_name (str): アップロード先のフォルダ名。
            content_file (str): アップロードするファイルのパス。
            file_name (str, optional): アップロードするファイルの名前。デフォルトは 'no_name.csv'。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        if not folder_id:
            print(f"フォルダ '{folder_name}' が見つかりません")
            return

        metadata = {
            'parents': [{"id": folder_id}],
            'title': file_name,
            'mimeType': self.CSV_MIME_TYPE
        }
        f = self.drive.CreateFile(metadata=metadata)
        f.SetContentFile(content_file)
        try:
            f.Upload()
            print(f"ファイル '{file_name}' が正常にアップロードされました")
        except Exception as e:
            print(f"ファイルのアップロード中にエラーが発生しました: {e}")
        finally:
            f.content.close()  # ファイルを確実に閉じる

    def download_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None, target_dir: str = "") -> None:
        """
        IDまたは名前でファイルをダウンロード

        Args:
            file_id (str, optional): ダウンロードするファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
            folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
            file_name (str, optional): ダウンロードするファイル名。file_idが指定されていない場合に必要。
            target_dir (str, optional): ダウンロードしたファイルを保存するディレクトリ。デフォルトはカレントディレクトリ。
        """
        if not file_id:
            if not folder_name or not file_name:
                print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
                return
            folder_id = self.get_item_id(folder_name, is_folder=True)
            if not folder_id:
                print(f"フォルダ '{folder_name}' が見つかりません")
                return
            file_id = self.get_item_id(file_name, parent_id=folder_id)

        if not file_id:
            print(f"ファイルが見つかりません")
            return

        try:
            f = self.drive.CreateFile({'id': file_id})
            f.FetchMetadata()
            if f.metadata.get('trashed', False):
                print("ファイルはゴミ箱にあります")
                return

            output_name = file_name or f['title']
            f.GetContentFile(os.path.join(target_dir, output_name))
            print(f"ファイル '{output_name}' が正常にダウンロードされました")
        except Exception as e:
            print(f"ファイルのダウンロード中にエラーが発生しました: {e}")

    def delete_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None) -> None:
        """
        IDまたは名前でファイルを削除

        Args:
            file_id (str, optional): 削除するファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
            folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
            file_name (str, optional): 削除するファイル名。file_idが指定されていない場合に必要。
        """
        if not file_id:
            if not folder_name or not file_name:
                print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
                return
            folder_id = self.get_item_id(folder_name, is_folder=True)
            if not folder_id:
                print(f"フォルダ '{folder_name}' が見つかりません")
                return
            file_id = self.get_item_id(file_name, parent_id=folder_id)

        if not file_id:
            print(f"ファイルが見つかりません")
            return

        try:
            f = self.drive.CreateFile({'id': file_id})
            f.Delete()
            print(f"ID '{file_id}' のファイルが正常に削除されました")
        except Exception as e:
            print(f"エラーが発生しました: {e}")

    def list_files_in_folder(self, folder_name: str) -> None:
        """
        フォルダ内のすべてのファイルを一覧表示

        Args:
        フォルダ内のすべてのファイルを一覧表示

        Args:
            folder_name (str): フォルダ名。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        if not folder_id:
            print(f"指定されたフォルダ'{folder_name}'が見つかりません")
            return

        file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
        if not file_list:
            print("フォルダ内にファイルが見つかりません")
            return

        for file in file_list:
            print(f"タイトル: {file['title']}, ID: {file['id']}")

    def create_folder(self, folder_name: str, parent_folder_id: str = 'root') -> None:
        """
        Google Drive上にフォルダを作成する

        Args:
            folder_name (str): 作成するフォルダ名
            parent_folder_id (str): 親フォルダのID。デフォルトは'root'(ルートフォルダ)
        """

        folder_metadata = {
            'title': folder_name,
            'mimeType': self.FOLDER_MIME_TYPE,
            'parents': [{'id': parent_folder_id}]
        }
        folder = self.drive.CreateFile(folder_metadata)
        folder.Upload()
        print(f"フォルダ '{folder_name}' (ID: {folder['id']}) が作成されました")

コードの解説

ここからはコードを解説します。
chatGPTも使っているので、ちょっと日本語が変なところがありますが、気にしないでください。

クラス変数

    FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
    CSV_MIME_TYPE = 'text/csv'
  • FOLDER_MIME_TYPE: フォルダのMIMEタイプを定義しています。Google Drive APIではフォルダを特定するためにこのMIMEタイプを使用します。
  • CSV_MIME_TYPE: CSVファイルのMIMEタイプを定義しています。アップロード時にファイルの種別を指定するために使用します。

__init__ メソッド (コンストラクタ)

    def __init__(self) -> None:
        """
        GoogleDriveAPIクラスのコンストラクタ
        """

        self.gauth = GoogleAuth()

        # 保存された認証情報を読み込む
        self.gauth.LoadCredentialsFile("saved_credentials.json")

        # 認証情報が存在しない、または無効な場合は新たに認証を行う
        if self.gauth.credentials is None:
            self.gauth.LocalWebserverAuth()
        elif self.gauth.access_token_expired:
            self.gauth.Refresh()
        else:
            self.gauth.Authorize()

        # 認証情報を保存
        self.gauth.SaveCredentialsFile("saved_credentials.json")

        self.drive = GoogleDrive(self.gauth)
  • self.gauth = GoogleAuth(): GoogleAuthオブジェクトを生成します。このオブジェクトを使って認証処理を行います。
  • self.gauth.LoadCredentialsFile("saved_credentials.json"): 以前に認証した際に保存された認証情報をsaved_credentials.jsonファイルから読み込みます。
  • 認証情報の確認と更新:
    • if self.gauth.credentials is None:: 認証情報が全く存在しない場合、self.gauth.LocalWebserverAuth()を実行して、ブラウザ経由での認証プロセスを開始します。
    • elif self.gauth.access_token_expired:: 認証情報は存在するが、アクセストークンの有効期限が切れている場合、self.gauth.Refresh()を実行してトークンを更新します。
    • else:: 認証情報が存在し、アクセストークンも有効な場合、self.gauth.Authorize()で認証を完了します。
  • self.gauth.SaveCredentialsFile("saved_credentials.json"): 更新された認証情報をsaved_credentials.jsonに保存します。
  • self.drive = GoogleDrive(self.gauth): 認証済みのGoogleAuthオブジェクトを使ってGoogleDriveオブジェクトを生成します。このオブジェクトを使ってGoogle Drive APIを操作します。

get_item_id メソッド

    def get_item_id(self, name: str, parent_id: Optional[str] = None, is_folder: bool = False) -> Optional[str]:
        """
        ファイルまたはフォルダのIDを取得

        Args:
            name (str): 取得したいファイルまたはフォルダの名前。
            parent_id (str, optional): 親フォルダのID。指定しない場合はルートフォルダから検索。
            is_folder (bool, optional): フォルダを取得する場合はTrue、ファイルを取得する場合はFalse。デフォルトはFalse。

        Returns:
            str or None: 見つかったファイルまたはフォルダのID。見つからない場合はNone。
        """
        query = f"trashed = false and title='{name}'"
        if is_folder:
            query += f" and mimeType='{self.FOLDER_MIME_TYPE}'"
        if parent_id:
            query += f" and '{parent_id}' in parents"

        item_list = self.drive.ListFile({'q': query}).GetList()
        if not item_list:
            return None
        elif len(item_list) > 1:
            print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。")
        return item_list[0]['id']
  • query = f"trashed = false and title='{name}'": Google Drive APIに送信する検索クエリのベース部分を作成します。
    • trashed = false: ゴミ箱にないアイテムを対象とします。
    • title='{name}': 指定された名前と一致するアイテムを検索します。
  • if is_folder: query += f" and mimeType='{self.FOLDER_MIME_TYPE}'": is_folderTrueの場合、フォルダのみを対象とするようにクエリを拡張します。
  • if parent_id: query += f" and '{parent_id}' in parents": parent_idが指定されている場合、指定された親フォルダ内に限定するようにクエリを拡張します。
  • item_list = self.drive.ListFile({'q': query}).GetList(): 構築したクエリを使ってListFileメソッドを実行し、条件に一致するアイテムのリストを取得します。
  • if not item_list: return None: 一致するアイテムが見つからない場合はNoneを返します。
  • elif len(item_list) > 1: print(f"警告: '{name}' に一致する項目が複数見つかりました。最初の項目を返します。"): 一致するアイテムが複数見つかった場合は、警告メッセージを表示します。
  • return item_list[0]['id']: 最初のアイテムのIDを返します。

get_latest_file_id メソッド

    def get_latest_file_id(self, folder_id: str) -> Optional[str]:
        """
        フォルダ内の最新ファイルのIDを取得

        Args:
            folder_id (str): フォルダのID。

        Returns:
            str or None: 最新ファイルのID。ファイルが存在しない場合はNone。
        """
        query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'"
        file_list = self.drive.ListFile({'q': query}).GetList()
        if not file_list:
            return None
        return max(file_list, key=lambda f: f['createdDate'])['id']
  • query = f"trashed = false and '{folder_id}' in parents and mimeType != '{self.FOLDER_MIME_TYPE}'": 検索クエリを作成します。
    • trashed = false: ゴミ箱にないアイテムを対象とします。
    • '{folder_id}' in parents: 指定されたフォルダ内のアイテムを対象とします。
    • mimeType != '{self.FOLDER_MIME_TYPE}': フォルダを除外し、ファイルのみを対象とします。
  • file_list = self.drive.ListFile({'q': query}).GetList(): クエリを実行して、条件に一致するファイルのリストを取得します。
  • if not file_list: return None: ファイルが見つからない場合はNoneを返します。
  • return max(file_list, key=lambda f: f['createdDate'])['id']: createdDate(作成日時)をキーとして、最も新しいファイルのIDを返します。

_get_files_info_from_folder メソッド

    def _get_files_info_from_folder(self, folder_id: Optional[str]) -> List[Dict]:
        """
        フォルダ内のファイル情報を取得するヘルパー関数

        Args:
            folder_id (str): フォルダのID

        Returns:
            List[Dict]: ファイル情報のリスト
        """
        if not folder_id:
            return []
        return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
  • if not folder_id: return []: folder_idNoneまたは空文字列の場合は、空のリストを返します。
  • return self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList(): 指定されたフォルダ内のすべてのファイル情報をリストとして返します。

get_all_file_ids メソッド

    def get_all_file_ids(self, folder_name: str) -> List[str]:
        """
        フォルダ内のすべてのファイルIDを取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            List[str]: ファイルIDのリスト。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return [file['id'] for file in file_list] if file_list else []
  • folder_id = self.get_item_id(folder_name, is_folder=True): get_item_idメソッドを使って、指定されたフォルダ名のフォルダIDを取得します。
  • file_list = self._get_files_info_from_folder(folder_id): _get_files_info_from_folderヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。
  • return [file['id'] for file in file_list] if file_list else []: ファイル情報リストからファイルIDを抽出し、リストとして返します。ファイルが存在しない場合は空のリストを返します。

get_file_df メソッド

    def get_file_df(self, folder_name: str) -> pd.DataFrame:
        """
        フォルダ内のファイル情報をDataFrameとして取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            pd.DataFrame: ファイル情報を含むDataFrame。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return pd.DataFrame([(file['title'], file['createdDate'], file['id']) for file in file_list],
                            columns=['title', 'createdDate', 'id']) if file_list else pd.DataFrame()
  • folder_id = self.get_item_id(folder_name, is_folder=True): get_item_idメソッドを使って、指定されたフォルダ名のフォルダIDを取得します。
  • file_list = self._get_files_info_from_folder(folder_id): _get_files_info_from_folderヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。
  • return pd.DataFrame(...) if file_list else pd.DataFrame(): ファイル情報リストから、ファイル名、作成日時、IDを抽出してpandasDataFrameオブジェクトを作成して返します。ファイルが存在しない場合は空のDataFrameを返します。

get_created_at メソッド

    def get_created_at(self, folder_name: str) -> Dict[str, str]:
        """
        ファイルIDと作成日時の辞書を取得

        Args:
            folder_name (str): フォルダ名。

        Returns:
            Dict[str, str]: ファイルIDをキー、作成日時を値とする辞書。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        file_list = self._get_files_info_from_folder(folder_id)
        return {file['id']: file['createdDate'] for file in file_list} if file_list else {}
  • folder_id = self.get_item_id(folder_name, is_folder=True): get_item_idメソッドを使って、指定されたフォルダ名のフォルダIDを取得します。
  • file_list = self._get_files_info_from_folder(folder_id): _get_files_info_from_folderヘルパー関数を使って、フォルダ内のファイル情報リストを取得します。
  • return {file['id']: file['createdDate'] for file in file_list} if file_list else {}: ファイル情報リストから、ファイルIDと作成日時を抽出して辞書として返します。ファイルが存在しない場合は空の辞書を返します。

upload_file メソッド

    def upload_file(self, folder_name: str, content_file: str, file_name: str = 'no_name.csv') -> None:
        """
        ファイルをGoogle Driveにアップロード

        Args:
            folder_name (str): アップロード先のフォルダ名。
            content_file (str): アップロードするファイルのパス。
            file_name (str, optional): アップロードするファイルの名前。デフォルトは 'no_name.csv'。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        if not folder_id:
            print(f"フォルダ '{folder_name}' が見つかりません")
            return

        metadata = {
            'parents': [{"id": folder_id}],
            'title': file_name,
            'mimeType': self.CSV_MIME_TYPE
        }
        f = self.drive.CreateFile(metadata=metadata)
        f.SetContentFile(content_file)
        try:
            f.Upload()
            print(f"ファイル '{file_name}' が正常にアップロードされました")
        except Exception as e:
            print(f"ファイルのアップロード中にエラーが発生しました: {e}")
        finally:
            f.content.close()  # ファイルを確実に閉じる
  • folder_id = self.get_item_id(folder_name, is_folder=True): get_item_idメソッドを使って、アップロード先のフォルダIDを取得します。
  • if not folder_id: ...: フォルダIDが取得できない場合、エラーメッセージを表示して関数を終了します。
  • metadata = {...}: アップロードするファイルのメタデータを設定します。
    • parents: アップロード先のフォルダIDを指定します。
    • title: アップロードするファイルの名前を指定します。
    • mimeType: ファイルのMIMEタイプをCSV_MIME_TYPEに設定します。
  • f = self.drive.CreateFile(metadata=metadata): メタデータを使ってGoogleDriveFileオブジェクトを作成します。
  • f.SetContentFile(content_file): アップロードするローカルファイルのパスを指定します。
  • try...except...finally: ファイルのアップロード処理中にエラーが発生した場合の例外処理と、正常終了/エラー発生に関わらずファイルを閉じる処理を記述しています。
    • f.Upload(): ファイルをアップロードします。
    • print(...): アップロード成功またはエラー発生のメッセージを表示します。
    • f.content.close(): ローカルファイルをクローズします。

download_file メソッド

    def download_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None, target_dir: str = "") -> None:
        """
        IDまたは名前でファイルをダウンロード

        Args:
            file_id (str, optional): ダウンロードするファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
            folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
            file_name (str, optional): ダウンロードするファイル名。file_idが指定されていない場合に必要。
            target_dir (str, optional): ダウンロードしたファイルを保存するディレクトリ。デフォルトはカレントディレクトリ。
        """
        if not file_id:
            if not folder_name or not file_name:
                print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
                return
            folder_id = self.get_item_id(folder_name, is_folder=True)
            if not folder_id:
                print(f"フォルダ '{folder_name}' が見つかりません")
                return
            file_id = self.get_item_id(file_name, parent_id=folder_id)
        if not file_id:
            print(f"ファイルが見つかりません")
            return

        try:
            f = self.drive.CreateFile({'id': file_id})
            f.FetchMetadata()
            if f.metadata.get('trashed', False):
                print("ファイルはゴミ箱にあります")
                return

            output_name = file_name or f['title']
            f.GetContentFile(os.path.join(target_dir, output_name))
            print(f"ファイル '{output_name}' が正常にダウンロードされました")
        except Exception as e:
            print(f"ファイルのダウンロード中にエラーが発生しました: {e}")
  • file_id, folder_name, file_name: ダウンロードするファイルを特定するための引数です。file_idを直接指定するか、folder_namefile_nameを組み合わせて指定します。
  • target_dir: ダウンロードしたファイルを保存するローカルディレクトリを指定します。デフォルトはカレントディレクトリです。
  • if not file_id:: file_idが指定されていない場合は、folder_namefile_nameからget_item_idメソッドを使ってファイルIDを取得します。
    • if not folder_name or not file_name: ...: 必要な情報が不足している場合はエラーメッセージを表示して関数を終了します。
    • folder_id = self.get_item_id(...): フォルダIDを取得します。
    • if not folder_id: ...: フォルダが見つからない場合はエラーメッセージを表示して関数を終了します。
    • file_id = self.get_item_id(...): ファイルIDを取得します。
  • if not file_id: ...: ファイルが見つからない場合はエラーメッセージを表示して関数を終了します。
  • try...except: ファイルのダウンロード処理中にエラーが発生した場合の例外処理を記述しています。
    • f = self.drive.CreateFile({'id': file_id}): 指定されたファイルIDを使ってGoogleDriveFileオブジェクトを作成します。
    • f.FetchMetadata(): ファイルのメタデータを取得します。
    • if f.metadata.get('trashed', False): ...: ファイルがゴミ箱にあるかどうかを確認し、ゴミ箱にある場合はエラーメッセージを表示して関数を終了します。
    • output_name = file_name or f['title']: ダウンロードするファイルの名前を決定します。file_nameが指定されている場合はその名前を使い、そうでない場合はGoogle Drive上のファイル名を使います。
    • f.GetContentFile(os.path.join(target_dir, output_name)): ファイルをダウンロードし、指定されたパスに保存します。
    • print(...): ダウンロード成功またはエラー発生のメッセージを表示します。

delete_file メソッド

    def delete_file(self, file_id: Optional[str] = None, folder_name: Optional[str] = None, file_name: Optional[str] = None) -> None:
        """
        IDまたは名前でファイルを削除

        Args:
            file_id (str, optional): 削除するファイルのID。指定しない場合は、folder_nameとfile_nameから検索。
            folder_name (str, optional): ファイルが存在するフォルダ名。file_idが指定されていない場合に必要。
            file_name (str, optional): 削除するファイル名。file_idが指定されていない場合に必要。
        """
        if not file_id:
            if not folder_name or not file_name:
                print("file_idまたはfolder_nameとfile_nameの両方を指定する必要があります")
                return
            folder_id = self.get_item_id(folder_name, is_folder=True)
            if not folder_id:
                print(f"フォルダ '{folder_name}' が見つかりません")
                return
            file_id = self.get_item_id(file_name, parent_id=folder_id)

        if not file_id:
            print(f"ファイルが見つかりません")
            return

        try:
            f = self.drive.CreateFile({'id': file_id})
            f.Delete()
            print(f"ID '{file_id}' のファイルが正常に削除されました")
        except Exception as e:
            print(f"エラーが発生しました: {e}")
  • file_id, folder_name, file_name: 削除するファイルを特定するための引数です。file_idを直接指定するか、folder_namefile_nameを組み合わせて指定します。
  • if not file_id:: file_idが指定されていない場合は、folder_namefile_nameからget_item_idメソッドを使ってファイルIDを取得します。
    • if not folder_name or not file_name: ...: 必要な情報が不足している場合はエラーメッセージを表示して関数を終了します。
    • folder_id = self.get_item_id(...): フォルダIDを取得します。
    • if not folder_id: ...: フォルダが見つからない場合はエラーメッセージを表示して関数を終了します。
    • file_id = self.get_item_id(...): ファイルIDを取得します。
  • if not file_id: ...: ファイルが見つからない場合はエラーメッセージを表示して関数を終了します。
  • try...except: ファイルの削除処理中にエラーが発生した場合の例外処理を記述しています。
    • f = self.drive.CreateFile({'id': file_id}): 指定されたファイルIDを使ってGoogleDriveFileオブジェクトを作成します。
    • f.Delete(): ファイルを削除します。
    • print(...): 削除成功またはエラー発生のメッセージを表示します。

list_files_in_folder メソッド

    def list_files_in_folder(self, folder_name: str) -> None:
        """
        フォルダ内のすべてのファイルを一覧表示

        Args:
            folder_name (str): フォルダ名。
        """
        folder_id = self.get_item_id(folder_name, is_folder=True)
        if not folder_id:
            print(f"指定されたフォルダ'{folder_name}'が見つかりません")
            return

        file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList()
        if not file_list:
            print("フォルダ内にファイルが見つかりません")
            return

        for file in file_list:
            print(f"タイトル: {file['title']}, ID: {file['id']}")
  • folder_id = self.get_item_id(folder_name, is_folder=True): get_item_idメソッドを使って、指定されたフォルダ名のフォルダIDを取得します。
  • if not folder_id: ...: フォルダIDが取得できない場合、エラーメッセージを表示して関数を終了します。
  • file_list = self.drive.ListFile({'q': f"'{folder_id}' in parents"}).GetList(): 指定されたフォルダIDを親に持つすべてのファイル情報を取得します。
  • if not file_list: ...: ファイルリストが空の場合、メッセージを表示して関数を終了します。
  • for file in file_list: ...: 取得したファイルリストをループ処理し、各ファイルのタイトルとIDを表示します。

create_folder メソッド

    def create_folder(self, folder_name: str, parent_folder_id: str = 'root') -> None:
        """
        Google Drive上にフォルダを作成する

        Args:
            folder_name (str): 作成するフォルダ名
            parent_folder_id (str): 親フォルダのID。デフォルトは'root'(ルートフォルダ)
        """

        folder_metadata = {
            'title': folder_name,
            'mimeType': self.FOLDER_MIME_TYPE,
            'parents': [{'id': parent_folder_id}]
        }
        folder = self.drive.CreateFile(folder_metadata)
        folder.Upload()
        print(f"フォルダ '{folder_name}' (ID: {folder['id']}) が作成されました")
  • folder_metadata = {...}: 作成するフォルダのメタデータを設定します。
    • title: フォルダ名を指定します。
    • mimeType: フォルダのMIMEタイプを指定します。
    • parents: 親フォルダのIDを指定します。
  • folder = self.drive.CreateFile(folder_metadata): メタデータを使ってGoogleDriveFileオブジェクトを作成します。
  • folder.Upload(): フォルダをGoogle Driveにアップロード(作成)します。
  • print(...): フォルダ作成成功のメッセージを表示します。

GoogleDriveAPI クラスの使用例

それでは、実際にGoogleDriveAPIクラスを使って、Google Driveを操作してみましょう!

モジュールとして使う

まず、Google_Drive_api.py ファイルと同じディレクトリに、新しいPythonファイル(例えば main.py)を作成します。

main.pyに、以下のようにコードを記述します。

from Google_Drive_api import GoogleDriveAPI

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# フォルダ名を指定 (例: マイドライブ直下)
folder_name = "マイドライブ"

# マイドライブのフォルダIDを取得
folder_id = drive.get_item_id(folder_name, is_folder=True)

# マイドライブ内のファイル一覧を表示
if folder_id:
    drive.list_files_in_folder(folder_name)

    # マイドライブ内のファイル情報をDataFrameとして取得
    df = drive.get_file_df(folder_name)
    print(df)
else:
    print(f"フォルダ '{folder_name}' が見つかりませんでした。")

このコードを実行すると、マイドライブ フォルダ内のファイル一覧と、ファイル情報を格納した pandas の DataFrame が表示されます。

以下、main.pyで実行できる他の操作の例を紹介します。

例1: フォルダ内の最新のファイルをダウンロードする

from Google_Drive_api import GoogleDriveAPI

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# フォルダ名を指定
folder_name = "MyFolder"

# フォルダIDを取得
folder_id = drive.get_item_id(folder_name, is_folder=True)

# 最新のファイルIDを取得
if folder_id:
    latest_file_id = drive.get_latest_file_id(folder_id)

    # ファイルをダウンロード
    if latest_file_id:
        drive.download_file(file_id=latest_file_id)
    else:
        print(f"フォルダ '{folder_name}' にファイルが見つかりません")
else:
    print(f"フォルダ '{folder_name}' が見つかりませんでした。")

このコードを実行すると、MyFolderフォルダ内の最新のファイルが、main.pyと同じディレクトリにダウンロードされます。

例2: ファイルをアップロードする

from Google_Drive_api import GoogleDriveAPI

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# フォルダ名を指定
folder_name = "MyFolder"

# アップロードするファイルのパスを指定
file_path = "upload.txt"

# ファイルをアップロード (ファイル名を指定)
drive.upload_file(folder_name, file_path, file_name="uploaded_file.txt")

このコードを実行すると、upload.txtというファイルが、uploaded_file.txtという名前でMyFolderフォルダにアップロードされます。upload.txtmain.pyと同じディレクトリに置いてください。

使用例3: 特定フォルダ内のCSVファイルを読み込んでデータ処理、結果を別フォルダへアップロードする

使用例2までは単純なケースだったので、もう少し実践的なケースを見てみます。

業務でよくあるケースとして、Google Drive上の特定のフォルダに格納されたCSVファイルのデータを読み込み、何らかの処理を施した後、結果を別のCSVファイルとして指定のフォルダにアップロードする、というシナリオを想定してみましょう。

例えば、InputData フォルダ内に複数のCSVファイル(例:data_20231026.csv, data_20231027.csv)が存在し、これらのファイルには売上データが格納されているとします。
このデータを読み込んで、日別の売上合計を計算し、OutputDataフォルダに daily_sales.csv というファイル名でアップロードする処理を考えます。

from Google_Drive_api import GoogleDriveAPI
import pandas as pd
import os

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# 入力フォルダと出力フォルダの名前を指定
input_folder_name = "InputData"
output_folder_name = "OutputData"

# 入力フォルダのIDを取得
input_folder_id = drive.get_item_id(input_folder_name, is_folder=True)

# 入力フォルダ内のCSVファイル一覧を取得
file_list = drive.drive.ListFile({'q': f"'{input_folder_id}' in parents and trashed=false and mimeType='text/csv'"}).GetList()

# 各CSVファイルを読み込んでデータを結合
all_data = pd.DataFrame()
for file in file_list:
    # ファイルをダウンロードして、pandas DataFrameとして読み込む
    file_content = drive.drive.CreateFile({'id': file['id']})
    file_content.GetContentFile(file_content['title'], mimetype='text/csv')
    df = pd.read_csv(file_content['title'])
    all_data = pd.concat([all_data, df])

    # ダウンロードしたファイルを削除
    os.remove(file_content['title'])

# 日別の売上合計を計算 (例として、'Date'列と'Sales'列があると仮定)
all_data['Date'] = pd.to_datetime(all_data['Date'])
daily_sales = all_data.groupby('Date')['Sales'].sum().reset_index()

# 結果をCSVファイルとして出力
output_file_name = 'daily_sales.csv'
output_file_path = f'./{output_file_name}' # 一時的にローカルに保存
daily_sales.to_csv(output_file_path, index=False)

# 結果をGoogle Driveの出力フォルダにアップロード
drive.upload_file(output_folder_name, output_file_path, file_name=output_file_name)

# 一時的に保存したローカルのファイルを削除
os.remove(output_file_path)

print(f"日別売上データを {output_folder_name} フォルダに {output_file_name} としてアップロードしました。")

コードの解説

  1. フォルダとファイル名の指定: input_folder_nameoutput_folder_name で処理対象のフォルダ名を指定します。
  2. 入力フォルダ内のCSVファイル一覧取得:drive.drive.ListFile() とクエリを使って、指定フォルダ内のCSVファイルのみを取得します。
    • 'q': f"'{input_folder_id}' in parents and trashed=false and mimeType='text/csv'" は、input_folder_id を親に持ち、ゴミ箱になく、MIMEタイプが text/csv のファイルのみを対象とするクエリです。
  3. ファイルのダウンロードとデータ結合:
    • for ループで各CSVファイルを処理します。
    • file_content.GetContentFile(file_content['title'], mimetype='text/csv')でファイルをダウンロードします。ダウンロードしたファイルはfile_content['title']にローカルパスが保存されています。
    • pd.read_csv() でCSVファイルを pandas DataFrame として読み込みます。
    • pd.concat() で全てのDataFrameを結合し、all_data に格納します。
    • os.remove(file_content['title'])で、ダウンロードしたファイルを削除します。
  4. データ処理:
    • all_data['Date'] = pd.to_datetime(all_data['Date'])Date 列を日付型に変換します。
    • daily_sales = all_data.groupby('Date')['Sales'].sum().reset_index() で、日付ごとの Sales 列の合計を計算し、新しいDataFrame daily_sales に格納します。
  5. 結果のアップロード:
    • daily_sales.to_csv(output_file_path, index=False)で処理結果のデータフレームをローカルファイルに保存します。
    • drive.upload_file() でローカルに保存されたファイルを Google Drive の OutputData フォルダにアップロードします。
    • os.remove(output_file_path)で、一時的に作成したローカルファイルを削除します。

この例では、Google Drive上のファイルを直接操作するのではなく、一度ローカルにダウンロードしてから処理し、再度アップロードしています。 これは、pandas のようなデータ処理ライブラリがGoogle Drive上のファイルを直接扱えないためです。

使用例4: 定期レポートの自動生成と共有フォルダへのアップロード

特定のフォルダに格納されているデータを使って定期的にレポートを生成し、それを特定の共有フォルダに自動的にアップロードする例を紹介します。

例えば、Reports フォルダに各月の売上データ(例:sales_202310.csv, sales_202311.csv)が保存されているとします。

from Google_Drive_api import GoogleDriveAPI
import pandas as pd
import os
import calendar

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# レポート元データフォルダと共有フォルダの名前を指定
reports_folder_name = "Reports"
shared_folder_name = "Shared Reports"

# 各フォルダのIDを取得
reports_folder_id = drive.get_item_id(reports_folder_name, is_folder=True)
shared_folder_id = drive.get_item_id(shared_folder_name, is_folder=True)

# レポート元データフォルダ内のCSVファイル一覧を取得
file_list = drive.drive.ListFile({'q': f"'{reports_folder_id}' in parents and trashed=false and mimeType='text/csv'"}).GetList()

# 各月のレポートを生成
for file in file_list:
    # ファイル名から対象年月を取得 (例: sales_202310.csv -> 2023, 10)
    year, month = map(int, file['title'].split('_')[1].split('.')[0][:6])
    month_name = calendar.month_name[month]

    # ファイルをダウンロードして、pandas DataFrameとして読み込む
    file_content = drive.drive.CreateFile({'id': file['id']})
    file_content.GetContentFile(file_content['title'], mimetype='text/csv')
    df = pd.read_csv(file_content['title'])

    # ダウンロードしたファイルを削除
    os.remove(file_content['title'])

    # 月次レポートを生成 (例として、商品カテゴリ別の売上合計を計算)
    # 実際のレポート内容は必要に応じて変更してください
    category_sales = df.groupby('Category')['Sales'].sum().reset_index()

    # レポート内容を文字列として作成 (例として、簡単なテキスト形式)
    report_content = f"Monthly Sales Report - {month_name} {year}\n\n"
    report_content += category_sales.to_string(index=False)
    report_content += "\n\n"

    # レポートをテキストファイルとして一時的にローカルに保存
    report_file_name = f"report_{year}{month:02}.txt"
    report_file_path = f"./{report_file_name}"
    with open(report_file_path, 'w') as f:
        f.write(report_content)

    # レポートファイルを共有フォルダにアップロード
    drive.upload_file(shared_folder_name, report_file_path, file_name=report_file_name)

    # 一時的に保存したローカルのファイルを削除
    os.remove(report_file_path)

    print(f"{month_name} {year} のレポートを {shared_folder_name} フォルダにアップロードしました。")

コードの解説

  1. フォルダ名の指定とIDの取得:
    • reports_folder_nameshared_folder_name で処理対象のフォルダ名を指定します。
    • drive.get_item_id() で各フォルダのIDを取得します。
  2. ファイル一覧の取得:
    • drive.drive.ListFile()Reports フォルダ内のCSVファイル一覧を取得します。
  3. 月次レポートの生成:
    • for ループで各CSVファイルを処理します。
    • ファイル名から対象年月を抽出し、calendar.month_name で月名を取得します。
    • ファイルをダウンロードし、pandas DataFrame として読み込みます。
    • os.remove(file_content['title'])で、ダウンロードしたファイルを削除します。
    • df.groupby('Category')['Sales'].sum().reset_index() で商品カテゴリ別の売上合計を計算します。
    • レポート内容を文字列として作成します。
    • with open(report_file_path, 'w') as f: で一時的にローカルにテキストファイルとしてレポートを保存します。
  4. レポートのアップロード:
    • drive.upload_file() でレポートファイルを Shared Reports フォルダにアップロードします。
    • os.remove(report_file_path) で一時ファイルを削除します。

この例を応用することで、例えば以下のようなタスクも自動化できます。

  • データベースからデータを取得し、CSVファイルとしてGoogle Driveにアップロードする。
  • Google Drive上のファイルを定期的にバックアップする。
  • フォームの回答データをスプレッドシートから読み込み、レポートを生成する。

これらの例を参考に、ご自身の業務に合わせた自動化処理を実装してみてください。

新しいメソッドを追加するには?

GoogleDriveAPIクラスに新しい機能を追加したい場合は、クラス定義に新しいメソッドを追加するだけです!

例えば、Google_Drive_api.pyGoogleDriveAPIクラスに以下のようなメソッドを追加することで、特定のフォルダ内のファイルをすべて別フォルダにコピーする機能を実現できます。

    def copy_all_files(self, source_folder_name: str, destination_folder_name: str) -> None:
        """
        特定のフォルダ内のすべてのファイルを別フォルダにコピーする

        Args:
            source_folder_name (str): コピー元のフォルダ名
            destination_folder_name (str): コピー先のフォルダ名
        """

        source_folder_id = self.get_item_id(source_folder_name, is_folder=True)
        if not source_folder_id:
            print(f"コピー元のフォルダ '{source_folder_name}' が見つかりません")
            return

        destination_folder_id = self.get_item_id(destination_folder_name, is_folder=True)
        if not destination_folder_id:
            print(f"コピー先のフォルダ '{destination_folder_name}' が見つかりません")
            return

        file_list = self.drive.ListFile({'q': f"'{source_folder_id}' in parents and trashed=false"}).GetList()
        for file in file_list:
            # コピー先のメタデータを作成(タイトルと親フォルダを変更)
            copied_file_metadata = {
                'title': file['title'],
                'parents': [{'id': destination_folder_id}]
            }

            # ファイルをコピー
            copied_file = self.drive.auth.service.files().copy(fileId=file['id'], body=copied_file_metadata).execute()
            print(f"ファイル '{file['title']}' を '{destination_folder_name}' にコピーしました (ID: {copied_file['id']})")

コードの解説

  • copy_all_files(self, source_folder_name: str, destination_folder_name: str) -> None
    • source_folder_name: コピー元のフォルダ名を指定します。
    • destination_folder_name: コピー先のフォルダ名を指定します。
  • source_folder_id = self.get_item_id(source_folder_name, is_folder=True): コピー元のフォルダIDを取得します。
  • destination_folder_id = self.get_item_id(destination_folder_name, is_folder=True): コピー先のフォルダIDを取得します。
  • file_list = self.drive.ListFile({'q': f"'{source_folder_id}' in parents and trashed=false"}).GetList(): コピー元のフォルダ内のファイル一覧を取得します。
  • for file in file_list:: 各ファイルに対してループ処理を行います。
    • copied_file_metadata = {...}: コピー後のファイルのメタデータを設定します。titleは元のファイル名、parentsはコピー先のフォルダIDを指定します。
    • copied_file = self.drive.auth.service.files().copy(fileId=file['id'], body=copied_file_metadata).execute(): Google Drive APIのfiles().copy()メソッドを使ってファイルをコピーします。fileIdに元のファイルID、bodyに新しいメタデータを指定します。
    • print(...): コピー完了のメッセージを表示します。

このメソッドをGoogleDriveAPIクラスに追加したら、以下のようにして使えます。

from Google_Drive_api import GoogleDriveAPI

# GoogleDriveAPI クラスのインスタンスを作成
drive = GoogleDriveAPI()

# コピー元とコピー先のフォルダ名を指定
source_folder_name = "SourceFolder"
destination_folder_name = "DestinationFolder"

# フォルダ内のファイルをすべてコピー
drive.copy_all_files(source_folder_name, destination_folder_name)

このように、GoogleDriveAPIクラスを拡張することで、Google Driveをさらに便利に使いこなすことができます。

まとめ

この記事では、PythonのPyDrive2ライブラリを使って、Google Driveを操作する方法を解説しました。GoogleDriveAPIクラスを活用すれば、Google Drive APIの複雑な処理を簡略化し、効率的にファイル操作を行うことができます。

今回紹介したコードを参考に、ぜひ皆さんもPythonでGoogle Driveを自由自在に操ってみてください!

このブログ記事が、皆さんのGoogle Driveライフをより快適にする一助となれば幸いです。

おまけ:テスト用のコード

皆さんの手元でGoogle_Drive_api.pyを作成したときに、きちんと作動するか以下のコードでテストしてみてください。
何か間違いがあればエラーメッセージが出ますので、それを見て修正してみてください。

import unittest
import os
import pandas as pd
from Google_Drive_api import GoogleDriveAPI
import uuid
import io
import sys
import time

class TestGoogleDriveAPIIntegration(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # クラス全体のセットアップ。認証を行い、テスト用フォルダを準備
        cls.api = GoogleDriveAPI()
        cls.test_folder_name = f"TestFolder_{uuid.uuid4().hex}"
        cls.test_file_name = "test_file.csv"
        cls.test_file_path = "test_file.csv"
        cls.non_existent_folder_name = "NonExistentFolder"
        cls.non_existent_file_name = "non_existent_file.csv"


        # テスト用フォルダの作成
        folder = cls.api.drive.CreateFile({'title': cls.test_folder_name, 'mimeType': 'application/vnd.google-apps.folder'})
        folder.Upload()
        cls.test_folder_id = folder['id']

        # テスト用ファイルの作成
        with open(cls.test_file_path, "w") as f:
            f.write("col1,col2\n1,2\n3,4")

        # テスト用ファイルのアップロード
        cls.api.upload_file(cls.test_folder_name, cls.test_file_path, cls.test_file_name)

        print("-" * 70)
        print(f" テストフォルダ '{cls.test_folder_name}' を作成しました")
        print("-" * 70)

    @classmethod
    def tearDownClass(cls):
        # クラス全体のクリーンアップ。テスト用フォルダとファイルを削除
        if os.path.exists(cls.test_file_path):
            os.remove(cls.test_file_path)
        try:
            folder = cls.api.drive.CreateFile({'id': cls.test_folder_id})
            folder.Delete()
            print("-" * 70)
            print(f" テストフォルダ '{cls.test_folder_name}' を削除しました")
            print("-" * 70)
        except Exception as e:
            print(f"クリーンアップ中にエラーが発生しました: {e}")

    def setUp(self):
        # 各テストメソッドの前処理。特に何もしていません。
        self.captured_output = io.StringIO()
        sys.stdout = self.captured_output

    def tearDown(self):
        # 各テストメソッドの後処理。キャプチャした出力をリセット
        sys.stdout = sys.__stdout__
        if self.captured_output.getvalue() and not self._outcome.success:
            print(f"\n詳細情報 (テスト失敗時のみ):\n{self.captured_output.getvalue()}")

    def print_test_result(self, result, test_name):
        print("-" * 70)
        print(f"  {test_name}: {'成功' if result else '失敗'}")
        print("-" * 70)

    def test_01_list_files_in_folder(self):
        test_name = "test_01_list_files_in_folder"
        print(f"テストフォルダ: {self.test_folder_name} の内容を確認")
        self.api.list_files_in_folder(self.test_folder_name)
        output = self.captured_output.getvalue()

        result = self.test_file_name in output and "ID:" in output
        self.print_test_result(result, test_name)
        self.assertTrue(result, "アップロードしたファイルが一覧に表示されていない、またはファイルIDが一覧に表示されていません")

    def test_02_get_file_df(self):
        test_name = "test_02_get_file_df"
        print(f"テストフォルダ: {self.test_folder_name} のファイル情報をDataFrameで取得")
        df = self.api.get_file_df(self.test_folder_name)

        # DataFrame の内容を見やすく表示
        print("  DataFrame:")
        if not df.empty:
            print(df.to_string(index=False))  # インデックスを非表示にして表示
        else:
            print("    (空のDataFrame)")

        result = not df.empty and all(col in df.columns for col in ['title', 'createdDate', 'id']) and df['title'][0] == self.test_file_name
        self.print_test_result(result, test_name)
        self.assertTrue(result, "DataFrameが空、必要なカラムがない、またはアップロードしたファイル名が含まれていません")

    def test_03_download_file(self):
        test_name = "test_03_download_file"
        # ダウンロードしたファイルを一時保存するためのディレクトリを作成
        download_dir = "downloaded_files"
        os.makedirs(download_dir, exist_ok=True)

        # テスト用に、ファイルをもう一つアップロード
        self.api.upload_file(self.test_folder_name, self.test_file_path, "download.csv")

        self.api.download_file(folder_name=self.test_folder_name, file_name="download.csv", target_dir=download_dir)

        downloaded_file_path = os.path.join(download_dir, "download.csv")
        result = os.path.exists(downloaded_file_path)
        self.print_test_result(result, test_name)
        self.assertTrue(result, "ファイルがダウンロードされていません")

        # ダウンロードしたファイルを削除
        if os.path.exists(downloaded_file_path):
            os.remove(downloaded_file_path)
        # ダウンロード用ディレクトリを削除
        os.rmdir(download_dir)

    def test_04_delete_file(self):
        test_name = "test_04_delete_file"
        # テスト用に、ファイルをもう一つアップロード
        self.api.upload_file(self.test_folder_name, self.test_file_path, "delete.csv")

        file_id = self.api.get_item_id("delete.csv", parent_id=self.test_folder_id)
        self.api.delete_file(file_id=file_id)

        # 削除されたか確認
        deleted_file_id = self.api.get_item_id("delete.csv", parent_id=self.test_folder_id)

        result = deleted_file_id is None
        self.print_test_result(result, test_name)
        self.assertTrue(result, "ファイルの削除に失敗しました")

    def test_05_get_item_id_folder_exists(self):
        test_name = "test_05_get_item_id_folder_exists"
        folder_id = self.api.get_item_id(self.test_folder_name, is_folder=True)
        result = folder_id is not None
        self.print_test_result(result, test_name)
        self.assertTrue(result, f"フォルダ '{self.test_folder_name}' のIDが取得できませんでした")

    def test_06_get_item_id_folder_not_exists(self):
        test_name = "test_06_get_item_id_folder_not_exists"
        folder_id = self.api.get_item_id(self.non_existent_folder_name, is_folder=True)
        result = folder_id is None
        self.print_test_result(result, test_name)
        self.assertTrue(result, f"存在しないフォルダ '{self.non_existent_folder_name}' のIDが取得されました")

    def test_07_get_item_id_file_exists(self):
        test_name = "test_07_get_item_id_file_exists"
        file_id = self.api.get_item_id(self.test_file_name, parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
        result = file_id is not None
        self.print_test_result(result, test_name)
        self.assertTrue(result, f"ファイル '{self.test_file_name}' のIDが取得できませんでした")

    def test_08_get_item_id_file_not_exists(self):
        test_name = "test_08_get_item_id_file_not_exists"
        file_id = self.api.get_item_id(self.non_existent_file_name, parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
        result = file_id is None
        self.print_test_result(result, test_name)
        self.assertTrue(result, f"存在しないファイル '{self.non_existent_file_name}' のIDが取得されました")

    def test_09_get_latest_file_id(self):
        test_name = "test_09_get_latest_file_id"
        # 名前を変えてもう一つアップロード
        time.sleep(1) # アップロード間隔を空ける
        self.api.upload_file(self.test_folder_name, self.test_file_path, self.test_file_name + "2")
        latest_file_id = self.api.get_latest_file_id(self.test_folder_id)
        result = latest_file_id is not None
        self.print_test_result(result, test_name)
        self.assertTrue(result, "最新のファイルIDが取得できませんでした")

    def test_10_get_latest_file_id_empty_folder(self):
        test_name = "test_10_get_latest_file_id_empty_folder"
        # テスト用の空のフォルダを作成するために、一意なフォルダ名を使用します
        unique_folder_name = f"EmptyTestFolder_{uuid.uuid4().hex}"
        # 空のフォルダを作成
        empty_folder = self.api.drive.CreateFile({'title': unique_folder_name, 'mimeType': 'application/vnd.google-apps.folder'})
        empty_folder.Upload()
        empty_folder_id = empty_folder['id']

        latest_file_id = self.api.get_latest_file_id(empty_folder_id)

        # テストで使用した空のフォルダを削除
        empty_folder.Delete()

        result = latest_file_id is None
        self.print_test_result(result, test_name)
        self.assertTrue(result, "空のフォルダでファイルIDが取得されました")

    def test_11_get_all_file_ids(self):
        test_name = "test_11_get_all_file_ids"
        file_ids = self.api.get_all_file_ids(self.test_folder_name)
        result = len(file_ids) >= 1
        self.print_test_result(result, test_name)
        self.assertTrue(result, "ファイルIDのリストが空です")

    def test_12_get_all_file_ids_empty_folder(self):
        test_name = "test_12_get_all_file_ids_empty_folder"
        file_ids = self.api.get_all_file_ids(self.non_existent_folder_name)
        result = len(file_ids) == 0
        self.print_test_result(result, test_name)
        self.assertTrue(result, "存在しないフォルダでファイルIDが取得されました")

    def test_13_get_created_at(self):
        test_name = "test_13_get_created_at"
        created_at_dict = self.api.get_created_at(self.test_folder_name)
        result = len(created_at_dict) > 0
        self.print_test_result(result, test_name)
        self.assertTrue(result, "作成日時の辞書が空です")

    def test_14_get_created_at_empty_folder(self):
        test_name = "test_14_get_created_at_empty_folder"
        created_at_dict = self.api.get_created_at(self.non_existent_folder_name)
        result = len(created_at_dict) == 0
        self.print_test_result(result, test_name)
        self.assertTrue(result, "存在しないフォルダで作成日時が取得されました")

    def test_15_upload_file(self):
        test_name = "test_15_upload_file"
        # テスト用ファイルをアップロード
        self.api.upload_file(self.test_folder_name, self.test_file_path, "upload.csv")
        file_id = self.api.get_item_id("upload.csv", parent_id=self.api.get_item_id(self.test_folder_name, is_folder=True))
        result = file_id is not None
        self.print_test_result(result, test_name)
        self.assertTrue(result, "ファイルのアップロードに失敗しました")

    def test_16_upload_file_to_nonexistent_folder(self):
        test_name = "test_16_upload_file_to_nonexistent_folder"
        # 存在しないフォルダにアップロードを試みる
        result = self.api.upload_file(self.non_existent_folder_name, self.test_file_path, self.test_file_name)
        # メソッドが正常に失敗したことを確認するために、結果がNoneであることを期待する
        self.print_test_result(result is None, test_name)
        self.assertIsNone(result)

    def test_17_download_nonexistent_file(self):
        test_name = "test_17_download_nonexistent_file"
        self.api.download_file(folder_name=self.test_folder_name, file_name=self.non_existent_file_name)
        # 存在しないファイルのダウンロードを試みた際にエラーメッセージが表示されることを確認
        # このテストは、エラーが適切に処理されることを検証するために、コンソール出力をキャプチャする必要があります。
        # ただし、unittestの標準的な方法では直接キャプチャできないため、ここではスキップします。
        # 代わりに、手動でテストを実行し、コンソール出力を確認してください。
        result = "ファイルが見つかりません" in self.captured_output.getvalue()
        self.print_test_result(result, test_name)
        self.assertTrue(result, "存在しないファイルをダウンロードしようとした際に、エラーメッセージが表示されませんでした")

    def test_18_delete_nonexistent_file(self):
        test_name = "test_18_delete_nonexistent_file"
        result = self.api.delete_file(folder_name=self.test_folder_name, file_name=self.non_existent_file_name)
        # 存在しないファイルを削除しようとしたときにエラーが発生しないことを確認する。
        # メソッドが正常に失敗したことを確認するために、結果がNoneであることを期待する
        self.print_test_result(result is None, test_name)
        self.assertIsNone(result)

def test_19_list_files_in_nonexistent_folder(self):
        test_name = "test_19_list_files_in_nonexistent_folder"
        self.api.list_files_in_folder(self.non_existent_folder_name)
        result = f"指定されたフォルダ'{self.non_existent_folder_name}'が見つかりません" in self.captured_output.getvalue()
        self.print_test_result(result, test_name)
        self.assertTrue(result, "存在しないフォルダに対してlist_files_in_folderを実行した際に、エラーメッセージが表示されませんでした")

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

都内の金融機関で経営企画をしています。
2年でメガバンクを辞めてしまいましたが、むしろ人生が豊かになりました。
データアナリスト的なことをしていたのでPythonとTableauがちょっとだけ使えます。
文系大卒→メガバンク(営業)→広告系ベンチャー(経営企画、FP&A、データアナリスト)→都内金融機関(経営企画)

コメント

コメントする

目次