第五届中间件性能挑战赛

今年参与了阿里巴巴举办的 中间件性能挑战赛,并且有幸进入到了复赛,虽然后期由于其他原因没法专心参与,不过通过这次比赛也让我在技术上收获了不少,这边小小的做下本次比赛的总结。

初赛:《自适应负载均衡的设计实现》

赛题分析

初赛的题目是在 Dubbo 之上进行的插件开发,对参与者对于 Dubbo 的调用过程有一定的要求。官方在 gateway 端提供了 CallbackListenerImpl、TestClientFilter、UserLoadBalance 供开发。在 provider 端提供了 CallbackServiceImpl、TestRequestLimiter、TestServerFilter 供开发。

将这些组件结合起来就形成了基本的调用链路。

adaptive-loadbalance

目标

  1. provider 在接收请求大于处理线程池大小的时候,将会执行拒绝策略 AbortPolicyWithReport,并将线程栈信息以日志文件输出。这无疑增加了服务端的压力,那么就需要让 provider 在 TestRequestLimiter 进行限流,在剩余线程数量不足时拒绝进来的请求。

  2. 要完成自适应的负载均衡,那么就需要由 provider 端将权重推送至 gateway,这项工作通过 CallbackServiceImpl 和 CallbackListenerImpl 来完成。但是传输的数据为字符串,不包含 provider 标识,这时就需要能够获取 provider 的服务地址,连带指标推送给 gateway。

  3. gateway 接收到请求时选择性能最优 provider,当所有 provider 压力均满时直接拒绝请求。

设计与实现

provider 拒绝服务

通过分析 AllChannelHandler 的源码发现,provider 在通过 Filter 后才会使用线程池处理请求,为了不让服务进入拒绝策略,就需要在线程池不足时利用 Filter 拒绝他。官方是允许使用 SPI 的,那么可根据 WrappedChannelHandler 的代码,从 DataStore 里获取所使用的线程池,提取出线程池最大线程数,在请求前后进行计数,拦截超出能力的请求。

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
        		.getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

...
}

经过了这一层拦截,分数就已经明显上去了,但是由于 provider 的能力各不相同,还需要通过计算权重进行请求分发。

数据通信

当消费方与多个服务方都需要进行通信时,能够唯一确定数据的来源是关键,还是通过读源码,我发现能够从服务 URL 中获取到主机地址和端口号。在 provider 端,自定义暴露监听器保存系统的地址。在 gateway 端,自定义引用监听器将 URL 与地址进行一一映射。

// gateway

public class InvokerMapperListener implements InvokerListener {

private static final Map<URL, String> addressMapper = new ConcurrentHashMap<>();

    @Override
    public void referred(Invoker<?> invoker) throws RpcException {
        URL url = invoker.getUrl();
        String address = getAddress(url);

	addressMapper.put(url, address);
    }
}

// provider

public class ThreadPoolExporterListener implements ExporterListener {

    @Override
    public void exported(Exporter<?> exporter) throws RpcException {
	if (Context.ADDRESS != null) {
		return;
	}

	URL url = exporter.getInvoker().getUrl();
        Context.ADDRESS = getAddress(url);
    }
}

public static String getAddress(URL url) {
    String ip = url.getParameter(Constants.BIND_IP_KEY);
    String port = url.getParameter(Constants.BIND_PORT_KEY);
    return ip + ":" + port;
}

通信功能算是完成了,这时可以直接使用前面 provider 获取到的线程池大小作为权重,但是由于服务能力是动态变化的,为了达到更好的效果还需要进一步改进。

动态负载

根据评测代码可知,消费服务在程序内部使用了 Semaphore 对请求进行了拦截,实际可容纳的请求小于线程数,并且并发数随着时间也在发生变化。

如果单位时间内接收的请求数大于实际并发数时,则多余的请求将会被置为等待状态。当请求数持续大于并发数时,会导致请求堆积造成大量超时。因此在 Dubbo 中有个超时检测机制 org.apache.dubbo.remoting.exchange.support.DefaultFuture##TIME_OUT_TIMER,这是一个用于中止任务的时间轮,当一个请求进入执行阶段时,它将会被放入时间轮,在一定时间后结束请求返回错误信息。

因此需使得请求在时间轮触发之前执行,这样才能保证不会造成异常响应。在这里我还是使用到了前面获取的线程池,由于线程在线程池中空闲的状态为 WAITINGTIMED_WAITING,在执行过程中工作线程的状态可能为 RUNNABLEBLOCKEDWAITINGTIMED_WAITING,空闲线程与工作线程存在状态重叠,因而无法将 getActiveCount 用作并发数。

thread_state

因此在执行阶段 Filter 前后对当前线程进行标记,并放入一个 placeHolder 中保存,在退出 Filter 时移除标记。由此一来 placeHolder 中就只存在工作中的线程,在除去 BLOCKEDWAITING 状态的线程即为并发数。

为了避免并发冲突影响效率,采用无锁编程,预先创建好数组,利用线程id作为下标指定标记元素位置。

