Skip to content

VEO 视频生成 代码示例 各种编程语言的 VEO API 使用示例

Python 完整示例

安装依赖

Copy pip install requests

完整客户端实现

Copy import requests import time import json

class VEOVideoClient: def init(self, base_url, api_key): self.base_url = base_url self.api_key = api_key self.headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}' }

def submit_task(self, prompt, model='veo3', enhance_prompt=True, images=None):
    """提交视频生成任务"""
    request_body = {
        'prompt': prompt,
        'model': model,
        'enhance_prompt': enhance_prompt
    }

    if images:
        request_body['images'] = images

    response = requests.post(
        f'{self.base_url}/veo/v1/api/video/submit',
        headers=self.headers,
        json=request_body
    )

    if response.status_code != 200:
        raise Exception(f"提交失败: {response.text}")

    return response.json()['data']

def get_status(self, task_id):
    """查询任务状态"""
    response = requests.get(
        f'{self.base_url}/veo/v1/api/video/status/{task_id}',
        headers={'Authorization': f'Bearer {self.api_key}'}
    )

    if response.status_code != 200:
        raise Exception(f"查询失败: {response.text}")

    return response.json()['data']

def wait_for_completion(self, task_id, max_wait=1800, poll_interval=10):
    """等待任务完成(最多等待30分钟)"""
    start_time = time.time()

    while time.time() - start_time < max_wait:
        status_data = self.get_status(task_id)
        status = status_data.get('status')

        if status == 'completed':
            return status_data['result']
        elif status == 'failed':
            raise Exception(f"生成失败: {status_data.get('error')}")

        print(f"任务进行中... 状态: {status}")
        time.sleep(poll_interval)

    raise Exception("等待超时")

使用示例

if name == "main": client = VEOVideoClient('https://api.apiyi.com', 'your-api-key')

try:
    # 提交任务
    task = client.submit_task(
        prompt='一只猫咪在雨夜散步,准备抓一只老鼠',
        model='veo3',
        enhance_prompt=True
    )
    print(f'任务已提交,ID: {task["taskId"]}')

    # 等待完成
    result = client.wait_for_completion(task['taskId'])
    print(f'视频生成成功!')
    print(f'视频URL: {result["video_url"]}')

except Exception as e:
    print(f'错误: {e}')

Node.js 示例

安装依赖

Copy npm install axios

完整客户端实现

Copy const axios = require('axios');

class VEOVideoClient { constructor(baseUrl, apiKey) { this.baseUrl = baseUrl; this.apiKey = apiKey; this.headers = { 'Content-Type': 'application/json', 'Authorization': Bearer ${apiKey} }; }

async submitTask(prompt, model = 'veo3', enhancePrompt = true, images = null) {
    const requestBody = {
        prompt,
        model,
        enhance_prompt: enhancePrompt
    };

    if (images) {
        requestBody.images = images;
    }

    try {
        const response = await axios.post(
            `${this.baseUrl}/veo/v1/api/video/submit`,
            requestBody,
            { headers: this.headers }
        );

        return response.data.data;
    } catch (error) {
        throw new Error(`提交失败: ${error.response?.data?.message || error.message}`);
    }
}

async getStatus(taskId) {
    try {
        const response = await axios.get(
            `${this.baseUrl}/veo/v1/api/video/status/${taskId}`,
            { headers: { 'Authorization': `Bearer ${this.apiKey}` } }
        );

        return response.data.data;
    } catch (error) {
        throw new Error(`查询失败: ${error.response?.data?.message || error.message}`);
    }
}

async waitForCompletion(taskId, maxWait = 1800, pollInterval = 10) {
    const startTime = Date.now();

    while ((Date.now() - startTime) / 1000 < maxWait) {
        const statusData = await this.getStatus(taskId);
        const status = statusData.status;

        if (status === 'completed') {
            return statusData.result;
        } else if (status === 'failed') {
            throw new Error(`生成失败: ${statusData.error}`);
        }

        console.log(`任务进行中... 状态: ${status}`);
        await new Promise(resolve => setTimeout(resolve, pollInterval * 1000));
    }

    throw new Error('等待超时');
}

}

