qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github

春季 AI (二) 流式輸出

在上文,實現了一個簡單的調用輸出,使用的是 call 方法,此方法會等待結果的完整返回,所以耗時會比較高一點。

DEBUG 也可以看到結果是一起返回的。

image

但通常我們使用 AI 對話時結果是一個字一個字或一段一段蹦出來的,這裡用的就是流式輸出。

flux#

仔細看官方文檔會發現其實是有流式輸出的代碼的

image

@GetMapping("/ai/generateStream")
	public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        Prompt prompt = new Prompt(new UserMessage(message));
        return chatClient.stream(prompt);
    }

不過很多人一看返回 Flux 就懵逼了,什麼玩意?其實我也不知道🤔,我只聽過 webflux 響應式編程,至於什麼是 flux 什麼是響應式也是一頭霧水。

不過問過群友後,大家都說這玩意沒必要學,很少用。

直接上代碼

/**
     * spring ai 官方的流式對話接口 使用 webflux
     * @param message prompt
     * @return Flux<String>
     */
    @GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatSse(@PathVariable String message) {
        Prompt prompt = getPrompt(message);

        return chatClient.stream(prompt)
                .filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
                .flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
                .doOnNext(System.out::println)
                .doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
                .doOnComplete(() -> System.out.println("complete~!"));
    }

這裡可以看到調用的方法是 stream

首先,第一個 filter 表示過濾掉返回的 null,這是因為流式返回的最後一個字段是 null,表示結尾。flatMap 中有個 Flux.just() 表示把返回的內容放到 Flux 裡面。所以第一步要過濾掉 null,不然這一步會報錯的。

SSE#

其實最開始想到的解決方案就是 SSE (Server Sent Events – 服務端主動推送),說起主動推送第一個想到的就是它,原生的,不用加其他依賴。

代碼也很簡單

/**
     * 流式對話接口
     *
     * @param message prompt
     * @return SseEmitter
     */
    @GetMapping("stream")
    public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
        Flux<String> stream = chatClient.stream(message);
        stream.subscribe(it -> {
            try {
                System.out.println(it);
                emitter.send(it, MediaType.TEXT_EVENT_STREAM);
            } catch (IOException e) {
                System.out.println("sse發送消息失敗");
                emitter.completeWithError(e);
            }
        });

        stream.doOnError(e -> {
            System.out.println("流式對話發生異常");
            emitter.completeWithError(e);
        });

        stream.doOnComplete(emitter::complete);
        return emitter;
    }

結果如下

image

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。