Bài 35.3: Streaming responses và error handling

Chào mừng bạn trở lại với chuỗi bài viết về Lập trình Web Front-end! Hôm nay, chúng ta sẽ đi sâu vào một kỹ thuật hiện đại đang ngày càng phổ biến: Streaming responses và cách chúng ta xử lý các lỗi có thể xảy ra trong quá trình này.

Trong mô hình truyền thống của Web, khi trình duyệt gửi một yêu cầu (request) đến máy chủ (server), trình duyệt sẽ chờ đợi cho đến khi máy chủ chuẩn bị xong toàn bộ dữ liệu phản hồi (response) rồi mới gửi về một lần duy nhất. Điều này giống như việc bạn yêu cầu một ly cà phê và phải chờ người pha chế hoàn thành tất cả các bước, từ xay hạt, pha chế, cho đường sữa (nếu có), rồi mới mang ra cho bạn cả ly.

Tuy nhiên, với sự phát triển của các ứng dụng web tương tác cao, thời gian thực, và đặc biệt là sự bùng nổ của các dịch vụ AI tạo sinh (Generative AI), mô hình truyền thống này bộc lộ hạn chế. Người dùng muốn thấy kết quả ngay lập tức, từng phần một, thay vì phải chờ đợi toàn bộ quá trình xử lý hoàn tất. Đây chính là lúc Streaming responses tỏa sáng.

Streaming Responses là gì?

Streaming responses (Phản hồi dạng luồng) là kỹ thuật cho phép máy chủ gửi dữ liệu về trình duyệt theo từng phần nhỏ (chunks) ngay khi dữ liệu đó sẵn sàng, thay vì chờ có đủ toàn bộ dữ liệu. Trình duyệt có thể bắt đầu xử lý và hiển thị các phần dữ liệu này ngay lập tức.

Trở lại với ví dụ ly cà phê, streaming giống như việc người pha chế hoàn thành bước nào thì đưa nguyên liệu hoặc thành phẩm của bước đó cho bạn xem ngay lập tức: xay hạt xong bạn thấy hạt đã xay, pha xong nước cốt bạn thấy nước cốt, thêm đá bạn thấy đá,... cho đến khi hoàn thành ly. Bạn thấy quá trình và nhận được phản hồi từng bước, tạo cảm giác nhanh chóng và tương tác hơn rất nhiều.

Tại sao Streaming lại quan trọng?

  1. Cải thiện trải nghiệm người dùng (UX): Người dùng thấy nội dung xuất hiện dần dần, tạo cảm giác phản hồi tức thì, đặc biệt hữu ích cho các tác vụ tốn thời gian như tạo nội dung bằng AI.
  2. Hiệu quả về tài nguyên: Đối với dữ liệu lớn, streaming giúp client bắt đầu xử lý ngay mà không cần lưu trữ toàn bộ response trong bộ nhớ đệm.
  3. Ứng dụng thời gian thực: Là nền tảng cho các ứng dụng cần cập nhật liên tục như chat, thông báo, bảng tin chứng khoán,... (mặc dù thường dùng WebSockets hoặc SSE cho những trường hợp này, Fetch streaming cũng đang nổi lên).
  4. Tối ưu hiệu suất: Giảm độ trễ (latency) cảm nhận, giúp ứng dụng có vẻ nhanh hơn.
Các Kỹ thuật Streaming Phổ biến cho Front-end

Có một số cách để làm việc với streaming trên Front-end:

  • Server-Sent Events (SSE): Một giao thức đơn giản dựa trên HTTP cho phép máy chủ gửi dữ liệu dạng luồng một chiều đến client. Thường dùng cho các cập nhật từ server (ví dụ: thông báo mới, cập nhật dữ liệu trực tiếp). Dễ triển khai hơn WebSockets cho các trường hợp chỉ cần server gửi data.
  • WebSockets: Cung cấp kênh giao tiếp song công (two-way), toàn diện giữa client và server. Phù hợp cho các ứng dụng tương tác cao, thời gian thực như chat, game online. Tuy mạnh mẽ, nhưng cấu trúc hơi khác so với mô hình request/response truyền thống.
  • Fetch API with getReader(): Đây là phương pháp hiện đại và ngày càng phổ biến, đặc biệt trong bối cảnh các framework như Next.js và các API AI. Bạn sử dụng Fetch API quen thuộc, nhưng thay vì lấy toàn bộ response cùng lúc, bạn truy cập vào body của response dưới dạng một ReadableStream và đọc dữ liệu theo từng phần.

