|
一、自定义注解,用于标记订阅者(观察者)中接收数据的方法
/**
 * Marker annotation for subscriber methods that want to receive data from {@code RxBus}.
 *
 * <p>It can only be placed on methods and is retained at runtime, so it can be
 * discovered through reflection. It carries no attributes.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RegisterBus {
}
二、定义一个RxBus的类
/**
 * Singleton event bus that delivers data objects to registered subscribers.
 *
 * <p>A subscriber is any object that declares one or more methods annotated with
 * {@link RegisterBus}. When data is sent, every registered subscriber is scanned and each
 * annotated method whose single parameter type is exactly the runtime class of the data
 * is invoked with that data.
 */
public class RxBus {
    public static final String TAG = "RxBus";

    // volatile so that the lazily created singleton is safely published to other threads.
    private static volatile RxBus instance;

    // Registered subscribers. Backed by a CopyOnWriteArraySet: mutations (add/remove) are
    // expensive because the underlying array is usually copied, but iteration while
    // sending never sees a concurrent modification.
    private final Set<Object> subscribers;

    /**
     * Registers a subscriber so that its {@link RegisterBus} methods receive data.
     *
     * @param subscriber the object to register; {@code null} is ignored because it could
     *                   never receive anything and would break dispatch later
     */
    public synchronized void register(Object subscriber) {
        if (subscriber == null) {
            return;
        }
        subscribers.add(subscriber);
    }

    /**
     * Removes a previously registered subscriber. Unknown subscribers are ignored.
     *
     * @param subscriber the object to remove
     */
    public synchronized void unregister(Object subscriber) {
        subscribers.remove(subscriber);
    }

    /**
     * Private constructor: instances are obtained through {@link #getInstance()}.
     */
    private RxBus() {
        subscribers = new CopyOnWriteArraySet<>();
    }

    /**
     * Returns the shared bus, creating it on first use.
     *
     * <p>Uses double-checked locking on the volatile {@code instance} field, so the lock
     * is only taken while the instance has not been created yet.
     *
     * @return the single {@code RxBus} instance
     */
    public static RxBus getInstance() {
        RxBus current = instance;
        if (current == null) {
            synchronized (RxBus.class) {
                current = instance;
                if (current == null) {
                    current = new RxBus();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Runs {@code func} on the IO scheduler and sends its non-null result to the
     * subscribers on the Android main thread.
     *
     * <p>NOTE(review): no error callback is passed to {@code subscribe}; presumably an
     * exception thrown by {@code func} surfaces as an unhandled Rx error - confirm
     * whether that is intended.
     *
     * @param func the work to run; its return value is the data to deliver, and a
     *             {@code null} result is dropped
     */
    public void chainProcess(Func1 func) {
        Observable.just("")
                .subscribeOn(Schedulers.io()) // run the work on the IO scheduler
                .map(func) // wrap the work as a mapping step
                .observeOn(AndroidSchedulers.mainThread()) // consume the result on the main thread
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object data) {
                        if (data == null) {
                            return;
                        }
                        send(data);
                    }
                });
    }

    /**
     * Delivers {@code data} to every registered subscriber.
     */
    private void send(Object data) {
        for (Object subscriber : subscribers) {
            // Scan the subscriber's annotated methods and hand the data to the matching ones.
            callMethodByAnnotiation(subscriber, data);
        }
    }

    /**
     * Reflectively invokes every method declared by {@code target}'s class that
     * <ol>
     *   <li>is annotated with {@link RegisterBus},</li>
     *   <li>takes exactly one parameter, and</li>
     *   <li>declares that parameter with exactly the runtime class of {@code data}.</li>
     * </ol>
     * Annotated methods with any other parameter count are skipped instead of failing.
     * Reflection failures of a single method are printed and do not stop the scan.
     *
     * @param target the subscriber to scan
     * @param data   the data to deliver
     */
    private void callMethodByAnnotiation(Object target, Object data) {
        if (target == null || data == null) {
            return;
        }
        Class<?> dataType = data.getClass();
        for (Method method : target.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(RegisterBus.class)) {
                continue;
            }
            Class<?>[] paramTypes = method.getParameterTypes();
            if (paramTypes.length != 1 || paramTypes[0] != dataType) {
                continue;
            }
            // Kept as two catch clauses on purpose: a multi-catch would be compiled against
            // ReflectiveOperationException, which is missing on old Android API levels.
            try {
                // Allow non-public methods and methods of non-public classes to be called.
                method.setAccessible(true);
                method.invoke(target, new Object[]{data});
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }
}
三、RxBus的测试
/**
 * Exercises {@link RxBus}: a fake presenter registers with the bus, a fake manager
 * produces data through {@code RxBus.chainProcess}, and the presenter's
 * {@link RegisterBus} methods log the thread on which the data arrives.
 *
 * <p>The tests contain no assertions; the outcome is observed through the log output.
 */
public class RxBusTest {
    public static final String TAG = "RxBusTest";

    Presenter presenter;

    /**
     * Creates the presenter and registers it with the bus.
     */
    @Before
    public void setUp() throws Exception {
        presenter = new Presenter(new Manager());
        RxBus.getInstance().register(presenter);
    }

    /**
     * Waits five seconds before unregistering the presenter, presumably to give the
     * asynchronous chain started by the test time to deliver its result.
     */
    @After
    public void tearDown() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        RxBus.getInstance().unregister(presenter);
    }

    @Test
    public void testGetUser() throws Exception {
        presenter.getUser();
    }

    @Test
    public void testGetOrder() throws Exception {
        presenter.getOrder();
    }

    /**
     * Fake presenter: forwards requests to the manager and receives the results from
     * the bus.
     */
    static class Presenter {
        private final Manager manager;

        public Presenter(Manager manager) {
            this.manager = manager;
        }

        public void getUser() {
            manager.getUser();
        }

        public void getOrder() {
            manager.getOrder();
        }

        /**
         * Receives a {@link User} from the bus.
         */
        @RegisterBus
        public void onUser(User user) {
            Log.d(TAG, "receiver User in thread:" + Thread.currentThread());
        }

        /**
         * Receives an {@link Order} from the bus.
         */
        @RegisterBus
        public void onOrder(Order order) {
            Log.d(TAG, "receiver Order in thread:" + Thread.currentThread());
        }
    }

    /**
     * Fake model: produces data through {@code RxBus.chainProcess}.
     */
    static class Manager {
        /**
         * Produces a {@link User} after a one-second simulated delay.
         */
        public void getUser() {
            RxBus.getInstance().chainProcess(new Func1() {
                @Override
                public Object call(Object o) {
                    Log.d(TAG, "chainProcess getUser start in thread:" + Thread.currentThread());
                    User user = new User();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    return user;
                }
            });
        }

        /**
         * Produces an {@link Order} immediately.
         */
        public void getOrder() {
            RxBus.getInstance().chainProcess(new Func1() {
                @Override
                public Object call(Object o) {
                    Log.d(TAG, "chainProcess getOrder start in thread:" + Thread.currentThread());
                    return new Order();
                }
            });
        }
    }

    /**
     * Data types delivered through the bus.
     */
    static class User {
    }

    static class Order {
    }
}
|