コンテンツにスキップ

Line Messaging APIを使って画像・動画を取得する

意外とMessaging APIについての情報が少ないので、メモとして残しておきます。

先にやっておくこと

  • Line DevelopersからMessging APIのチャネルを作っている
  • Webhook設定済みである
  • 私はLambdaの関数 URLを設定して確認した
  • そのため、サンプルコードもLambda前提である
  • チャネルアクセストークンを発行済みであること

サンプルコード書いていますが、そこではS3を使っています。必要に応じてその辺の設定もしてください。

手順

  1. Lineから作っておいたチャネルを友だち登録する
  2. チャネルに画像または動画を投稿する

そうすると、LineからWebhookへリクエストされます。

サンプルコード

以下は、Lineからデータを取得してS3にアップロードするLambda + Pythonのサンプルコードです。

import json
import os
import urllib.request

import boto3

CHANNEL_ACCESS_TOKEN = "your token"
S3_BUCKET = "your bucket"

def lambda_handler(event, context):
    print(json.dumps(event))

    for message_event in json.loads(event["body"])["events"]:
        message_type = message_event["message"]["type"]

        # 画像・動画以外は受け付けない
        if message_type not in ["image", "video"]:
            continue

        message_id = message_event["message"]["id"]

        # 画像・動画ファイルを取得する
        content = fetch_content(message_id=message_id)

        # 画像・動画ファイルをS3にアップロード
        original_filename = os.path.join("original", message_id)
        upload_s3(bin=content, filename=original_filename)

        # プレビュー画像を取得する
        preview_content = fetch_preview_content(message_id=message_id)

        # プレビュー画像をS3にアップロード
        preview_filename = os.path.join("preview", message_id)
        upload_s3(bin=preview_content, filename=preview_filename)

    return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")}


def fetch_content(message_id: str) -> bytes:
    url = f"https://api-data.line.me/v2/bot/message/{message_id}/content"
    return request_get(url=url)


def fetch_preview_content(message_id: str) -> bytes:
    url = f"https://api-data.line.me/v2/bot/message/{message_id}/content/preview"
    return request_get(url=url)


def request_get(url: str) -> bytes:
    headers = {
        "Content-Type": "application/json; charset=UTF-8",
        "Authorization": f"Bearer {CHANNEL_ACCESS_TOKEN}",
    }
    req = urllib.request.Request(url, method="GET", headers=headers)
    with urllib.request.urlopen(req) as res:
        return res.read()


def upload_s3(bin: bytes, filename: str) -> str:
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(S3_BUCKET)
    bucket.put_object(Body=bin, Key=filename)

eventログ

body部分だけ参考として載せます。

画像

{
  "body": "{\"destination\":\"U668a9c2c0b1469cd8d5e984672961913\",\"events\":[{\"type\":\"message\",\"message\":{\"type\":\"image\",\"id\":\"463074078431562241\",\"contentProvider\":{\"type\":\"line\"}},\"webhookEventId\":\"01H4VG23S8TRNE9XJVEEE3HHFP\",\"deliveryContext\":{\"isRedelivery\":false},\"timestamp\":1688844963119,\"source\":{\"type\":\"user\",\"userId\":\"U0b465adc30fb8ac1fc6a414d06a0b1c7\"},\"replyToken\":\"fc66ac7d07bb408aa0f272cfb384d0f8\",\"mode\":\"active\"}]}"
}

動画

{
  "body": "{\"destination\":\"U668a9c2c0b1837cd1d5e984672961913\",\"events\":[{\"type\":\"message\",\"message\":{\"type\":\"video\",\"id\":\"463076447542233697\",\"duration\":34208,\"contentProvider\":{\"type\":\"line\"}},\"webhookEventId\":\"01H4VHD7F932NQFT58QE4FCB13\",\"deliveryContext\":{\"isRedelivery\":false},\"timestamp\":1688846375921,\"source\":{\"type\":\"user\",\"userId\":\"U0b465adc30fb8ac1fc6a414d06a0b1c7\"},\"replyToken\":\"e2b900a4dbef4f088a70d0097873dc60\",\"mode\":\"active\"}]}"
}

ドキュメント

  • https://developers.line.biz/ja/reference/messaging-api/#getting-content