MusicBrainz データセットの使用

このチュートリアルでは、MusicBrainz データベース内のテーブルの JSON スナップショットを使用します。このデータベースは PostgreSQL で作成され、MusicBrainz の音楽に関する次のような情報が保存されています。

MusicBrainz スキーマには、artistrecordingartist_credit_name という 3 つの関連テーブルが定義されています。アーティストのクレジットは、レコーディングでアーティストに与えられたクレジットを表し、artist_credit_name 行は artist_credit 値でレコーディングとアーティストをリンクしています。

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) toexported_artist.json"psql -w -h $host -U $user -d $db -c $pg_cmdsed -i -e 's/\\\\/\\/g' exported_artist.json #clean up extra '\' characters

方法 1: BigQuery ウェブ UI で ETL を行う

この方法では、1 回に少量のデータを BigQuery に読み込み、分析を行います。サイズの大きいデータセットや複数のデータセットの自動処理を行う前に、データセットのプロトタイプを作成する場合にも使用します。

BigQuery データセットを作成する

次の図は、BigQuery データセットの作成手順を表しています。

MusicBrainz のテーブルを BigQuery に個別に読み込み、読み込まれたテーブルを結合します。各行には必要なデータリンクが含まれます。結合結果を新しい BigQuery テーブルに保存します。この処理が完了したら、読み込んだ元のテーブルを削除できます。

  1. データセット名の右側にある下矢印ボタンをクリックします。

  2. [Create new dataset] をクリックします。
  3. [Create Dataset] ダイアログで、データセット ID と有効期限を追加して [OK] をクリックします。

MusicBrainz テーブルをインポートする

次の表に、このステップで使用する JSON ファイルの場所を示します。

テーブル名スキーマ ファイルデータファイル
artist
https://storage.googleapis.com/solutions-public-assets/bqetl/artist_schema.json
gs://solutions-public-assets/bqetl/artist.json
artist_credit_name
https://storage.googleapis.com/solutions-public-assets/bqetl/artist_credit_name_schema.json
gs://solutions-public-assets/bqetl/artist_credit_name.json 
recording
https://storage.googleapis.com/solutions-public-assets/bqetl/recording_schema.json
gs://solutions-public-assets/bqetl/recording.json

