    A Brief Look at the RxJava Source, Part 1: Constructing the Data Source

    Date: 2016-12-05 15:17  Source: Internet  Author: 源码搜藏

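    I have been using RxJava for a while, but how it works internally is still somewhat hazy to me, so I decided to read its source code. The excerpts below come from RxJava 1.x.

    Please support original work and credit the source when reposting.

    There are roughly three ways to construct a data source in RxJava:

    1. The create method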

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("参数1");
            subscriber.onNext("参数2");
            subscriber.onCompleted();
        }
    });
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    Output:
    onNext 参数1
    onNext 参数2
    onCompleted

    Let's look at the source:

        public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(hook.onCreate(f));
        }

    We create an OnSubscribe object and pass it to create, which first hands it to hook.onCreate():

        public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
            return f;
        }

    It returns the OnSubscribe object unchanged, and create passes the result to the Observable constructor:

        protected Observable(OnSubscribe<T> f) {
            this.onSubscribe = f;
        }
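
    The chain create, hook.onCreate, constructor can be condensed into a small stand-alone model. This is only a sketch: MiniObservable, its OnSubscribe interface, and onCreateHook are hypothetical stand-ins written for this article, not the RxJava classes, and a plain java.util.function.Consumer plays the role of the subscriber.

```java
import java.util.function.Consumer;

// Simplified stand-in for RxJava 1.x's Observable; NOT the real class.
final class MiniObservable<T> {
    /** Stand-in for Observable.OnSubscribe<T>: receives the subscriber once one arrives. */
    interface OnSubscribe<T> {
        void call(Consumer<? super T> subscriber);
    }

    /** Stand-in for hook.onCreate(): in this model it is the identity function. */
    static <T> OnSubscribe<T> onCreateHook(OnSubscribe<T> f) {
        return f;
    }

    final OnSubscribe<T> onSubscribe;

    private MiniObservable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    static <T> MiniObservable<T> create(OnSubscribe<T> f) {
        return new MiniObservable<T>(onCreateHook(f));
    }

    public static void main(String[] args) {
        OnSubscribe<String> source = s -> s.accept("param1");
        MiniObservable<String> observable = MiniObservable.create(source);
        // create() did not run the source; it only kept a reference to it.
        System.out.println(observable.onSubscribe == source); // prints "true"
    }
}
```

    Nothing is emitted at this point. The model only shows that create stores the function for later use.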

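    So a newly created Observable holds a reference to the OnSubscribe that was passed in. With the Observable (and its OnSubscribe) and the Subscriber in place, we call observable.subscribe(subscriber). Let's look at the subscribe method.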

        public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
    
    
        static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
         // validate and proceed
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber can not be null");
            }
            if (observable.onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null.");
                /*
                 * the subscribe function can also be overridden but generally that's not the appropriate approach
                 * so I won't mention that in the exception
                 */
            }
    
            // new Subscriber so onStart it
            subscriber.onStart();
    
            /*
             * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
             * to user code from within an Observer"
             */
            // if not already wrapped
            if (!(subscriber instanceof SafeSubscriber)) {
                // assign to `observer` so we return the protected version
                subscriber = new SafeSubscriber<T>(subscriber);
            }
    
            // The code below is exactly the same an unsafeSubscribe but not used because it would 
            // add a significant depth to already huge call stacks.
            try {
                // hook.onSubscribeStart(observable, observable.onSubscribe) returns onSubscribe
                hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // in case the subscriber can't listen to exceptions anymore
                if (subscriber.isUnsubscribed()) {
                    RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
                } else {
                    // if an unhandled error occurs executing the onSubscribe we will propagate it
                    try {
                        subscriber.onError(hook.onSubscribeError(e));
                    } catch (Throwable e2) {
                        Exceptions.throwIfFatal(e2);
                        // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                        // so we are unable to propagate the error correctly and will just throw
                        RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                        // TODO could the hook be the cause of the error in the on error handling.
                        hook.onSubscribeError(r);
                        // TODO why aren't we throwing the hook's return value.
                        throw r;
                    }
                }
                return Subscriptions.unsubscribed();
            }
        }
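
    The guard clauses and the try/catch in subscribe are easier to follow in a reduced form. The sketch below is a simplified model under my own assumptions: the interfaces are hypothetical stand-ins, there is no hook and no SafeSubscriber wrapper, and only RuntimeException is caught, whereas the real method catches Throwable and rethrows fatal errors first.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the checks and error routing in subscribe; not RxJava's classes.
final class SubscribeErrorSketch {
    interface Subscriber<T> {
        void onNext(T value);
        void onError(Throwable e);
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    static <T> void subscribe(OnSubscribe<T> onSubscribe, Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        try {
            onSubscribe.call(subscriber);
        } catch (RuntimeException e) {
            // Simplification: an exception thrown by call() is delivered through onError.
            subscriber.onError(e);
        }
    }

    /** Subscribes a recording subscriber and returns the events in arrival order. */
    static List<String> record(OnSubscribe<String> source) {
        final List<String> events = new ArrayList<String>();
        subscribe(source, new Subscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext " + value); }
            @Override public void onError(Throwable e) { events.add("onError " + e.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        List<String> events = record(s -> {
            s.onNext("param1");
            throw new IllegalStateException("boom");
        });
        System.out.println(events); // prints "[onNext param1, onError boom]"
    }
}
```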

    hook.onSubscribeStart(observable, observable.onSubscribe) returns the onSubscribe object, which is the OnSubscribe we defined ourselves. Here is its call method again:

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("参数1");
            subscriber.onNext("参数2");
            subscriber.onCompleted();
        }
    });

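    It calls onNext, onNext, and onCompleted on the subscriber that was passed in, in that order. From this we can see that OnSubscribe controls both what data is emitted and the order in which it is emitted.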
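
    To make the handshake concrete, here is a minimal stand-alone model of it. It is a sketch under my own assumptions: Source, Subscriber, and OnSubscribe are hypothetical reduced versions of the RxJava types, with no hook, no SafeSubscriber, and no error handling.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the create/subscribe handshake; the names are hypothetical.
final class SubscribeSketch {
    interface Subscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    static final class Source<T> {
        private final OnSubscribe<T> onSubscribe;

        Source(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Nothing is emitted until this point: subscribing is what runs call().
        void subscribe(Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    /** Subscribes a recording subscriber and returns the events in arrival order. */
    static List<String> record(Source<String> source) {
        final List<String> events = new ArrayList<String>();
        source.subscribe(new Subscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext " + value); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        Source<String> source = new Source<String>(s -> {
            s.onNext("param1");
            s.onNext("param2");
            s.onCompleted();
        });
        System.out.println(record(source)); // prints "[onNext param1, onNext param2, onCompleted]"
    }
}
```

    Swapping the two onNext calls inside the lambda swaps the order of the recorded events, which is the sense in which the OnSubscribe decides what is sent and when.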

    2. The from method

    String[] array = new String[]{"参数1", "参数2"};
    Observable<String> observable = Observable.from(array);
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    Output:
    onNext 参数1
    onNext 参数2
    onCompleted

    Let's look at the from method:

        public static <T> Observable<T> from(T[] array) {
            int n = array.length;
            if (n == 0) {
                return empty();
            } else
            if (n == 1) {
                return just(array[0]);
            }
            // wrap the array in an OnSubscribeFromArray, which is an OnSubscribe
            return create(new OnSubscribeFromArray<T>(array));
        }

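    For an array of two or more elements, from wraps the array in an OnSubscribeFromArray, which is an OnSubscribe, and then calls create, so we are back at the first method. (An empty array short-circuits to empty(), and a single-element array to the one-argument just.) Let's look at the OnSubscribeFromArray class.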

    public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
        final T[] array;
        public OnSubscribeFromArray(T[] array) {
            this.array = array;
        }
    
        @Override
        public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }
    
        static final class FromArrayProducer<T>
        extends AtomicLong
        implements Producer {
            /** */
            private static final long serialVersionUID = 3534218984725836979L;
    
            final Subscriber<? super T> child;
            final T[] array;
    
            int index;
    
            public FromArrayProducer(Subscriber<? super T> child, T[] array) {
                this.child = child;
                this.array = array;
            }
    
            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
    
            void fastPath() {
                final Subscriber<? super T> child = this.child;
    
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
    
                    child.onNext(t);
                }
    
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
    
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
    
                long e = 0L;
                int i = index;
    
                for (;;) {
    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
    
                        child.onNext(array[i]);
    
                        i++;
    
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
    
                        r--;
                        e--;
                    }
    
                    r = get() + e;
    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }
        }
    }

    The call method of this class

        @Override
        public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }

    creates a FromArrayProducer and passes it to the Subscriber's setProducer method:

        public void setProducer(Producer p) {
            long toRequest;
            boolean passToSubscriber = false;
            synchronized (this) {
                toRequest = requested;
                producer = p;
                if (subscriber != null) {
                    // middle operator ... we pass through unless a request has been made
                    if (toRequest == NOT_SET) {
                        // we pass through to the next producer as nothing has been requested
                        passToSubscriber = true;
                    }
                }
            }
            // do after releasing lock
            if (passToSubscriber) {
                subscriber.setProducer(producer);
            } else {
                // we execute the request with whatever has been requested (or Long.MAX_VALUE)
                if (toRequest == NOT_SET) {
                    producer.request(Long.MAX_VALUE);
                } else {
                    producer.request(toRequest);
                }
            }
        }
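
    To see which branch of setProducer the example takes, the following simplified model may help. It is a sketch under stated assumptions: Sub and requestEarly are hypothetical names, locking is omitted, NOT_SET is modeled as Long.MIN_VALUE, and I assume that the SafeSubscriber wrapper created in subscribe stores the subscriber it wraps in the subscriber field.

```java
// Simplified model of Subscriber.setProducer; the class and method names are hypothetical.
final class SetProducerSketch {
    interface Producer {
        void request(long n);
    }

    static class Sub {
        static final long NOT_SET = Long.MIN_VALUE; // sentinel value used by this model

        private final Sub wrapped;        // non-null only for a wrapper such as SafeSubscriber
        private long requested = NOT_SET; // demand registered before a producer arrived

        Sub(Sub wrapped) {
            this.wrapped = wrapped;
        }

        /** Hypothetical helper: registers demand before any producer has been set. */
        void requestEarly(long n) {
            requested = n;
        }

        void setProducer(Producer p) {
            long toRequest = requested;
            if (wrapped != null && toRequest == NOT_SET) {
                wrapped.setProducer(p);    // pass through to the wrapped subscriber
            } else if (toRequest == NOT_SET) {
                p.request(Long.MAX_VALUE); // nobody asked for a specific amount
            } else {
                p.request(toRequest);      // honour the amount asked for earlier
            }
        }
    }

    /** Returns the amount the producer is asked for when it is handed to outer. */
    static long amountRequested(Sub outer) {
        final long[] seen = { 0L };
        outer.setProducer(n -> seen[0] = n);
        return seen[0];
    }

    public static void main(String[] args) {
        Sub plain = new Sub(null);
        Sub safeWrapper = new Sub(plain);
        System.out.println(amountRequested(safeWrapper) == Long.MAX_VALUE); // prints "true"

        Sub bounded = new Sub(null);
        bounded.requestEarly(2);
        System.out.println(amountRequested(new Sub(bounded))); // prints "2"
    }
}
```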

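    setProducer ends up calling request on the FromArrayProducer that was passed in. In our example no request(n) was made before the producer arrived, so requested is still NOT_SET and the producer is asked for Long.MAX_VALUE. Here is request, together with the two paths it dispatches to: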

            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
    
            void fastPath() {
                final Subscriber<? super T> child = this.child;
    
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
    
                    child.onNext(t);
                }
    
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
    
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
    
                long e = 0L;
                int i = index;
    
                for (;;) {
    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
    
                        child.onNext(array[i]);
    
                        i++;
    
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
    
                        r--;
                        e--;
                    }
    
                    r = get() + e;
    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }

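    Because the amount requested here is Long.MAX_VALUE, fastPath is the branch that runs in this example: it iterates over the array, passes each element to Subscriber.onNext, and calls Subscriber.onCompleted once the iteration finishes. slowPath does the same work but emits only as many elements as have been requested. Either way the result matches what we expected.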
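
    The difference between the two paths shows up more clearly in a reduced, single-threaded model. This is a sketch, not FromArrayProducer itself: ArrayProducerSketch is a hypothetical class, it records events in a list instead of calling a real subscriber, it ignores unsubscription, and it assumes request is never called concurrently or re-entrantly, so the atomic bookkeeping is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded model of an array-backed producer; a sketch, not RxJava's FromArrayProducer.
final class ArrayProducerSketch {
    private final String[] array;
    private final List<String> events = new ArrayList<String>();
    private int index;         // position of the next element to emit
    private boolean completed; // true once onCompleted has been recorded

    ArrayProducerSketch(String... array) {
        this.array = array;
    }

    List<String> events() {
        return events;
    }

    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == 0 || completed) {
            return;
        }
        if (n == Long.MAX_VALUE) {
            fastPath();
        } else {
            slowPath(n);
        }
    }

    // Unbounded demand: emit everything that is left without counting.
    private void fastPath() {
        while (index < array.length) {
            events.add("onNext " + array[index++]);
        }
        complete();
    }

    // Bounded demand: emit at most n elements and remember where we stopped.
    private void slowPath(long n) {
        long remaining = n;
        while (remaining > 0 && index < array.length) {
            events.add("onNext " + array[index++]);
            remaining--;
        }
        if (index == array.length) {
            complete();
        }
    }

    private void complete() {
        completed = true;
        events.add("onCompleted");
    }

    public static void main(String[] args) {
        ArrayProducerSketch bounded = new ArrayProducerSketch("a", "b", "c");
        bounded.request(2);
        System.out.println(bounded.events()); // prints "[onNext a, onNext b]"
        bounded.request(5);
        System.out.println(bounded.events()); // prints "[onNext a, onNext b, onNext c, onCompleted]"
    }
}
```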

    3. The just method

    Observable<String> observable = Observable.just("参数1", "参数2");
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    Output:
    onNext 参数1
    onNext 参数2
    onCompleted

    Let's look at the just method:

        public static <T> Observable<T> just(T t1, T t2) {
            return from((T[])new Object[] { t1, t2 });
        }

    It is simple: it packs the arguments into an array and calls from, which brings us back to method 2.
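
    The way the two-argument just funnels into from, and the way from picks one of its three branches, can be sketched with a small model. Everything here is hypothetical: Source only remembers which factory path built it and which items it holds, and it stands in for an Observable without emitting anything.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Model of the just -> from delegation; Source is a hypothetical stand-in for Observable.
final class JustFromSketch {
    static final class Source<T> {
        final String builtBy; // which factory path produced this source
        final List<T> items;

        Source(String builtBy, List<T> items) {
            this.builtBy = builtBy;
            this.items = items;
        }
    }

    static <T> Source<T> empty() {
        return new Source<T>("empty", Collections.<T>emptyList());
    }

    static <T> Source<T> just(T t1) {
        return new Source<T>("just", Collections.singletonList(t1));
    }

    static <T> Source<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        }
        if (n == 1) {
            return just(array[0]);
        }
        // Two or more elements: this is where the real code wraps the array and calls create.
        return new Source<T>("fromArray", Arrays.asList(array));
    }

    @SuppressWarnings("unchecked")
    static <T> Source<T> just(T t1, T t2) {
        // Same trick as the excerpt above: pack the arguments into an array, then delegate.
        return from((T[]) new Object[] { t1, t2 });
    }

    public static void main(String[] args) {
        Source<String> two = just("param1", "param2");
        System.out.println(two.builtBy + " " + two.items); // prints "fromArray [param1, param2]"
        System.out.println(from(new String[] { "only" }).builtBy); // prints "just"
        System.out.println(from(new String[0]).builtBy); // prints "empty"
    }
}
```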

    Summary

    This article looked at the common ways of creating a data source in RxJava. Data transformation will be covered in a later article.

    A Brief Look at the RxJava Source, Part 1: Constructing the Data Source. Repost link: http://www.codesocang.com/anzhuoyuanma/boke/33945.html