一种流式编程的代码
使用示例
ProcessTree<ThriftSpanList> processTree = new ProcessTree<>();
final Node<ThriftSpanList, Set<DependencyCell>> firstNode = processTree.setFirstNode(sceneProcessor);
final Node<Set<DependencyCell>, Void> sceneDependencyNode = firstNode.addNext(sceneDependencyProcessor);
processTree.handle(list);
具体代码
public class ProcessTree<INPUT> {
private Way<INPUT> entryWay;
public <OUTPUT> Node<INPUT, OUTPUT> setFirstNode(NodeProcessor<INPUT, OUTPUT> firstNodeProcessor) {
final Node<INPUT, OUTPUT> firstNode = new Node<>(firstNodeProcessor);
this.entryWay = new Way<>(firstNode);
return firstNode;
}
public void handle(INPUT input) {
if (entryWay != null) {
entryWay.consume(input);
}
}
}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public final class Node<INPUT, OUTPUT> {
NodeProcessor<INPUT, OUTPUT> nodeProcessor;
List<Way<OUTPUT>> ways = new CopyOnWriteArrayList<>();
public Node(NodeProcessor<INPUT, OUTPUT> nodeProcessor) {
this.nodeProcessor = nodeProcessor;
}
public void process(INPUT input) {
final OUTPUT output = nodeProcessor.doProcess(input);
for (Way<OUTPUT> way : ways) {
way.consume(output);
}
}
public <NEW_OUTPUT> Node<OUTPUT, NEW_OUTPUT> addNext(NodeProcessor<OUTPUT, NEW_OUTPUT> nextNodeProcessor) {
final Node<OUTPUT, NEW_OUTPUT> nextNode = new Node<>(nextNodeProcessor);
ways.add(new Way<>(nextNode));
return nextNode;
}
public <NEW_OUTPUT> Node<OUTPUT, NEW_OUTPUT> addAsyncNext(NodeProcessor<OUTPUT, NEW_OUTPUT> nextNodeProcessor,
int threadNum, int queueSize, String tag) {
final Node<OUTPUT, NEW_OUTPUT> nextNode = new Node<>(nextNodeProcessor);
ways.add(new AsyncWay<>(nextNode, threadNum, queueSize, tag));
return nextNode;
}
}
public interface NodeProcessor<INPUT, OUTPUT> {
OUTPUT doProcess(INPUT input);
}
public class Way<INPUT> {
private Node<INPUT, ?> destination;
public Node<INPUT, ?> getDestination() {
return destination;
}
public Way(Node<INPUT, ?> destination) {
this.destination = destination;
}
public void consume(INPUT input) {
if (input != null) {
destination.process(input);
}
}
}
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
public class AsyncWay<INPUT> extends Way<INPUT> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWay.class);
private ThreadPoolExecutor executorService;
public AsyncWay(Node<INPUT, ?> destination, int threadNum, int queueSize, String tag) {
super(destination);
executorService = ThreadPoolUtil.create(threadNum, threadNum, 1, queueSize, tag);
}
@Override
public void consume(INPUT input) {
if (input != null)
executorService.execute(() -> {
getDestination().process(input);
});
}
}
局限性
相比java的stream,只支持map(一转一),不支持flatMap(一转多)