博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊spring cloud gateway的streaming-media-types属性
阅读量:7097 次
发布时间:2019-06-28

本文共 7486 字,大约阅读时间需要 24 分钟。

  hot3.png

本文主要研究下spring cloud gateway的streaming-media-types属性

配置

配置说明

{      "sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",      "name": "spring.cloud.gateway.streaming-media-types",      "type": "java.util.List
" }

GatewayProperties

spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayProperties.java

@ConfigurationProperties("spring.cloud.gateway")@Validatedpublic class GatewayProperties {	/**	 * List of Routes	 */	@NotNull	@Valid	private List
routes = new ArrayList<>(); /** * List of filter definitions that are applied to every route. */ private List
defaultFilters = new ArrayList<>(); private List
streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON); public List
getRoutes() { return routes; } public void setRoutes(List
routes) { this.routes = routes; } public List
getDefaultFilters() { return defaultFilters; } public void setDefaultFilters(List
defaultFilters) { this.defaultFilters = defaultFilters; } public List
getStreamingMediaTypes() { return streamingMediaTypes; } public void setStreamingMediaTypes(List
streamingMediaTypes) { this.streamingMediaTypes = streamingMediaTypes; } @Override public String toString() { return "GatewayProperties{" + "routes=" + routes + ", defaultFilters=" + defaultFilters + ", streamingMediaTypes=" + streamingMediaTypes + '}'; }}

可以看到默认是MediaType.TEXT_EVENT_STREAM(text/event-stream)、MediaType.APPLICATION_STREAM_JSON(application/stream+json)

使用

GatewayAutoConfiguration

spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java

@Configuration	@ConditionalOnClass(HttpClient.class)	protected static class NettyConfiguration {		@Bean		@ConditionalOnMissingBean		public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer
options) { return HttpClient.create(options); } //...... @Bean public HttpClientProperties httpClientProperties() { return new HttpClientProperties(); } @Bean public NettyRoutingFilter routingFilter(HttpClient httpClient, ObjectProvider
> headersFilters) { return new NettyRoutingFilter(httpClient, headersFilters); } @Bean public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) { return new NettyWriteResponseFilter(properties.getStreamingMediaTypes()); } @Bean public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer
options) { return new ReactorNettyWebSocketClient(options); } }

这里的NettyWriteResponseFilter使用到了properties.getStreamingMediaTypes()

NettyWriteResponseFilter

spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.javac

public class NettyWriteResponseFilter implements GlobalFilter, Ordered {	private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);	public static final int WRITE_RESPONSE_FILTER_ORDER = -1;	private final List
streamingMediaTypes; public NettyWriteResponseFilter(List
streamingMediaTypes) { this.streamingMediaTypes = streamingMediaTypes; } @Override public int getOrder() { return WRITE_RESPONSE_FILTER_ORDER; } @Override public Mono
filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added // until the WebHandler is run return chain.filter(exchange).then(Mono.defer(() -> { HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); if (clientResponse == null) { return Mono.empty(); } log.trace("NettyWriteResponseFilter start"); ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); //TODO: what if it's not netty final Flux
body = clientResponse.receive() .retain() //TODO: needed? .map(factory::wrap); MediaType contentType = response.getHeaders().getContentType(); return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); } //TODO: use framework if possible //TODO: port to WebClientWriteResponseFilter private boolean isStreamingMediaType(@Nullable MediaType contentType) { return (contentType != null && this.streamingMediaTypes.stream() .anyMatch(contentType::isCompatibleWith)); }}

可以看到这里根据isStreamingMediaType方法判断是否是stream类型,如果是则采用writeAndFlushWith方法,不是则采用writeWith方法

ReactiveHttpOutputMessage

spring-web-5.0.6.RELEASE-sources.jar!/org/springframework/http/ReactiveHttpOutputMessage.java

/** * A "reactive" HTTP output message that accepts output as a {@link Publisher}. * * 

Typically implemented by an HTTP request on the client-side or an * HTTP response on the server-side. * * @author Arjen Poutsma * @author Sebastien Deleuze * @since 5.0 */public interface ReactiveHttpOutputMessage extends HttpMessage { /** * Return a {@link DataBufferFactory} that can be used to create the body. * @return a buffer factory * @see #writeWith(Publisher) */ DataBufferFactory bufferFactory(); /** * Register an action to apply just before the HttpOutputMessage is committed. *

Note: the supplied action must be properly deferred, * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's * executed in the right order, relative to other actions. * @param action the action to apply */ void beforeCommit(Supplier

> action); /** * Whether the HttpOutputMessage is committed. */ boolean isCommitted(); /** * Use the given {@link Publisher} to write the body of the message to the * underlying HTTP layer. * @param body the body content publisher * @return a {@link Mono} that indicates completion or error */ Mono
writeWith(Publisher
body); /** * Use the given {@link Publisher} of {@code Publishers} to write the body * of the HttpOutputMessage to the underlying HTTP layer, flushing after * each {@code Publisher
}. * @param body the body content publisher * @return a {@link Mono} that indicates completion or error */ Mono
writeAndFlushWith(Publisher
> body); /** * Indicate that message handling is complete, allowing for any cleanup or * end-of-processing tasks to be performed such as applying header changes * made via {@link #getHeaders()} to the underlying HTTP message (if not * applied already). *

This method should be automatically invoked at the end of message * processing so typically applications should not have to invoke it. * If invoked multiple times it should have no side effects. * @return a {@link Mono} that indicates completion or error */ Mono

setComplete();}

从接口的注释可以看到,writeWith与writeAndFlushWith的参数泛型不同,一个是Publisher<? extends DataBuffer>,一个是Publisher<? extends Publisher<? extends DataBuffer>>。而writeAndFlushWith则是在每个Publisher<DataBuffer>写入之后就flush。

小结

NettyWriteResponseFilter根据spring.cloud.gateway.streaming-media-types配置的类型来判断是writeAndFlushWith还是writeWith,如果是指定类型则选择用writeAndFlushWith写入response。默认该配置指定了MediaType.TEXT_EVENT_STREAM(text/event-stream)、MediaType.APPLICATION_STREAM_JSON(application/stream+json)这两种类型。

doc

转载于:https://my.oschina.net/go4it/blog/1826235

你可能感兴趣的文章
nginx 配置白名单
查看>>
iOS开发助手、ipa上传工具、苹果APP快速上架辅助工具Appuploader
查看>>
Rstudio编辑界面美化设置
查看>>
重写对象ToString方法
查看>>
备忘: C++中的 vector 容器
查看>>
smt中查看图片与视频缩略图中,如何获得小视频的长度。
查看>>
图片(img标签)的onerror事件
查看>>
2013应届毕业生“百度”校招应聘总结
查看>>
CentOS系统启动流程
查看>>
SQLite数据库_实现简单的增删改查
查看>>
批量查询 xml的方式 还一种是表变量
查看>>
Java7/8 中 HashMap 和 ConcurrentHashMap的对比和分析
查看>>
Java 实现多线程切换等待唤醒交替打印奇偶数
查看>>
1077. Kuchiguse (20)
查看>>
HotSpot模板解释器目标代码生成过程源码分析
查看>>
c99标准 数据类型
查看>>
一维最大子数组和(续)
查看>>
Hillstone基础上网配置
查看>>
几种session存储方式比较
查看>>
习惯的颠覆
查看>>