各 MusicBrainz テーブルで、次の操作を行います。

  1. 上で作成したデータセットにテーブルを追加します。BigQuery 列のデータセット名にカーソルを合わせて、プラス記号をクリックします。
  2. [Create Table] ダイアログで、[Location] プルダウン リストから [Google Cloud Storage] を選択します。
  3. [Location] プルダウン リストの右側にあるテキスト フィールドに、データファイルの URL を入力します(例: gs://solutions-public-assets/bqetl/artist.json)
  4. [Table name] にテーブル名を入力します(例: artist)
  5. [File format] で [JSON(Newline Delimited)] を選択します。
  6. [Table type] で [Native table] を選択したままにします。
  7. [Schema] セクションで [Edit as Text] をクリックします。
  8. 上の表の URL を使用して、テーブルのスキーマをダウンロードします。
  9. ダウンロードしたスキーマ ファイルの内容で [Schema] セクションの内容を置き換えます。
  10. [Create Table] をクリックします。

データを非正規化する

データを非正規化するには、アーティストのレコーディングを 1 行とする新しい BigQuery テーブルにデータを結合し、分析用に特定のメタデータを使用します。

  1. 次のクエリをクエリのテキスト ボックスにコピーします。

    SELECT artist.id, artist.gid, artist.name, artist.area, recording.name,recording.length, recording.gid, recording.video FROM[DATASET].artist as artist inner join[DATASET].artist_credit_name as artist_credit_name on artist.id= artist_credit_name.artist inner join [DATASET].recording asrecording on artist_credit_name.artist_credit = recording.artist_credit
  2. [DATASET] は、データセットの名前で置き換えます。

  3. [Show Options] をクリックし、次のように選択します。

    1. [Table for Destination Table] を選択して、recordings_by_artists という名前で新しいテーブルを作成します。
    2. [Allow Large Results] をオンにします。
    3. [Write Preference] で [Overwrite table] を選択します。
    4. [Run Query] をクリックします。

新しく作成した BigQuery テーブルでは、データがアーティストの曲ごとに編成されます。

方法 2: Cloud Dataflow で BigQuery に ETL を行う

このセクションでは、BigQuery ウェブ UI ではなく、サンプル プログラムを使用します。このプログラムでは、Cloud Dataflow パイプラインを使用してデータを BigQuery に読み込みます。次に、Cloud Dataflow プログラミング モデルを使用してデータの非正規化とクレンジングを行い、BigQuery に読み込みます。

始める前に、概念とサンプルコードを確認しておきましょう。

概念を確認する

ここで使用するデータはサイズが小さく、BigQuery ウェブ UI でも簡単に読み込むことができます。しかし、大規模な結合処理(500~5000 行で、10 TB を超えるデータ)の場合には、次の理由から BigQuery ウェブ UI ではなく Cloud Dataflow を使用して BigQuery に ETL を行います。

  • BigQuery に読み込むときにデータをクレンジングできる。後で保存して結合する必要はありません(TB あたり $5 かかります)。また、ストレージ要件も低くなります。
  • カスタムデータのクレンジングを行う。
  • OLTP 以外のデータ(ログやリモートからアクセスするデータなど)とデータを組み合わせる。
  • 継続的インテグレーションまたは継続的デプロイ(CI / CD)でデータの読み込みを自動化する。
  • 段階的な繰り返しにより、ETL プロセスの強化 / 改善が期待できる。
  • ETL を 1 回で行うのではなく、データを増分的に追加する。

次の図は、サンプル プログラムが作成するデータ パイプラインを表しています。

このサンプルコードでは、多くのパイプライン ステップがグループ化され、便利なメソッドでラッピングされています。わかりやすい名前が付いているので、再利用も簡単です。上の図で、再利用されるステップはダッシュで囲まれています。

パイプライン コードを確認する

このコードでは、次の操作を実行するパイプラインを作成します。

  1. 結合するテーブルを PCollection に読み込みます。各要素は、テーブル行の JSON 表現で構成されています。

    public static PCollection<String> loadText(Pipeline p, String name) { BQETLOptions options = (BQETLOptions) p.getOptions(); String loadingBucket = options.getLoadingBucketURL(); String objectToLoad = storedObjectName(loadingBucket, name); return p.apply(TextIO.Read.named(name).from(objectToLoad));}

  2. これらの JSON 文字列をオブジェクト表現(MusicBrainzDataObject オブジェクト)に変換し、列の値の 1 つ(主キーまたは外部キー)で編成します。

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) { final String namespacedKeyname = name + "_" + keyName; return text.apply("load " + name, MapElements.via((String input) -> { MusicBrainzDataObject datum = JSONReader.readObject(name, input); Long key = (Long) datum.getColumnValue(namespacedKeyname); return KV.of(key, datum); }).withOutputType(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() { }));}

  3. artist テーブルと artist_credit_name テーブルを MusicBrainzDataObject オブジェクトのリストに変換したら、共通のアーティストで 2 つのリストを結合します。artist_credit_name がアーティストのクレジットとレコーディングをリンクし、アーティストの外部キーが設定されます。artist_credit_name テーブルがキー値 KV オブジェクトのリストとして読み込まれます。K のメンバーがアーティストです。

    PCollection<MusicBrainzDataObject> artistCredits = MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);

    次のように、MusicBrainzTransforms.innerJoin() メソッドでリストが結合されます。

    public static PCollection<MusicBrainzDataObject> innerJoin(String name, PCollection<KV<Long, MusicBrainzDataObject>> table1, PCollection<KV<Long, MusicBrainzDataObject>> table2) { final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(); final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(); PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);