// 使用示例 async function main() { const client = new VEOVideoClient('https://api.apiyi.com', 'your-api-key');

try {
    // 提交任务
    const task = await client.submitTask(
        '一只猫咪在雨夜散步,准备抓一只老鼠',
        'veo3',
        true
    );
    console.log(`任务已提交,ID: ${task.taskId}`);

    // 等待完成
    const result = await client.waitForCompletion(task.taskId);
    console.log('视频生成成功!');
    console.log(`视频URL: ${result.video_url}`);

} catch (error) {
    console.error('错误:', error.message);
}

}

main();

Go 示例

Copy package main

import ( "bytes" "encoding/json" "fmt" "io/ioutil" "net/http" "time" )

type VEOClient struct { BaseURL string APIKey string }

type SubmitRequest struct { Prompt string json:"prompt" Model string json:"model" EnhancePrompt bool json:"enhance_prompt" Images []string json:"images,omitempty" }

type Response struct { Success bool json:"success" Data struct { TaskID string json:"taskId" Status string json:"status" Result struct { VideoURL string json:"video_url" } json:"result" } json:"data" }

func (c *VEOClient) SubmitTask(prompt, model string, enhancePrompt bool) (string, error) { reqBody := SubmitRequest{ Prompt: prompt, Model: model, EnhancePrompt: enhancePrompt, }

jsonData, _ := json.Marshal(reqBody)
req, err := http.NewRequest("POST", c.BaseURL+"/veo/v1/api/video/submit", bytes.NewBuffer(jsonData))
if err != nil {
    return "", err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.APIKey)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
    return "", err
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
var response Response
json.Unmarshal(body, &response)

if !response.Success {
    return "", fmt.Errorf("提交失败")
}

return response.Data.TaskID, nil

}

func (c *VEOClient) GetStatus(taskID string) (*Response, error) { req, err := http.NewRequest("GET", c.BaseURL+"/veo/v1/api/video/status/"+taskID, nil) if err != nil { return nil, err }

req.Header.Set("Authorization", "Bearer "+c.APIKey)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
    return nil, err
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
var response Response
json.Unmarshal(body, &response)

return &response, nil

}

func main() { client := VEOClient{ BaseURL: "https://api.apiyi.com", APIKey: "your-api-key", }

// 提交任务
taskID, err := client.SubmitTask("一只猫咪在雨夜散步", "veo3", true)
if err != nil {
    fmt.Printf("错误: %v\n", err)
    return
}
fmt.Printf("任务已提交,ID: %s\n", taskID)

// 轮询状态
for {
    time.Sleep(10 * time.Second)
    status, err := client.GetStatus(taskID)
    if err != nil {
        fmt.Printf("查询错误: %v\n", err)
        continue
    }

    if status.Data.Status == "completed" {
        fmt.Printf("视频生成成功!\n")
        fmt.Printf("视频URL: %s\n", status.Data.Result.VideoURL)
        break
    } else if status.Data.Status == "failed" {
        fmt.Printf("生成失败\n")
        break
    }

    fmt.Printf("任务进行中... 状态: %s\n", status.Data.Status)
}

}

Java 示例

Copy import com.google.gson.Gson; import okhttp3.*; import java.io.IOException; import java.util.concurrent.TimeUnit;

public class VEOVideoClient { private final String baseUrl; private final String apiKey; private final OkHttpClient client; private final Gson gson;

public VEOVideoClient(String baseUrl, String apiKey) {
    this.baseUrl = baseUrl;
    this.apiKey = apiKey;
    this.client = new OkHttpClient.Builder()
            .connectTimeout(30, TimeUnit.SECONDS)
            .readTimeout(30, TimeUnit.SECONDS)
            .build();
    this.gson = new Gson();
}

public static class SubmitRequest {
    String prompt;
    String model;
    boolean enhance_prompt;
    String[] images;

    public SubmitRequest(String prompt, String model, boolean enhancePrompt) {
        this.prompt = prompt;
        this.model = model;
        this.enhance_prompt = enhancePrompt;
    }
}

public static class ApiResponse {
    boolean success;
    Data data;

    public static class Data {
        String taskId;
        String status;
        Result result;

