qinfengge


Drunk, I forgot the sky lay in the water; a boat full of clear dreams pressed down on the Milky Way.

Spring AI (2): Streaming Output

In the previous section, we implemented a simple chat call with the call method. It waits until the complete result has been returned, so the response takes a while.

In DEBUG mode, you can also see that the whole result is returned at once.

[Screenshot: the debugger shows the whole response returned at once]

However, when we chat with an AI, the answer is usually displayed word by word or paragraph by paragraph as it is generated. This is called streaming output.

Flux

If you read the official documentation carefully, you will find that it already includes an example of streaming output.

[Screenshot: the streaming example in the official documentation]

@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
    Prompt prompt = new Prompt(new UserMessage(message));
    return chatClient.stream(prompt);
}

Many people are puzzled when they see the return type Flux. What is it? Honestly, I didn't know either 🤔. I had only heard of WebFlux and reactive programming, and I wasn't sure what Flux or reactive programming actually is.

But when I asked friends in a chat group, they all said there was no need to learn it, since it is rarely used.
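For context, Flux comes from Project Reactor, the library underneath Spring WebFlux. It is a publisher that emits zero or more items asynchronously and then signals either completion or an error. Flux implements the Reactive Streams Publisher interface, and the JDK ships the same contract as java.util.concurrent.Flow. The sketch below uses the JDK version so that it runs with no extra dependencies. It is not Reactor's API, the class name FlowDemo is hypothetical, and the chunk strings are made-up stand-ins for model output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    // Subscribes to a publisher that pushes the given chunks one by one and
    // returns everything the subscriber received before completion.
    static List<String> collect(List<String> chunks) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1);   // demand the first item
                }

                @Override
                public void onNext(String item) {
                    received.add(item);        // one chunk arrives
                    subscription.request(1);   // demand the next one
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();          // stream failed
                }

                @Override
                public void onComplete() {
                    done.countDown();          // stream finished normally
                }
            });
            chunks.forEach(publisher::submit); // push chunks, as a model streams tokens
        }                                      // close() signals onComplete after buffered items

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("Why ", "did ", "the ", "chicken...")));
    }
}
```

The subscriber is pushed one item at a time (onNext) and is told when the stream ends (onComplete or onError). Reactor's Flux follows the same contract, with a much richer set of operators such as filter, flatMap and doOnNext on top.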

Let's go straight to the Spring AI code:

    /**
     * The official Spring AI streaming dialogue interface, which uses WebFlux
     * @param message prompt
     * @return Flux<String>
     */
    @GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatSse(@PathVariable String message) {
        Prompt prompt = getPrompt(message);

        return chatClient.stream(prompt)
                // drop the final chunk, whose content is null
                .filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
                // wrap each piece of content in a Flux
                .flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
                .doOnNext(System.out::println)
                .doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
                .doOnComplete(() -> System.out.println("complete~!"));
    }

Note that the method called here is stream rather than call.

The filter step drops the chunks whose content is null. The last chunk of the stream has null content, and this marks the end. The flatMap step then wraps each piece of content in a Flux with Flux.just(). Flux.just() does not accept null, so the nulls have to be filtered out first; otherwise this step throws an error.
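The filter-then-extract order can be illustrated without Spring AI by using a plain java.util.stream pipeline over a list of chunks that ends with a null. This is a minimal sketch: Chunk is a hypothetical stand-in for a streamed ChatResponse that keeps only its text content, and Java streams are used here in place of Reactor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NullFilterDemo {

    // Hypothetical stand-in for one streamed chunk: only its text content.
    record Chunk(String content) {}

    // Mirrors the Flux pipeline: drop chunks with null content, then extract the text.
    static List<String> contents(List<Chunk> chunks) {
        return chunks.stream()
                .filter(chunk -> chunk.content() != null) // the final chunk has null content
                .map(Chunk::content)                       // every remaining chunk has text
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Chunk> stream = Arrays.asList(new Chunk("Hello"), new Chunk(", world"), new Chunk(null));
        System.out.println(String.join("", contents(stream)));
    }
}
```

In Reactor itself, a plain map should give the same result for this one-to-one step without creating an inner Flux per chunk. Reactor's map also rejects null results, so the filter would still be needed in front of it.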

SSE

In fact, the first solution that came to mind was SSE (Server-Sent Events, where the server pushes data to the client). It is the natural choice for server push because it runs over plain HTTP. Spring MVC also supports it natively through SseEmitter, so no additional dependencies are needed.
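On the wire, an SSE response (content type text/event-stream) is plain text. Each line of an event's payload is sent as a "data:" line, and a blank line ends the event. The sketch below formats one chunk that way. It covers only the data field (no event, id or retry fields), and the class and method names are hypothetical.

```java
import java.util.List;

public class SseFormat {

    // Formats one Server-Sent Event as it appears on the wire (text/event-stream).
    // Each line of the payload becomes its own "data: " line; a blank line ends the event.
    static String toEvent(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        for (String chunk : List.of("Why did", " the chicken")) {
            System.out.print(toEvent(chunk));
        }
    }
}
```

The explicit space after "data:" is deliberate. Clients strip exactly one leading space from the value, so a token such as " the chicken" keeps its own leading space after parsing.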

Implementing it with Spring's SseEmitter is also simple:

    /**
     * Streaming dialogue interface
     *
     * @param message prompt
     * @return SseEmitter
     */
    @GetMapping("stream")
    public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        // time out after 5 minutes (the value is in milliseconds)
        SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
        Flux<String> stream = chatClient.stream(message);
        // doOnError/doOnComplete return a new Flux, so calling them separately on
        // `stream` has no effect; pass the error and completion handlers to subscribe()
        stream.subscribe(
                it -> {
                    try {
                        System.out.println(it);
                        emitter.send(it, MediaType.TEXT_EVENT_STREAM);
                    } catch (IOException e) {
                        System.out.println("SSE failed to send message");
                        emitter.completeWithError(e);
                    }
                },
                e -> {
                    System.out.println("An exception occurred in the streaming dialogue");
                    emitter.completeWithError(e);
                },
                emitter::complete);
        return emitter;
    }

The result is as follows:

[Screenshot: the streamed result]
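In a browser, EventSource splits the stream into events for you. Any other client has to parse the text/event-stream body itself. Below is a minimal parser sketch that follows the SSE parsing rules for the data field only: one leading space is stripped, multiple data lines are joined with a newline, and a blank line dispatches the event. Other fields and comment lines are ignored, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SseParser {

    // Parses a text/event-stream body into the data payloads of its events.
    // Only "data:" lines are handled; "event", "id", "retry" and comments are ignored.
    static List<String> parse(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\r\n|\r|\n", -1)) {
            if (line.isEmpty()) {                 // blank line: dispatch the event
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1);   // strip exactly one leading space
                }
                if (hasData) {
                    data.append('\n');            // multiple data lines are joined with \n
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;                            // an unterminated last event is dropped
    }
}
```

For example, parse("data: Hello\n\ndata: , world\n\n") returns the two strings "Hello" and ", world". Concatenating them in order rebuilds the streamed answer.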