MusicBrainzTransforms.innerJoin() メソッドは次のことを行います。

  1. 結合するキーメンバーで KV オブジェクトのコレクションをグループ化します。KV オブジェクトの PCollection に長整数型キー(artist.id 列の値)が設定され、CoGbkResult が生成されます(キーの結果でグループが結合されていることを表します)。CoGbkResult オブジェクトは、最初と 2 番目の PCollections に共通のキー値を持つオブジェクト リストのタプルです。このタプルは、group by 演算の実行前に各 PCollection に構成されたタプルタグでアクセスできます。

  2. オブジェクトの一致を MusicBrainzDataObject オブジェクトにマージし、結合結果を表します。

    PCollection<List<MusicBrainzDataObject>> mergedResult = joinedResult.apply("merge join results", MapElements.via((KV<Long, CoGbkResult> group) -> { List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>(); Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1); Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2); leftObjects.forEach((MusicBrainzDataObject l) -> { rightObjects.forEach((MusicBrainzDataObject r) -> { result.add(l.duplicate().merge(r)); }); }); return result;}).withOutputType(new TypeDescriptor<List<MusicBrainzDataObject>>() {}));

  3. MusicBrainzDataObject オブジェクト リストから構成されるリストが生成されます。各リストで、最初のテーブルにある一致の合計数に 2 番目のテーブルにある一致の合計数を掛けて特定のキー値が生成されます。複数リストが 1 つの PCollection にフラット化され、複数のリストの MusicBrainzDataObject オブジェクトがすべて含まれます。

    return mergedResult.apply(new Flatten.FlattenIterables<>());

  4. コレクションを KV オブジェクトのリストに編成し、次の結合を開始します。ここで、K 値は artist_credit 列で、recording テーブルとの結合に使用されます。

    PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit = MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);

  5. MusicBrainzDataObject オブジェクトの最終的なコレクションを取得します。この結果を artist_credit.id で編成された recordings のコレクションと結合します。

    PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings", artistCreditNamesByArtistCredit, recordingsByArtistCredit);

  6. 結果の MusicBrainzDataObjects オブジェクトを TableRows にマッピングします。

    PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);

  7. 結果のテーブルを BigQuery に書き込みます。

    tableRows.apply(BigQueryIO.Write .named("Write") .to(BQETLOptions.getBigQueryTablename()) .withSchema(bqTableSchema) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Cloud Dataflow パイプライン プログラミングの詳細については、プログラミング モデルに関する次のトピックをご覧ください。

コードの処理内容を確認したら、コードを実行しましょう。

パイプライン コードを実行する

  1. 作成したプロジェクトを使用するように gcloud SDK を設定します。

    gcloud config set project [PROJECT]
  2. Cloud Dataflow コードを含むレポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
  3. サンプルのあるディレクトリに移動します。

    cd bigquery-etl-dataflow-sample
  4. Cloud Storage にステージング バケットを作成します。

    gsutil mb gs://[STAGING_BUCKET_NAME]
  5. [STAGING_BUCKET_NAME] のオブジェクト ライスサイクルに dataflow-staging-policy.json の値を設定します。

    gsutil lifecycle set dataflow-staging-policy.json gs://[STAGING_BUCKET_NAME]
  6. このチュートリアルのサンプル スクリプトを編集します。このコードは、Maven を呼び出し、パイプラインを実行します。

    1. run-simple.example ファイルを run-simple という名前のファイルにコピーし、コードを変更してプロジェクトの名前、BigQuery データセット、宛先テーブルを設定します。

    2. 結果が比較できるように、宛先テーブルには前のセクションと別のものを使用します。

  7. このサンプルの Cloud Dataflow ジョブを実行します。

    ./run-simple
  8. パイプラインでデータの書き込みが完了したら、次のスクリーンショットのように、新しいテーブルにクエリを実行します。

データをクレンジングする

次に、Cloud Dataflow パイプラインを少し変更します。次の図のように、ルックアップ テーブルを読み込み、副入力として処理します。

結果の BigQuery テーブルにクエリを実行する場合、アーティストの取得元を簡単に推測できません。MusicBrainz データベースでは、これは area になります。MusicBrainz データベースと同様に、結果の BigQuery テーブルでは area は ID で表示されます。これでクエリの結果を分析すると、上のクエリ結果のように簡単に分析を行うことができます。

同様に、アーティストの性別が ID で表されていますが、全体の MusicBrainz gender テーブルには 3 つの行しかありません。この問題を解決するため、MusicBrainz の area テーブルと gender テーブルを使用して、ID を正しいラベルにマッピングします。

artist_areaartist_gender は、アーティストやレコーディング データの量で制限がありませんが、それぞれ地理的なエリアや性別の制約を受けます。また、サイズも MB 単位の制約があります。この場合、副入力という Cloud Dataflow 機能を使用できます。

副入力は、行で区切りられた JSON 形式のテーブル エクスポートとして読み込まれます。

副入力をパイプラインに追加する

BQETLSimple.java ファイルには、いくつかのコメント行があります。

// PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",// MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),// MusicBrainzTransforms.lookup("gender","id","name","gender")); PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

このコードは、副入力を使用してデータのクレンジングを行います。MusicBrainzTransforms クラスにより、副入力を使用して外部キーとラベルを簡単にマッピングできます。また、MusicBrainzTransforms ライブラリのメソッドを使用すると、内部ルックアップ クラスを作成できます。ルックアップ クラスは、ラベルと変数の長さ引数で置換されるルックアップ テーブルとフィールドを記述します。keyKey はルックアップ キーを含む列の名前です。valueKey は、対応するラベルを含む列の名前です。

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) { return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);}

