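Preface
This article looks at the streaming-media-types property of Spring Cloud Gateway: where it is declared, what it defaults to, and how the gateway uses it.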
Configuration
Configuration metadata
    {
      "sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",
      "name": "spring.cloud.gateway.streaming-media-types",
      "type": "java.util.List<org.springframework.http.MediaType>"
    }
GatewayProperties
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayProperties.java
    @ConfigurationProperties("spring.cloud.gateway")
    @Validated
    public class GatewayProperties {

        /**
         * List of Routes
         */
        @NotNull
        @Valid
        private List<RouteDefinition> routes = new ArrayList<>();

        /**
         * List of filter definitions that are applied to every route.
         */
        private List<FilterDefinition> defaultFilters = new ArrayList<>();

        private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
                MediaType.APPLICATION_STREAM_JSON);

        public List<RouteDefinition> getRoutes() {
            return routes;
        }

        public void setRoutes(List<RouteDefinition> routes) {
            this.routes = routes;
        }

        public List<FilterDefinition> getDefaultFilters() {
            return defaultFilters;
        }

        public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
            this.defaultFilters = defaultFilters;
        }

        public List<MediaType> getStreamingMediaTypes() {
            return streamingMediaTypes;
        }

        public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }

        @Override
        public String toString() {
            return "GatewayProperties{" +
                    "routes=" + routes +
                    ", defaultFilters=" + defaultFilters +
                    ", streamingMediaTypes=" + streamingMediaTypes +
                    '}';
        }
    }
As the source shows, the defaults are MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).
Usage
GatewayAutoConfiguration
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java
    @Configuration
    @ConditionalOnClass(HttpClient.class)
    protected static class NettyConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return HttpClient.create(options);
        }

        //......

        @Bean
        public HttpClientProperties httpClientProperties() {
            return new HttpClientProperties();
        }

        @Bean
        public NettyRoutingFilter routingFilter(HttpClient httpClient,
                ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
            return new NettyRoutingFilter(httpClient, headersFilters);
        }

        // The configured streaming media types are handed to the write-response filter here.
        @Bean
        public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
            return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
        }

        @Bean
        public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return new ReactorNettyWebSocketClient(options);
        }
    }
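Here the NettyWriteResponseFilter bean is constructed with properties.getStreamingMediaTypes(), which is the only place in this configuration where the property is consumed.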
NettyWriteResponseFilter
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java
    public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

        private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);

        public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

        private final List<MediaType> streamingMediaTypes;

        public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }

        @Override
        public int getOrder() {
            return WRITE_RESPONSE_FILTER_ORDER;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
            // until the WebHandler is run
            return chain.filter(exchange).then(Mono.defer(() -> {
                HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

                if (clientResponse == null) {
                    return Mono.empty();
                }
                log.trace("NettyWriteResponseFilter start");
                ServerHttpResponse response = exchange.getResponse();

                NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
                //TODO: what if it's not netty

                final Flux<NettyDataBuffer> body = clientResponse.receive()
                        .retain() //TODO: needed?
                        .map(factory::wrap);

                MediaType contentType = response.getHeaders().getContentType();
                // Streaming types are flushed per buffer; everything else is written normally.
                return (isStreamingMediaType(contentType) ?
                        response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
            }));
        }

        //TODO: use framework if possible
        //TODO: port to WebClientWriteResponseFilter
        private boolean isStreamingMediaType(@Nullable MediaType contentType) {
            return (contentType != null && this.streamingMediaTypes.stream()
                    .anyMatch(contentType::isCompatibleWith));
        }
    }
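The filter calls isStreamingMediaType to decide whether the response Content-Type is a streaming type. If it is, the body is written with writeAndFlushWith; otherwise it is written with writeWith.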
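To make the matching rule concrete, here is a minimal, Spring-free sketch of the same null-safe "any configured type is compatible" check. The class StreamingMediaTypeCheck and its helpers are hypothetical names introduced for illustration, and isCompatible is only a rough approximation of MediaType.isCompatibleWith: it ignores parameters and treats "*" as a wildcard, but does not model suffix wildcards such as "*+json".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Simplified model of the streaming media type check (illustrative only, not Spring code). */
final class StreamingMediaTypeCheck {

    // Defaults mirrored from GatewayProperties.
    static final List<String> DEFAULT_STREAMING_TYPES =
            Arrays.asList("text/event-stream", "application/stream+json");

    /** Returns {type, subtype}, lower-cased, with parameters such as ";charset=UTF-8" dropped. */
    static String[] parse(String mediaType) {
        String essence = mediaType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        String[] parts = essence.split("/", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a media type: " + mediaType);
        }
        return parts;
    }

    /** Assumption: approximates isCompatibleWith as "equal parts, or '*' on either side". */
    static boolean isCompatible(String a, String b) {
        String[] x = parse(a);
        String[] y = parse(b);
        return partMatches(x[0], y[0]) && partMatches(x[1], y[1]);
    }

    private static boolean partMatches(String p, String q) {
        return p.equals("*") || q.equals("*") || p.equals(q);
    }

    /** Null-safe check with the same shape as isStreamingMediaType in the filter. */
    static boolean isStreamingMediaType(String contentType, List<String> streamingTypes) {
        return contentType != null
                && streamingTypes.stream().anyMatch(t -> isCompatible(contentType, t));
    }

    public static void main(String[] args) {
        // Parameters are ignored, so an SSE response with a charset still counts as streaming.
        System.out.println(isStreamingMediaType("text/event-stream;charset=UTF-8", DEFAULT_STREAMING_TYPES)); // true
        System.out.println(isStreamingMediaType("application/json", DEFAULT_STREAMING_TYPES)); // false
        System.out.println(isStreamingMediaType(null, DEFAULT_STREAMING_TYPES)); // false
    }
}
```

Note that a response without a Content-Type header never takes the streaming path, because the null check short-circuits before any comparison is made.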
ReactiveHttpOutputMessage
spring-web-5.0.6.RELEASE-sources.jar!/org/springframework/http/ReactiveHttpOutputMessage.java
    /**
     * A "reactive" HTTP output message that accepts output as a {@link Publisher}.
     *
     * <p>Typically implemented by an HTTP request on the client-side or an
     * HTTP response on the server-side.
     *
     * @author Arjen Poutsma
     * @author Sebastien Deleuze
     * @since 5.0
     */
    public interface ReactiveHttpOutputMessage extends HttpMessage {

        /**
         * Return a {@link DataBufferFactory} that can be used to create the body.
         * @return a buffer factory
         * @see #writeWith(Publisher)
         */
        DataBufferFactory bufferFactory();

        /**
         * Register an action to apply just before the HttpOutputMessage is committed.
         * <p><strong>Note:</strong> the supplied action must be properly deferred,
         * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
         * executed in the right order, relative to other actions.
         * @param action the action to apply
         */
        void beforeCommit(Supplier<? extends Mono<Void>> action);

        /**
         * Whether the HttpOutputMessage is committed.
         */
        boolean isCommitted();

        /**
         * Use the given {@link Publisher} to write the body of the message to the
         * underlying HTTP layer.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> writeWith(Publisher<? extends DataBuffer> body);

        /**
         * Use the given {@link Publisher} of {@code Publishers} to write the body
         * of the HttpOutputMessage to the underlying HTTP layer, flushing after
         * each {@code Publisher<DataBuffer>}.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);

        /**
         * Indicate that message handling is complete, allowing for any cleanup or
         * end-of-processing tasks to be performed such as applying header changes
         * made via {@link #getHeaders()} to the underlying HTTP message (if not
         * applied already).
         * <p>This method should be automatically invoked at the end of message
         * processing so typically applications should not have to invoke it.
         * If invoked multiple times it should have no side effects.
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> setComplete();
    }
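As the interface's Javadoc and signatures show, writeWith and writeAndFlushWith take differently typed parameters: the former accepts a Publisher<? extends DataBuffer>, the latter a Publisher<? extends Publisher<? extends DataBuffer>>. writeAndFlushWith flushes after each inner Publisher<DataBuffer> has been written.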
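The difference in flushing behaviour can be illustrated with a toy model that uses plain lists in place of reactive publishers. Everything here (FlushModel, RecordingSink, wrapEach) is a hypothetical stand-in and not Spring's implementation; in particular, the model assumes writeWith flushes exactly once at the end, whereas a real server may also flush earlier, for example when its buffer fills. wrapEach plays the role of body.map(Flux::just) in NettyWriteResponseFilter, turning each chunk into its own one-element group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of the two write styles (illustrative only, not Spring code). */
final class FlushModel {

    /** Records what reaches the "wire": written chunks and flush markers, in order. */
    static final class RecordingSink {
        final List<String> events = new ArrayList<>();

        void write(String chunk) {
            events.add("write:" + chunk);
        }

        void flush() {
            events.add("flush");
        }
    }

    /** Models writeWith: write every chunk, then flush once when the body completes. */
    static void writeWith(RecordingSink sink, List<String> body) {
        body.forEach(sink::write);
        sink.flush();
    }

    /** Models writeAndFlushWith: flush after each inner group of chunks. */
    static void writeAndFlushWith(RecordingSink sink, List<List<String>> body) {
        for (List<String> group : body) {
            group.forEach(sink::write);
            sink.flush();
        }
    }

    /** Models body.map(Flux::just): each chunk becomes its own one-element group. */
    static List<List<String>> wrapEach(List<String> body) {
        return body.stream().map(Collections::singletonList).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> chunks = Arrays.asList("event-1", "event-2");

        RecordingSink buffered = new RecordingSink();
        writeWith(buffered, chunks);
        System.out.println(buffered.events); // [write:event-1, write:event-2, flush]

        RecordingSink streamed = new RecordingSink();
        writeAndFlushWith(streamed, wrapEach(chunks));
        System.out.println(streamed.events); // [write:event-1, flush, write:event-2, flush]
    }
}
```

In the model, the per-chunk flush means a client can see the first event before the second one is written, which is the behaviour wanted for server-sent events and streamed JSON.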
Summary
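NettyWriteResponseFilter uses the media types configured in spring.cloud.gateway.streaming-media-types to choose between writeAndFlushWith and writeWith: when the response Content-Type is compatible with one of the configured types, the response is written with writeAndFlushWith. By default the property contains two types, MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).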