Trong bài viết này, chúng ta sẽ tập trung vào kỹ thuật sử dụng Fetch API với getReader() vì tính linh hoạt và sự phù hợp của nó với các API AI hiện đại.

Thực Hành: Streaming với Fetch API

Khi bạn thực hiện một yêu cầu Fetch và response có Content-Type thích hợp (ví dụ: text/event-stream, application/octet-stream hoặc đơn giản là không có Content-Type cụ thể mà server gửi data theo luồng), bạn có thể truy cập body của response dưới dạng luồng.

async function streamResponse() {
  try {
    const response = await fetch('/api/stream-data'); // Giả định có API trả về dữ liệu dạng luồng

    // Kiểm tra response có thành công không (status code 2xx)
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    // Kiểm tra xem body có tồn tại và là ReadableStream không
    if (!response.body) {
      throw new Error('Response body is not available or not a stream');
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder(); // Giúp decode các byte thành string

    let result = ''; // Biến để lưu trữ kết quả tích lũy
    let done = false;

    console.log("Bắt đầu đọc luồng dữ liệu...");

    while (!done) {
      const { value, done: readerDone } = await reader.read(); // Đọc chunk tiếp theo

      done = readerDone;

      if (value) {
        const chunk = decoder.decode(value, { stream: true }); // Decode chunk
        result += chunk;
        console.log(`Nhận được chunk: ${chunk}`);
        // Cập nhật UI tại đây, ví dụ: hiển thị chunk mới nhận được
        document.getElementById('output').innerText = result;
      }
    }

    console.log("Kết thúc luồng dữ liệu. Toàn bộ kết quả:", result);

  } catch (error) {
    console.error('Có lỗi xảy ra khi streaming:', error);
    document.getElementById('output').innerText = 'Lỗi: ' + error.message;
  }
}

// Gọi hàm khi cần bắt đầu stream
// streamResponse();

Giải thích Code:

  1. Chúng ta sử dụng await fetch(url) như bình thường.
  2. Kiểm tra response.ok để xác định yêu cầu ban đầu có thành công hay không.
  3. Lấy reader từ response.body.getReader(). reader này là một ReadableStreamDefaultReader cho phép chúng ta đọc dữ liệu từ luồng.
  4. Sử dụng TextDecoder để chuyển đổi dữ liệu dạng Uint8Array (byte) mà reader trả về thành chuỗi (string). Thuộc tính { stream: true } cho decode rất quan trọng vì nó cho phép bộ giải mã xử lý dữ liệu theo từng phần, ngay cả khi một ký tự Unicode bị ngắt quãng giữa hai value khác nhau.
  5. Vòng lặp while (!done) liên tục gọi await reader.read(). Mỗi lần đọc sẽ trả về một đối tượng với hai thuộc tính: value (dữ liệu dạng Uint8Array) và done (boolean, true khi luồng kết thúc).
  6. Bên trong vòng lặp, chúng ta decode value thành chunk string và xử lý nó. Ở đây, chúng ta chỉ đơn giản nối các chunk lại và hiển thị. Trong thực tế, bạn có thể cần xử lý từng chunk dựa trên định dạng dữ liệu trả về từ server (ví dụ: các dòng dữ liệu dạng data: ...\n\n trong SSE, hoặc các token từ API AI).
  7. Khi donetrue, vòng lặp kết thúc, nghĩa là luồng đã đóng.
  8. Khối catch bao bọc toàn bộ quá trình để bắt các lỗi xảy ra.
Tầm Quan Trọng của Error Handling trong Streaming

Trong khi streaming mang lại nhiều lợi ích, nó cũng đặt ra những thách thức mới về xử lý lỗi. Không giống như mô hình truyền thống chỉ có hai trạng thái chính (thành công hoặc lỗi khi nhận toàn bộ response), lỗi có thể xảy ra bất cứ lúc nào trong quá trình stream.

Các loại lỗi có thể gặp phải:

  • Lỗi yêu cầu ban đầu: Server trả về status code lỗi (4xx, 5xx) ngay từ đầu.
  • Lỗi mạng giữa chừng: Kết nối bị gián đoạn trong khi đang stream dữ liệu.
  • Lỗi từ phía Server: Server gặp lỗi xử lý trong khi đang stream, và có thể gửi một thông báo lỗi hoặc chỉ đơn giản là đóng kết nối.
  • Lỗi Client-side: Xảy ra khi bạn cố gắng xử lý (decode, parse) một chunk dữ liệu bị lỗi hoặc không đúng định dạng.

Nếu không xử lý lỗi đúng cách, ứng dụng của bạn có thể bị treo, hiển thị thông tin sai lệch, hoặc tệ hơn là không thông báo cho người dùng biết có sự cố.

Chiến Lược Xử Lý Lỗi Hiệu Quả
  1. Kiểm tra lỗi ban đầu: Luôn kiểm tra response.ok (hoặc response.status) ngay sau khi gọi fetch. Đây là cách bắt các lỗi HTTP tiêu chuẩn (404, 500, v.v.) trước khi bắt đầu đọc luồng.
  2. Sử dụng try...catch cho quá trình đọc luồng: Bọc vòng lặp while (!done) trong khối try...catch. Điều này sẽ bắt các lỗi xảy ra trong quá trình gọi reader.read(), thường liên quan đến lỗi mạng hoặc luồng bị đóng bất ngờ từ phía server.
  3. Xử lý lỗi dựa trên nội dung luồng: Server có thể gửi các thông báo lỗi như một phần của luồng dữ liệu (ví dụ: một chunk chứa JSON { "error": "..." }). Bạn cần parse từng chunk nhận được và kiểm tra xem nó có phải là thông báo lỗi theo định dạng mà server quy định hay không. Nếu đúng, dừng quá trình đọc luồng và xử lý lỗi.
  4. Xử lý lỗi Client-side khi xử lý chunk: Bọc logic xử lý từng chunk (ví dụ: parsing JSON) trong một khối try...catch riêng để bắt các lỗi xảy ra trong code client của bạn.
  5. Thông báo cho người dùng: Khi phát hiện lỗi, hãy thông báo rõ ràng cho người dùng biết có sự cố xảy ra.
  6. Cleanup: Sử dụng AbortController để có thể hủy yêu cầu và đóng luồng một cách chủ động khi component bị unmount (trong React/Next.js) hoặc khi người dùng hủy thao tác. Điều này giúp tránh rò rỉ bộ nhớ và các hoạt động không cần thiết.
Thực Hành: Thêm Xử Lý Lỗi vào Streaming Fetch

Hãy nâng cấp ví dụ trước để bao gồm các chiến lược xử lý lỗi đã nêu. Giả sử server có thể trả về một chunk JSON { "error": "Some error message" } trong luồng data.

async function streamResponseWithErrors() {
  const outputElement = document.getElementById('output');
  outputElement.innerText = 'Đang kết nối và stream dữ liệu...'; // Trạng thái ban đầu

  const controller = new AbortController(); // Để có thể hủy request
  const signal = controller.signal;

  try {
    // Bước 1: Kiểm tra lỗi yêu cầu ban đầu
    const response = await fetch('/api/stream-data-with-errors', { signal }); // Sử dụng signal

    if (!response.ok) {
      const errorBody = await response.text(); // Đọc body lỗi nếu có
      throw new Error(`HTTP error! status: ${response.status}, body: ${errorBody}`);
    }

    if (!response.body) {
      throw new Error('Response body is not available or not a stream');
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let result = '';
    let done = false;

    console.log("Bắt đầu đọc luồng dữ liệu...");

    // Bước 2 & 3 & 4: Xử lý lỗi trong quá trình đọc luồng và xử lý chunk
    while (!done) {
      try {
        const { value, done: readerDone } = await reader.read();
        done = readerDone;

        if (value) {
          const chunkText = decoder.decode(value, { stream: true });

          // Cố gắng xử lý từng chunk, ví dụ: kiểm tra format JSON lỗi
          try {
             // Giả định server gửi các dòng data, có thể là JSON hoặc text thuần
             // Cần xử lý dựa trên định dạng thực tế của API bạn dùng
             // Ví dụ đơn giản: Nếu chunk chứa 'ERROR:', coi đó là lỗi server gửi trong luồng
             if (chunkText.includes('ERROR:')) {
                 throw new Error(`Lỗi từ server trong luồng: ${chunkText.substring(chunkText.indexOf('ERROR:') + 6).trim()}`);
             }

             // Nếu không phải lỗi, nối vào kết quả và cập nhật UI
             result += chunkText;
             outputElement.innerText = result;
             console.log(`Nhận và xử lý chunk thành công: ${chunkText}`);

          } catch (processingError) {
             // Bước 4: Bắt lỗi khi xử lý chunk (ví dụ: JSON.parse lỗi nếu format không chuẩn)
             console.error('Lỗi khi xử lý chunk data:', processingError);
             // Dừng stream và báo lỗi xử lý
             reader.cancel(); // Hủy reader để dừng stream
             throw new Error(`Lỗi xử lý dữ liệu nhận được: ${processingError.message}`);
          }
        }

      } catch (readError) {
        // Bước 2: Bắt lỗi khi đọc luồng (mạng, server đóng kết nối đột ngột)
        console.error('Lỗi khi đọc từ reader:', readError);
        // Dừng stream và báo lỗi đọc
        reader.cancel(); // Đảm bảo reader bị hủy
        throw new Error(`Lỗi kết nối hoặc đọc luồng dữ liệu: ${readError.message}`);
      }
    }

    console.log("Kết thúc luồng dữ liệu. Toàn bộ kết quả:", result);
    outputElement.innerText = result + "\n\n(Hoàn thành)";

  } catch (finalError) {
    // Bước 5: Thông báo cho người dùng và log lỗi cuối cùng
    console.error('Có lỗi tổng quát xảy ra khi streaming:', finalError);
    outputElement.innerText = `Lỗi xảy ra: ${finalError.message}`;
  } finally {
    // Cleanup: Đảm bảo luồng được đóng nếu không tự đóng
    // reader.cancel() trong catch blocks giúp làm điều này, nhưng thêm ở đây cũng được.
    console.log("Quá trình streaming kết thúc (hoặc bị hủy/lỗi).");
  }
}

// Để hủy stream từ bên ngoài (ví dụ: click nút Hủy)
// controller.abort();

// streamResponseWithErrors();

Giải thích Code Nâng Cao:

  1. Thêm AbortController để cho phép hủy bỏ yêu cầu bất cứ lúc nào. signal được truyền vào fetch.
  2. Khối try...catch đầu tiên vẫn bắt lỗi HTTP ban đầu. Thêm việc đọc response.text() trong trường hợp lỗi để lấy thông báo chi tiết hơn từ server.
  3. Vòng lặp while (!done) bây giờ được bọc trong một try...catch thứ hai. catch(readError) này sẽ bắt các lỗi xảy ra trong khi đang cố gắng đọc từ luồng (ví dụ: mạng yếu, server đóng kết nối đột ngột). Khi bắt được lỗi này, chúng ta gọi reader.cancel() để đảm bảo luồng được đóng đúng cách và sau đó throw lại lỗi để khối catch lớn nhất xử lý và hiển thị cho người dùng.
  4. Bên trong vòng lặp, sau khi nhận được chunkText, chúng ta thêm một khối try...catch thứ ba bọc logic xử lý chunk (try...catch(processingError)). Đây là nơi bạn kiểm tra định dạng chunk, parse JSON nếu cần, và bắt các lỗi trong code client khi xử lý chunk đó. Nếu có lỗi xử lý, chúng ta cũng gọi reader.cancel()throw lỗi.
  5. Giả định logic kiểm tra lỗi server trong luồng: chunkText.includes('ERROR:'). Đây là một ví dụ đơn giản. Trong thực tế với các API AI, bạn có thể nhận các dòng dữ liệu theo chuẩn SSE (data: ...\n\n) và cần parse JSON từ đó, kiểm tra các thuộc tính error hoặc finish_reason (trong OpenAI API chẳng hạn). Logic này cần được tinh chỉnh dựa trên API cụ thể.
  6. Khối catch lớn nhất bao bọc tất cả, là nơi cuối cùng bắt mọi lỗi đã throw ra, hiển thị thông báo tổng quát cho người dùng và log chi tiết vào console.
  7. Khối finally (không bắt buộc nhưng là best practice) đảm bảo một số hành động dọn dẹp luôn được thực thi, dù thành công hay thất bại.
Xử lý trong môi trường React/Next.js

Trong React hoặc Next.js, bạn thường sẽ quản lý trạng thái của luồng (dữ liệu nhận được, trạng thái loading, trạng thái lỗi) bằng useState và thực hiện yêu cầu Fetch trong useEffect.

import React, { useState, useEffect, useRef } from 'react';

function StreamingComponent() {
  const [streamedData, setStreamedData] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState(null);

  // Dùng useRef để lưu controller AbortController
  const abortControllerRef = useRef(null);

  const startStreaming = async () => {
    setIsLoading(true);
    setStreamedData(''); // Reset data cũ
    setError(null); // Reset lỗi cũ

    // Tạo AbortController mới cho mỗi lần request
    abortControllerRef.current = new AbortController();
    const signal = abortControllerRef.current.signal;

    try {
      const response = await fetch('/api/stream-data-with-errors', { signal });

      if (!response.ok) {
        const errorBody = await response.text();
        throw new Error(`HTTP error! status: ${response.status}, body: ${errorBody}`);
      }

      if (!response.body) {
        throw new Error('Response body is not available or not a stream');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      let result = '';
      let done = false;

      while (!done) {
        // Check if component is still mounted or request is not aborted
        if (signal.aborted) {
             console.log('Stream aborted');
             reader.cancel().catch(() => {}); // Attempt to cancel reader gracefully
             break; // Exit the loop
        }

        const { value, done: readerDone } = await reader.read();
        done = readerDone;

        if (value) {
          const chunkText = decoder.decode(value, { stream: true });

          // Logic xử lý chunk và kiểm tra lỗi trong luồng tương tự ví dụ JS thuần
          // ... (bạn đưa logic xử lý chunk và kiểm tra lỗi từ ví dụ trên vào đây)
          // Cập nhật state: setStreamedData(prev => prev + processedChunk);
          result += chunkText; // Vẫn dùng result để tích lũy
          setStreamedData(result); // Cập nhật state

        }
      }
      setIsLoading(false); // Hoàn thành stream

    } catch (err) {
      if (err.name === 'AbortError') {
         console.log('Fetch aborted');
         // Không làm gì đặc biệt với AbortError nếu nó là do component unmount
         // hoặc do người dùng chủ động hủy.
      } else {
         console.error('Streaming failed:', err);
         setError(err.message); // Cập nhật state lỗi
         setIsLoading(false); // Dừng loading
      }
    } finally {
       // Đảm bảo controller được reset
       abortControllerRef.current = null;
    }
  };

  // Sử dụng useEffect để bắt đầu stream khi component mount
  useEffect(() => {
    // startStreaming(); // Bạn có thể gọi hàm này dựa trên một sự kiện khác, ví dụ click nút

    // Cleanup function: Hủy stream khi component unmount
    return () => {
      if (abortControllerRef.current) {
        console.log('Component unmounting, aborting stream...');
        abortControllerRef.current.abort();
      }
    };
  }, []); // Dependency rỗng, chỉ chạy 1 lần khi mount/unmount

  // Logic để gọi startStreaming khi người dùng nhấn nút
  const handleStartClick = () => {
     if (!isLoading) { // Tránh gọi lại khi đang stream
        startStreaming();
     }
  };

  // Hiển thị UI
  return (
    <div>
      <h2>Streaming Response Example</h2>
      <button onClick={handleStartClick} disabled={isLoading}>
         {isLoading ? 'Đang tải...' : 'Bắt đầu Stream Data'}
      </button>
      {error && <p style={{ color: 'red' }}>Lỗi: {error}</p>}
      <div style={{ border: '1px solid #ccc', padding: '10px', minHeight: '100px', marginTop: '10px', whiteSpace: 'pre-wrap' }}>
        {streamedData || (isLoading ? 'Đang nhận dữ liệu...' : 'Nhấn nút để bắt đầu stream.')}
      </div>
    </div>
  );
}

// export default StreamingComponent; // Xuất component để sử dụng

Điểm cần lưu ý trong React:

  1. Sử dụng useState để quản lý dữ liệu hiển thị (streamedData), trạng thái tải (isLoading), và lỗi (error).
  2. Sử dụng useRef để lưu trữ AbortController. Điều này quan trọng vì AbortController cần tồn tại trong suốt vòng đời của request, và useRef giữ giá trị giữa các lần render mà không gây re-render khi thay đổi.
  3. Sử dụng useEffect với cleanup function (return () => {...}) để hủy bỏ yêu cầu Fetch (sử dụng controller.abort()) khi component bị unmount. Điều này ngăn chặn việc cập nhật state trên một component đã bị hủy (gây lỗi "Can't perform a React state update on an unmounted component").
  4. Logic xử lý chunk và cập nhật streamedData cần đảm bảo cập nhật state tăng dần (setStreamedData(prev => prev + newChunk)). (Trong ví dụ trên, tôi dùng biến result tạm để tích lũy rồi set state 1 lần sau mỗi chunk, cách khác là set state ngay trong vòng lặp với functional update).
  5. Xử lý AbortError trong catch block. Lỗi này xảy ra khi bạn chủ động gọi controller.abort(). Bạn thường không muốn hiển thị lỗi này cho người dùng nếu đó là do người dùng tự hủy hoặc component unmount.
Tóm lại

Streaming responses là một kỹ thuật mạnh mẽ và ngày càng cần thiết trong lập trình web hiện đại, đặc biệt khi làm việc với lượng dữ liệu lớn hoặc các dịch vụ AI. Việc sử dụng Fetch API với getReader() cung cấp một cách tiếp cận linh hoạt để xử lý các luồng dữ liệu trực tiếp trên Front-end.

Tuy nhiên, tính chất không đồng bộ và liên tục của streaming đòi hỏi một chiến lược xử lý lỗi cẩn thận và toàn diện. Bạn cần chuẩn bị để bắt và xử lý lỗi ở nhiều giai đoạn: từ yêu cầu ban đầu, trong suốt quá trình đọc luồng, khi xử lý từng phần dữ liệu, và đảm bảo cleanup đúng cách (ví dụ: sử dụng AbortController trong React/Next.js) để tránh các vấn đề tiềm ẩn.

Nắm vững kỹ thuật streaming và xử lý lỗi đi kèm sẽ giúp bạn xây dựng các ứng dụng web nhanh hơn, tương tác hơn và đáng tin cậy hơn, đặc biệt là trong thế giới ngày càng kết nối và dựa trên AI.

Chúc bạn học tốt và hẹn gặp lại trong các bài viết tiếp theo!

Comments

There are no comments at the moment.