それぞれの副入力は、1 つのマップ オブジェクトとして読み込まれ、ID に対応するラベルの検索に使用されます。

まず、ルックアップ テーブルの JSON が空の名前空間を持つ MusicBrainzDataObjects に読み込まれ、Key 列値から Value 列値のマップに変換されます。

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String keyKey, String valueKey) { String keyKeyName = "_" + keyKey; String valueKeyName = "_" + valueKey; PCollection<KV<Long, String>> entries = text.apply(MapElements.via((String input) -> { MusicBrainzDataObject object = JSONReader.readObject("", input); Long key = (Long) object.getColumnValue(keyKeyName); String value = (String) object.getColumnValue(valueKeyName); return KV.of(key, value); }).withOutputType(new TypeDescriptor<KV<Long, String>>() { })); return entries.apply(View.<Long, String>asMap());}

これらの Map オブジェクトが destinationKey, の値で Map に読み込まれます。このキーは、検索された値で置き換わります。

Map<String, PCollectionView<Map<Long, String>>> mapSideInputs = new HashMap<String, PCollectionView<Map<Long, String>>>();for (LookupDescription mapper : mappers) { PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey); mapper.destinationKeys.forEach((destinationKey) -> { mapSideInputs.put(name + "_" + destinationKey, mapView); });}

JSON からアーティスト オブジェクトを変換するときに、destinationKey の値(数字で始まります)がラベルで置き換わります。