在得到实际并发数后,再通过与 cpu核心数、cpu频率 进行计算得出权重值,定时的发送给 gateway 用于选择最优 provider。

gateway 将收到并发数作为负载均衡还远远不够,如果一个服务由于机器资源不足、网络故障等原因也将会导致请求异常,因此请求耗时也是一个考量负载的指标。由于网络消耗的存在,则在 gateway 端做请求耗时的收集,将并发数与耗时进行结合完成负载均衡策略。

adaptive-loadbalance-architecture

复赛:《实现一个进程内基于队列的消息持久化存储引擎》

赛题分析

赛题要求实现一个进程内消息持久化的存储引擎,提供了 4g 的堆内存,2g 的堆外内存,300g 的SSD磁盘。过程分别为发送、查询聚合消息、查询聚合结果,串行执行三个步骤,每个步骤需要在 30 分钟内完成,以每个阶段的 发送数\时间 总和为成绩。

设计

  1. 由于查询阶段需要对 时间戳 和 数值 进行条件过滤,考虑以 时间戳 和 数值 创建索引键对数据分散进行存储,以数据块为存储单位。

  2. 每个线程所发送消息的时间戳是递增的,对查询过滤天然适合,则为每个线程映射一个存储组件实例。

  3. 官方提供使用的存储介质为 SSD磁盘,大量数据写入的效率好,所以将数个数据块为一组单位,缓存于内存中,在写入数据达到下一组单位时进行文件存储。

  4. 在查询阶段为了更好的利用资源,使用 lru缓存 保存查询数据。

data_write

发送阶段完成后磁盘文件组织如下,以时间戳定位文件,再以数值选择文件内的数据块(图中以 8192 为时间戳分片大小,4096 为数值为片大小,均为 2 的幂次方,以位运算即可完成路由)。

file_store

如查询条件为 { t: [8000, 22000], a: [5000, 20000] },则选择文件 hash-0、hash-1、hash-2 为目标文件,分别对文件内的 index-1、index-2、index-3、index-4 数据块进行检索,其中 hash-1 index-2hash-1 index-3 为中间数据,无需对数据进行条件过滤。

data_read

实现

其实复赛的设计思路并不难,主要的难点是分片大小的选择,如何更好的利用内存不发生 OOM,以及利用多线程提升效率。

在写入阶段,使用内存进行数据的缓存,利用时间戳的有序性检测是否进入下个文件数据集,由于需要对全量数据都进行哈希,采用了位移操作提升效率。

在读取阶段,对条件使用哈希确定目标数据的分布范围,充分利用堆内存与堆外内存,将分配的 ByteBuffer 进行缓存,提高下次查询时的效率。同理,对于无需条件过滤的结果也可以通过缓存来提高效率。

增加了缓存就需要考虑内存容量问题,为了避免 OOM 需要引入淘汰规则。直接使用 SoftReference 容易频繁产生 CMS GC,影响查询效率,而使用 WeakReference 缓存数据则具有不确定性,数据很可能被快速回收,失去了缓存的意义。

经过分析后选择了LRU算法,在固定大小集合里缓存数据,对长久未使用的数据进行清理。传统的 LRU Cache 可用继承 LinkedHashMap 的方式实现,然而这个类是非线程安全的,在查询场景可能产生并发问题,因此重新实现了一个线程安全的 LRU Cache。

public class ConcurrentLRUCache<K, V> {

    private final Queue<K> keySet;
    private final Map<K, T> cache;

    private final AtomicInteger acquire;
    private final int outSize;

    private volatile boolean full = false;

    public ConcurrentLRUCache(int size) {
        this.keySet = new ConcurrentLinkedQueue<>();
        this.cache = new ConcurrentHashMap<>(size, 1f);
        this.acquire = new AtomicInteger(size);
        this.outSize = size + 1;
    }

    public void put(K key, T value) {
        if (full) {
            K firstKey = keySet.poll();
            cache.remove(firstKey);
        } else {
            int nextSize = acquire.incrementAndGet();

            if (nextSize > outSize) {
                Thread.yield();
                put(key, value);
                return;
            }

            if (nextSize == outSize) {
                full = true;
            }
        }
        cache.put(key, value);
        keySet.offer(key);
    }

    public T get(K key) {
        T value = cache.get(key);
        if (value == null) {
            return null;
        }
        
        if (keySet.remove(key)) {
            keySet.offer(key);
        }
        return value;
    }
}

除了添加缓存外,还有其他优化手段,比如 多线程并发处理数据块、对分块数据进行压缩、数据与消息体分开存储、利用时间戳的顺序中断查询。

总结

在这次比赛中,主要收获了在系统设计上的锻炼,分析问题也更加全面了,以及在 无锁编程NIO处理编写高性能代码 方面的进步。如通过 async-profiler 发现乐观锁在大量请求下的性能问题,并转变思路使用无锁编程。了解到 MappedByteBuffer 在大量写入的情况下会频繁发生 pageCache 的 flush,可以先准备好 ByteBuffer 保存数据,之后再将整个 ByteBuffer 写入到 MappedByteBuffer 中。