博客地址:https://www.yuque.com/autunomy/emwi09
第一步肯定是引入依赖
com.squareup.okhttp3
okhttp
4.9.3
com.alibaba
fastjson
1.2.79
第二步是配置okHttp
import okhttp3.ConnectionPool; import okhttp3.OkHttpClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.net.ssl.*; import java.security.*; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; @Configuration public class OkHttpConfiguration { @Value("${ok.http.connect-timeout}") private Integer connectTimeout; @Value("${ok.http.read-timeout}") private Integer readTimeout; @Value("${ok.http.write-timeout}") private Integer writeTimeout; @Value("${ok.http.max-idle-connections}") private Integer maxIdleConnections; @Value("${ok.http.keep-alive-duration}") private Long keepAliveDuration; @Bean public OkHttpClient okHttpClient() @Bean public X509TrustManager x509TrustManager() @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() }; } @Bean public SSLSocketFactory sslSocketFactory() , new SecureRandom()); return sslContext.getSocketFactory(); } catch (NoSuchAlgorithmException | KeyManagementException e) { e.printStackTrace(); } return null; } @Bean public ConnectionPool pool() { return new ConnectionPool(maxIdleConnections, keepAliveDuration, TimeUnit.SECONDS); } }
同时需要在application.properties配置文件中进行配置
# okhttp3配置 ok.http.connect-timeout=30 ok.http.read-timeout=30 ok.http.write-timeout=30 # 连接池中整体的空闲连接的最大数量 ok.http.max-idle-connections=200 # 连接空闲时间最多为 300 秒 ok.http.keep-alive-duration=300
第三步是配置一些全局变量
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import okhttp3.MediaType; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.Date; / * @author hty * @date 2023-11-18 9:42 * @email @.com * @description */ @Configuration @Slf4j public class WenXinConfig { @Value("${wenxin.apiKey}") public String API_KEY; @Value("${wenxin.secretKey}") public String SECRET_KEY; @Value("${wenxin.accessTokenUrl}") public String ACCESS_TOKEN_URL; @Value("${wenxin.ERNIE-Bot4.0URL}") public String ERNIE_Bot_4_0_URL; //过期时间为30天 public String ACCESS_TOKEN = null; public String REFRESH_TOKEN = null; public Date CREATE_TIME = null;//accessToken创建时间 public Date EXPIRATION_TIME = null;//accessToken到期时间 / * 获取accessToken * @return true表示成功 false表示失败 */ public synchronized String flushAccessToken() catch (IOException e) { log.error("ACCESS_TOKEN获取失败"); return null; } //刷新令牌以及更新令牌创建时间和过期时间 JSONObject jsonObject = JSON.parseObject(response); ACCESS_TOKEN = jsonObject.getString("access_token"); REFRESH_TOKEN = jsonObject.getString("refresh_token"); CREATE_TIME = new Date(); EXPIRATION_TIME = new Date(Long.parseLong(jsonObject.getString("expires_in")) + CREATE_TIME.getTime()); return ACCESS_TOKEN; } }
可以看到代码中有@Value注解,表示需要注入属性,所以继续在application.properties配置文件中进行配置
wenxin.apiKey=你的apiKey wenxin.secretKey=你的secretKey #获取AccessToken的url地址 wenxin.accessTokenUrl=https://aip.baidubce.com/oauth/2.0/token #文心ERNIE-Bot4.0模型访问地址 wenxin.ERNIE-Bot4.0URL=https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro
至此配置完成,接下来就是编写代码来进行访问,我使用的是springboot框架,所以就在controller中编写一个方法来实现。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.hty.config.WenXinConfig; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @RestController @Slf4j public class TestController String responseJson = null; //先获取令牌然后才能访问api if (wenXinConfig.flushAccessToken() != null) catch (IOException e) { log.error("网络有问题"); return "网络有问题,请稍后重试"; } } return responseJson; } / * 构造请求的请求参数 * @param userId * @param temperature * @param topP * @param penaltyScore * @param messages * @return */ public String constructRequestJson(Integer userId, Double temperature, Double topP, Double penaltyScore, boolean stream, List
使用PostMan调用接口即可获取到回复
流式回答就是在原来非流式回答的基础上将stream这个参数从false改为true即可,但是还要注意的一个事情就是响应中的内容是会有一点变化的,会在原来响应的内容的前面加上data这个字符,需要在转jsonObject的时候注意
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.hty.config.WenXinConfig; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @RestController @Slf4j public class TestController } } } else { System.out.println("Unexpected code " + response); } } catch (IOException e) { log.error("流式请求出错"); throw new RuntimeException(e); } //将回复的内容添加到消息中 HashMap
assistant = new HashMap<>(); assistant.put("role","assistant"); assistant.put("content",""); //取出我们需要的内容,也就是result部分 String[] answerArray = answer.toString().split("data: "); for (int i=1;i
> messages) { Map
request = new HashMap<>(); request.put("user_id",userId.toString()); request.put("temperature",temperature); request.put("top_p",topP); request.put("penalty_score",penaltyScore); request.put("stream",stream); request.put("messages",messages); System.out.println(JSON.toJSONString(request)); return JSON.toJSONString(request); } }
这个地方分两种方式,一种是使用SSE(Server-Sent Events),一种是使用websocket
引入依赖
com.squareup.okhttp3
okhttp
4.9.3
com.squareup.okhttp3
okhttp-sse
4.9.3
配置跨域
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.CorsRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Configuration public class CorsConfig { @Bean public WebMvcConfigurer corsConfigurer() { return new WebMvcConfigurer() { @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/") .allowedOrigins("*");//允许域名访问,如果*,代表所有域名 } }; } }
前端代码
SSE Example
后端代码部分的写法和WebSocket非常的像,都是向创建连接,之后再发送消息
/ * 这个用来保存用户与服务器之间的连接信息 */ private static final Map
sseEmitterMap = new ConcurrentHashMap<>(); //创建连接 @GetMapping(value = "/sse/connect", produces="text/event-stream;charset=UTF-8") public SseEmitter sseConnect(Long clientId) try { // 设置超时时间,0表示不过期。默认30秒 SseEmitter sseEmitter = new SseEmitter(30 * 1000L); // 注册回调 sseEmitter.onCompletion(completionCallBack(clientId)); sseEmitter.onTimeout(timeoutCallBack(clientId)); sseEmitterMap.put(clientId, sseEmitter); log.info("创建sse连接完成,当前客户端:{}", clientId); return sseEmitter; } catch (Exception e) { log.info("创建sse连接异常,当前客户端:{}", clientId); } return null; } //发送消息,这里采用异步的方式来进行发送 / * 用来异步发送消息 */ private final ExecutorService executorService = Executors.newCachedThreadPool(); / * SSE方式向前端发送消息 * @param clientId * @param question */ @PostMapping(value = "/sse/chat") public void streamOutputToPage(Long clientId,String question) OkHttpClient client = new OkHttpClient(); String requestJson = wenxinUtils.constructRequestJson(1,0.95,1.0,true,messages); RequestBody body = RequestBody.create(MediaType.parse("application/json"), requestJson); Request request = new Request.Builder() .url(wenXinConfig.ERNIE_Bot_4_0_URL + "?access_token=" + wenXinConfig.flushAccessToken()) .method("POST", body) .addHeader("Content-Type", "application/json") .build(); //将回复的内容添加到消息中 HashMap
assistant = new HashMap<>(); assistant.put("role","assistant"); assistant.put("content",""); // 发起异步请求 try } if(!result.equals("")) } messages.add(assistant); } } } else { System.out.println("Unexpected code " + response); } } catch (IOException e) { log.error("流式请求出错,断开与{}的连接",clientId); //移除当前的连接 sseEmitterMap.remove(clientId); //移除本次对话的内容 messages.remove(user); } }); }
利用到的是SSE中的SseEmitter对象来向前端发送消息,其实流式只是一个称呼而已,最终的实际情况也不过是将消息拆分开来进行发送,这样做的好处就是让用户使用过程中更加的舒服,防止用户没有目的的等待。
我们通过对返回数据的观察发现,所有的data之间都使用两个换行符来进行分割的,那我们就可以利用 来进行消息的分割'
让缓冲区尽量大,保证每次都能将一条data数据读取完毕,这样就能保证 一定是两条消息之间的分隔符,目前也只有这一种方法可行,根据我的测试来看,只要设置1024个字节以上的缓冲区,基本不会存在一次性读取不完数据的情况
由于我们缓冲区设置的足够大,所以我们甚至不需要再对消息进行分割,我们每次缓冲区读取到的数据就一定是独立的一条消息
SSE协议默认是以两个 换行符为结束标志,所以在数据中如果有 存在的话就会导致后面的数据被截断,无法收到,解决办法就是再对 进行一次转义即可
待补充
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/277981.html