Map<Long, String> sideInputMap = processContext.sideInput(mapping);Long id = (Long) result.getColumnValue(key);if (id != null) { String label = (String) sideInputMap.get(id); if (label == null) { label = "" + id; } result.replace(key, label);

artist_areaartist_gender からデータをクリーンアップします。

  1. BQETLSimple.java で、ルックアップを使用してアーティストのデータを読み込む行のコメントを解除し、ルックアップを使用せずにアーティスト データを読み込むコードをコメントにします。

  2. 対応する int フィールドをコメントにし、対応する string フィールドのコメントを解除して、artist_areaartist_genderTableFieldSchemasint から string に変更します。

    /*Switch these two lines when using mapping table for artist_area */// .stringField("artist_area") .intField("artist_area")

    /*Switch these two lines when using mapping table for artist_gender */// .stringField("artist_gender") .intField("artist_gender")

     .intField("artist_begin_area")// .stringField("artist_begin_area")

  3. 簡単なパイプライン コードを再度実行して、次のスクリーンショットのように、artist_areaartist_gender を含む同じクエリを実行します。

BigQuery スキーマを最適化する

次の図は、若干異なる Cloud Dataflow パイプラインを表しています。ここでは、重複するアーティスト行を作成するのではなく、アーティストのレコーディングをアーティスト行にネストします。

現在のデータ表現はかなりフラットです。クレジットのあるレコーディングごとに 1 つの行が存在し、BigQuery スキーマから取得したすべてのアーティスト メタデータを含みます。また、すべてのレコーディングと artist_credit_name メタデータも含まれています。このフラットな表現には、少なくとも 2 つの欠点があります。

  • アーティストのレコーディングごとに artist メタデータを繰り返すため、必要なストレージが増加します。
  • データを JSON としてエクスポートすると、レコーディング データがネストされたアーティストではなく、このデータを繰り返す配列がエクスポートされます。おそらく、前者が必要なものです。

1 行に 1 つのレコーディングを保存するわけではないので、パフォーマンス上の問題はなく、追加のストレージも必要ありません。Cloud Dataflow パイプラインに簡単な変更を行うことで、アーティスト レコードの繰り返しフィールドとしてレコーディングを保存できます。

  1. アーティスト情報とレコーディングを artist_credit_name.artist で結合せずに、パイプラインがレコーディングのリストを作成し、アーティスト オブジェクト内にネストします。

    public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent, PCollection<KV<Long, MusicBrainzDataObject>> child, String nestingKey) { final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(); final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(); PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag); return joinedResult.apply("merge join results " + nestingKey, MapElements.via((KV<Long, CoGbkResult> group) -> { MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag); Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag); List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>(); children.forEach(childList::add); parentObject = parentObject.duplicate(); parentObject.addColumnValue("recordings", childList); return parentObject; }).withOutputType(new TypeDescriptor<MusicBrainzDataObject>() { }));}

    TableRow は、BigQuery API でサイズ制限があります。1 つのレコードにネストできるレコーディングは 1,000 要素に制限されています。特定のアーティストに 1,000 を超えるレコーディングが存在すると、コードが artist メタデータを含む行を複製し、複製した行にレコーディング データのネストを行います。

    private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) { TableRow row = new TableRow(); List<TableRow> result = new ArrayList<TableRow>(); Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>(); Set<String> keySet = serializableSchema.keySet(); /* * construct a row object without the nested objects */ int maxListSize = 0; for (String key : keySet) { Object value = serializableSchema.get(key); Object fieldValue = mbdo.getColumnValue(key); if (fieldValue != null) { if (value instanceof Map) { List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue; if (list.size() > maxListSize) { maxListSize = list.size(); } nestedLists.put(key, list); } else { row.set(key, fieldValue); } } } /* * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded */ TableRow parent = row.clone(); Set<String> listFields = nestedLists.keySet(); for (int i = 0; i < maxListSize; i++) { parent = (parent == null ? row.clone() : parent); final TableRow parentRow = parent; nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> { if (nestedList.size() > 0) { if (parentRow.get(key) == null) { parentRow.set(key, new ArrayList<TableRow>()); } List<TableRow> childRows = (List<TableRow>) parentRow.get(key); childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key))); } }); if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) { result.add(parent); parent = null; } } if (parent != null) { result.add(parent); } return result;}

  2. run-nested.example スクリプトを ./run-nested にコピーし、設定に一致するように #PROJECT#STAGING_BUCKET#DATASET#DESTINATION_TABLE を変更します。

  3. パイプラインを実行して、アーティスト行内にレコーディング行をネストします。

     ./run-nested