        public static class Result {
            String video_url;
        }
    }
}

public String submitTask(String prompt, String model, boolean enhancePrompt) throws IOException {
    SubmitRequest request = new SubmitRequest(prompt, model, enhancePrompt);
    String json = gson.toJson(request);

    RequestBody body = RequestBody.create(
            json, MediaType.parse("application/json"));

    Request httpRequest = new Request.Builder()
            .url(baseUrl + "/veo/v1/api/video/submit")
            .post(body)
            .addHeader("Authorization", "Bearer " + apiKey)
            .addHeader("Content-Type", "application/json")
            .build();

    try (Response response = client.newCall(httpRequest).execute()) {
        if (!response.isSuccessful()) {
            throw new IOException("提交失败: " + response);
        }

        ApiResponse apiResponse = gson.fromJson(
                response.body().string(), ApiResponse.class);

        return apiResponse.data.taskId;
    }
}

public ApiResponse getStatus(String taskId) throws IOException {
    Request request = new Request.Builder()
            .url(baseUrl + "/veo/v1/api/video/status/" + taskId)
            .addHeader("Authorization", "Bearer " + apiKey)
            .build();

    try (Response response = client.newCall(request).execute()) {
        if (!response.isSuccessful()) {
            throw new IOException("查询失败: " + response);
        }

        return gson.fromJson(
                response.body().string(), ApiResponse.class);
    }
}

public static void main(String[] args) throws Exception {
    VEOVideoClient client = new VEOVideoClient(
            "https://api.apiyi.com", "your-api-key");

    // 提交任务
    String taskId = client.submitTask(
            "一只猫咪在雨夜散步", "veo3", true);
    System.out.println("任务已提交,ID: " + taskId);

    // 轮询状态
    while (true) {
        Thread.sleep(10000); // 等待10秒

        ApiResponse status = client.getStatus(taskId);
        String currentStatus = status.data.status;

        if ("completed".equals(currentStatus)) {
            System.out.println("视频生成成功!");
            System.out.println("视频URL: " + status.data.result.video_url);
            break;
        } else if ("failed".equals(currentStatus)) {
            System.out.println("生成失败");
            break;
        }

        System.out.println("任务进行中... 状态: " + currentStatus);
    }
}

}

cURL 命令示例

基本用法

Copy

提交任务

curl -X POST "https://api.apiyi.com/veo/v1/api/video/submit"
-H "Content-Type: application/json"
-H "Authorization: Bearer your-api-key"
-d '{ "prompt": "一只猫咪在雨夜散步", "model": "veo3" }'

查询状态

curl -X GET "https://api.apiyi.com/veo/v1/api/video/status/YOUR_TASK_ID"
-H "Authorization: Bearer your-api-key"

Shell 脚本示例

Copy #!/bin/bash

API_KEY="your-api-key" BASE_URL="https://api.apiyi.com"

提交任务

submit_video() { local prompt="$1" local model="${2:-veo3}"

response=$(curl -s -X POST "$BASE_URL/veo/v1/api/video/submit" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $API_KEY" \
    -d "{
        \"prompt\": \"$prompt\",
        \"model\": \"$model\",
        \"enhance_prompt\": true
    }")

echo "$response" | jq -r '.data.taskId'

}

查询状态

check_status() { local task_id="$1"

curl -s -X GET "$BASE_URL/veo/v1/api/video/status/$task_id" \
    -H "Authorization: Bearer $API_KEY" | jq

}

等待完成

wait_for_completion() { local task_id="$1"

while true; do
    response=$(check_status "$task_id")
    status=$(echo "$response" | jq -r '.data.status')

    case "$status" in
        "completed")
            video_url=$(echo "$response" | jq -r '.data.result.video_url')
            echo "视频生成成功!"
            echo "视频URL: $video_url"
            break
            ;;
        "failed")
            echo "生成失败"
            break
            ;;
        *)
            echo "任务进行中... 状态: $status"
            sleep 10
            ;;
    esac
done

}

使用示例

echo "提交视频生成任务..." task_id=$(submit_video "一只猫咪在雨夜散步") echo "任务ID: $task_id"

wait_for_completion "$task_id"