Search Unity

  1. Megacity Metro Demo now available. Download now.
    Dismiss Notice
  2. Unity support for visionOS is now available. Learn more in our blog post.
    Dismiss Notice

UniRx - Reactive Extensions for Unity

Discussion in 'Assets and Asset Store' started by neuecc, May 28, 2014.

  1. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Oh, sorry.
    It is bug, it was not my intended, "True" is same as ReactiveProperty.
    And I'll add ToReadOnlyReactiveProperty(bool distinctUntilChanged) overload for configure behaviour(default is true).

    I've tracked issue. https://github.com/neuecc/UniRx/issues/161
     
  2. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    This is concept idea code...

    Code (CSharp):
    1. list.Select(obj =>
    2. {
    3.     // before side-effect
    4.     return Observable.Delay(timeToDelay).Do(_ => { /* after side-effect */ });
    5. })
    6. .Concat() // do on sequential
    7. .RepeatUntilDestroy(this.gameObject)
    8. .Subscribe();
    But I recommend use Coroutine for simpleness.
    You can combinate Coroutine with `FromCoroutine/FromMicroCoroutine` and `ToYieldInstruction`.
     
  3. MV10

    MV10

    Joined:
    Nov 6, 2015
    Posts:
    1,889
    I noticed this from a little over a year ago. Is ObservableWWW using the old Unity WWW class or the new (no longer "experimental") UnityWebRequest?

    Edit: Never mind, I see that's System.Net.WebRequest.


    Any plans to implement support for UnityWebRequest?
     
    Last edited: Aug 25, 2016
  4. Adoll

    Adoll

    Joined:
    Apr 16, 2016
    Posts:
    20
    Hi,neuecc
    I am still struggling to wrap my head around Rx.
    Now I have a stream which is returned by EveryUpdate() where space bar is pressed.
    I can subscribe to this stream and do staff. But I am wondering if there is a way to subscribe to the same stream and do staff whenever space bar is not pressed. I know I can create another stream for this, but I suspect there is some operator existing to inverse the stream. Do you know how to work this around?
     
  5. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Yes, I should needs implement it.
    Issue is here, I want to implement in the near updates.
    https://github.com/neuecc/UniRx/issues/101
     
  6. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    I can't point what is best practice, but I can provide some patterns.

    Code (CSharp):
    1. // returns only space down.
    2. public static IObservable<Unit> GetSpaceDown()
    3. {
    4.     return Observable.EveryUpdate()
    5.         .Where(_ => Input.GetKeyDown(KeyCode.Space))
    6.         .AsUnitObservable();
    7. }
    8.  
    9. // true or false on every update but if argument is true, only space down.
    10. public static IObservable<bool> GetSpaceDown2(bool returnOnlyDown = true)
    11. {
    12.     return Observable.EveryUpdate()
    13.         .Select(_ => Input.GetKeyDown(KeyCode.Space))
    14.         .Where(x => returnOnlyDown ? x : true);
    15. }
    16.  
    17. // share observable pattern for avoid multiple invoke selector.
    18. // (I don't recommend always use this pattern, but sometimes useful for optimize peformance)
    19. static IObservable<bool> getSpaceDown = null;
    20. public static IObservable<bool> GetSpaceDown3()
    21. {
    22.     if (getSpaceDown == null)
    23.     {
    24.         getSpaceDown = Observable.EveryUpdate()
    25.             .Select(_ => Input.GetKeyDown(KeyCode.Space))
    26.             .Share();
    27.     }
    28.  
    29.     return getSpaceDown;
    30. }
     
  7. Adoll

    Adoll

    Joined:
    Apr 16, 2016
    Posts:
    20
    Thanks a million, neuecc.
    You saved my day, and allowed me to see more diverse ways to utilize Rx.
     
  8. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    If upgrading to Unity 5.5, UniRx causes compiler error.
    It comes from StartCoroutine_Auto has been obsolete and causes by auto unity upgrade.

    If you fix manualy, change error line to
    `(dispatcher as MonoBehaviour).StartCoroutine(routine);`

    I'll release quick fix to AssetStore soon.
     
  9. Donbing

    Donbing

    Joined:
    May 6, 2015
    Posts:
    1
    Hi, first of all. awesome library!.

    i'm trying to find a nice way instantiate a bunch of objects async frm a WWW result.

    e.g. subscribe to a WWW request, then instantiate a game object foreach element in the result arrray. (but instantiate each object in a separate frame, using a coroutine?)

    can anyone offer some advice?
     
  10. jbelmonte

    jbelmonte

    Joined:
    Jan 9, 2015
    Posts:
    5
    How difficult is it to implement a custom dispatcher (or is it scheduler?) which uses a priority value other than time?

    For example, I have a pool of coroutines which may execute an expensive operation (image decode on main thread), so only one such operations is allowed per frame. Hence the coroutines are advanced in a certain order, so that the most important instances have first opportunity to do the expensive operation. Is there a better way to model this situation?

    Thank you
     
  11. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    WWW operation is async, the result is always another frame.
    recommended way is to deliver async signature(IObservable) to the top.

    Code (CSharp):
    1. IObservable<XxxComponent[]> CreateAsync()
    2. {
    3.     return ObservableWWW.Get("http://......")
    4.         .Select(x =>
    5.         {
    6.             // deserialize, instantiate elements...
    7.            
    8.             return XxxComponent[];
    9.         });
    10. }
     
  12. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Operator combinations can do three pattern.

    Code (CSharp):
    1. IObservable<Unit>[] manyCoroutinesWrappedObservableFromCoroutine;
    2.  
    3. // take only first one.
    4. Observable.Amb(manyCoroutinesWrappedObservableFromCoroutine);
    5.  
    6. // subscribe in concurrent
    7. Observable.Merge(manyCoroutinesWrappedObservableFromCoroutine);
    8.  
    9. // subscribe in concurrent and await all completed
    10. Observable.WhenAll(manyCoroutinesWrappedObservableFromCoroutine);
    But the other manual control, maybe should use standard coroutine and wrap by FromCoroutine.

    Code (CSharp):
    1. public IObservable<Unit> Manual()
    2. {
    3.     return Observable.FromCoroutine<Unit>(observer = ManualCore(observer));
    4. }
    5.  
    6. IEnumerator ManualCore(IObserver<Unit> observer)
    7. {
    8.     IObservable<Unit> priorityOne;
    9.     IObservable<Unit>[] others;
    10.  
    11.     var a = priorityOne.ToYieldInstruction();
    12.     yield return a;
    13.     observer.OnNext(a.Result);
    14.  
    15.     yield return Observable.WhenAll(others).ToYieldInstruction();
    16.     observer.OnNext(/* ??? */);
    17. }
     
  13. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Today accepted new UniRx ,but sorry I create package by 5.5 beta.
    I need to use Unity 5.0, I'll re-package and upload soon.
     
  14. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Published 5.4.1 on AssetStore.

    The main feature is to support Unity 5.5 Beta.

    ```
    Improvement: Support Unity 5.5 Beta
    Fix: ThrottleFirstFrame throws NullReferenceException when stream sends OnNext immediately. #160, thanks @hsanno
    Fix: Error on LogEntry.cs #164, thanks @kimsama
    Fix: ToReadOnlyReactiveProperty should be distinct publishing #161
    Fix: To(ReadOnly)ReactiveProperty cause strange result when source observable returns OnCompleted or OnError
    (Breaking) Changes: PresenterBase was obsoleted(but it only commented, not marked [Obsolete])
    Breaking Changes: Sample14 was removed
    Breaking Changes: Removed ObservableMonoBehaviour, TypedMonoBehaviour in UNITY 5.5 or newer
    ```

    I've started medium in English, I've written more details about this update.
    https://medium.com/@neuecc/update-unirx-5-4-1-and-linq-to-gameobject-2-2-d89005be55f6#.d99uucqfj
    Please check it and follow me.
     
  15. daleknauss

    daleknauss

    Joined:
    Sep 15, 2016
    Posts:
    1
    Hello, I'm new to UniRx and am trying to figure out the best way to structure things. I'm trying to make an input manager that works for various VR controllers and streams the state when they change.

    It works but I'm not sure if I'm setting this up correctly. I'm also not sure if I'm disposing of streams correctly game object is destroyed.

    Here's a condensed version of what I did:

    Code (CSharp):
    1. public class InputManager : Singleton<InputManager> {
    2.     IGenericInput input;
    3.  
    4.     void Awake() {
    5.         // TODO Switch statement for different controllers
    6.         input = new GvrInputManager();
    7.     }
    8.  
    9.     public IObservable<Quaternion> OrientationStream() {
    10.         return input.OrientationStream();
    11.     }
    12.  
    13.     public IObservable<bool> TouchDownStream() {
    14.         return input.TouchDownStream();
    15.     }
    16. }
    Code (CSharp):
    1. public class GvrInputManager : IGenericInput {
    2.  
    3.     public IObservable<bool> TouchDownStream() {
    4.           return Observable.EveryUpdate()
    5.             .Select(_ => GvrController.TouchDown)
    6.             .Where(x => x);
    7.        }
    8.  
    9.     public IObservable<Quaternion> OrientationStream() {
    10.           return Observable.EveryUpdate()
    11.              .Select(_ => GvrController.Orientation);
    12.       }
    13. }
    Code (CSharp):
    1. public interface IGenericInput {
    2.     IObservable<Quaternion> OrientationStream();
    3.     IObservable<bool> TouchDownStream();
    4. }
    And using it:

    Code (CSharp):
    1. public class LaserPointerController : MonoBehaviour {
    2.     void Start() {
    3.         SetRotationStream();
    4.         SetTouchDownStream();
    5.     }
    6.     private void SetRotationStream() {
    7.         InputManager.Instance.OrientationStream()
    8.             .Subscribe(orientation => {
    9.                 transform.rotation = orientation;
    10.             }).AddTo(this);
    11.     }
    12.  
    13.     private void SetTouchDownStream() {
    14.         InputManager.Instance.TouchDownStream()
    15.             .Subscribe(_ => {
    16.                 TogglePointer();
    17.             }).AddTo(this);
    18.     }
    19. }
    Any feedback would be greatly appreciated.

    Thank you!
     
  16. Bomanden

    Bomanden

    Joined:
    Sep 7, 2014
    Posts:
    2
    has anyone tried to use unirx to monitor user inactivity in unity ?
    wondering what would be the right setup to fire/reset a timer
     
  17. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Hi, today accepted UniRx 5.5.0 on Unity AssetStore.
    This release contains new Experimental support for Unity .NET 4.6 async/await

    //

    Add: ObserveEveryValueChanged(bool fastDestroyCheck) overload
    Add: Observable.Sample(intervalSourcea) overload, thanks @svermeulen
    Fix: UWP Platform compile error
    Breaking Changes: Remove workaround for Unity WWW dispose freeze bug

    Unity 5.5.0xMono-Runtime-Upgrade-1-b4 Support details
    ---
    use System.IObservable, IObserver instead of UniRx.IObservable, IObserver
    use System.Tuple instad of UniRx.Tuple (but lost ValueType tuple, should we create other tuple?)
    use System.IProgress instead of UniRx.Progress
    use System.Threading.CancellationToken instead of UniRx.CancellationToken
    add GetAwaiter to UniRx for supports async/await for IObservable
    create UniRxSynchronizationContext to support async/await completly
    add ToObservable to Task
    supplemental Unity coroutine support async/await

    //

    I blogged details of async/await supports.
    https://medium.com/@neuecc/async-await-support-for-unity-with-unirx-421a958408ed#.qo4abt4w5
     
    zyzyx likes this.
  18. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    aalmada, jprocha101 and yuliyF like this.
  19. Fraser-K

    Fraser-K

    Joined:
    Oct 7, 2016
    Posts:
    1
    Very nice library, will be very useful in writing clean code.

    I have a question on best practices for heavy multi-threading in an MVP/MVC design. Say I have four related properties that are being updated at the same time by a sub-thread...if I use four ReactiveProperties each with their own ObserveOnMainThread subscribers elsewhere that will result in four thread join operations, correct? My understanding from your docs is that each set call will join on the main Unity UI thread, call the subscribers then return back to the calling thread. I try to avoid joins where possible so I was wondering if the following pattern might be slightly more efficient?:

    Code (CSharp):
    1. public class MyDto {
    2.     public float val1;
    3.     public float val2;
    4.     public float val3;
    5.     public float val4;
    6. }
    7.  
    8. private readonly MyDto _dto = new MyDto();
    9.  
    10. public ReactiveProperty<MyDto> dto { get; private set; }
    11.  
    12. public void threadMainLoop() {
    13.     while(running) {
    14.  
    15.         _dto.val1 = calculateVal1();
    16.         _dto.val2 = calculateVal2();
    17.         _dto.val3 = calculateVal3();
    18.         _dto.val4 = calculateVal4();
    19.  
    20.         dto.SetValueAndForceNotify(_dto);
    21.  
    22.     }
    23. }
    My thinking is that here we only have one join and there is no garbage generation through creating a new DTO object each pass. Looking at the UniRx source suggests SetValueAndForceNotify ought to work for this with no issues but I'm new to unity & dotnet so am unsure on the behind-the-scenes working of the runtime. I know I can implement the Equality interfaces on the object but I think that's maybe overkill in this case and inefficient when I know it's only being set upon an actual change.

    Alternatively, do you have any thoughts on an "invoke later" pattern, placing the UI update call in a queue & returning immediately? Java/Swing has one such method & it's very useful for writing efficient sub threads that interface with a single-threaded UI as there are no joins. Downside is that you cannot guarantee ordering however for displaying values on screen that is generally not an issue.
     
  20. aalmada

    aalmada

    Joined:
    Apr 29, 2013
    Posts:
    21
    Hi,
    Is there any special reason for ObservableTriggerExtensions not include AwakeAsObservable and StartAsObservable extension methods? Also, Awake and Start are handled in ObservableTriggerBase but why are they kept private?
    Thanks
     
  21. aalmada

    aalmada

    Joined:
    Apr 29, 2013
    Posts:
    21
    I'm sorry, ObservableTriggerBase has AwakeAsObservable() and StartAsObservable(). What's missing is only the extension methods.
     
  22. mle_samlabs

    mle_samlabs

    Joined:
    Jul 14, 2016
    Posts:
    2
  23. mrpmorris

    mrpmorris

    Joined:
    Dec 25, 2012
    Posts:
    50
    Does anyone have any plans to create an extension for handling HTC Vive interactions?
     
  24. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Our company using UniRx with VRTK for Vive programming.
    simply wrapped by FromEvent.

    Code (CSharp):
    1. public static class ControllerEventsExtensions
    2. {
    3.     public static IObservable<ControllerInteractionEventArgs> ControllerTriggerPressedAsObservable(this ControllerEvents self)
    4.     {
    5.         if (self == null) throw new ArgumentNullException("ControllerEvents : self");
    6.  
    7.         return Observable.FromEvent<ControllerInteractionEventHandler, ControllerInteractionEventArgs>
    8.         (
    9.             h => (sender, e) => h(e),
    10.             h => self.TriggerPressed += h,
    11.             h => self.TriggerPressed -= h
    12.         );
    13.     }
    14.  
    15.     // and more...
    16. }
     
  25. mrpmorris

    mrpmorris

    Joined:
    Dec 25, 2012
    Posts:
    50
    Hi Neuecc

    I already like the look of this approach for only receiving notifications of changes in the mouse position.

    Observable.EveryFixedUpdate()
    .Select(_ => Input.mousePosition)
    .Distinct()
    .Subscribe(_ => Debug.Log(Time.time + "=" + _));

    So that I can get my head around your implementation, perhaps you could explain something to me?

    Traditionally I'd get the location of the Vive controller in every update, do a sphere cast to see which game objects are within a certain distance, filter that list down to any game object that has a component implementing a custom interface (e.g. IControllerInteractable), and then execute a method on the closest one.

    How would this look in UniRx?

    Many thanks.
     
  26. RobGraat

    RobGraat

    Joined:
    Oct 12, 2014
    Posts:
    11
    Hi @neuecc

    (EDIT: Got my answer at: http://stackoverflow.com/questions/...ables-resolved-entirely-for-each-subscription
    Cold Observables run their entire chain for each subscriber. Hot Observables share a single chain across multiple subscribers.)

    I am trying to lazily concatenate a sequence of observables. After an observable has completed, the GetLazyObservableSequence function will return the next observable to be concatenated.

    Code (CSharp):
    1. using UnityEngine;
    2. using UniRx;
    3. using System.Collections.Generic;
    4. using System.Linq;
    5.  
    6. public class ConcatenatedObservableSequence : MonoBehaviour
    7. {
    8.     protected void Start()
    9.     {
    10.         var subscribeText = new Subject<string>();
    11.  
    12.         Debug.Log("TEST WITH SUBSCRIPTION");
    13.  
    14.         GetObservableWithSubscription(new Data(subscribeText, "INIT"))
    15.             .Concat()
    16.             .Subscribe(data => Debug.Log("Received: " + data.Text));
    17.  
    18.         subscribeText.OnNext("SUBSCRIBE");
    19.  
    20.         Debug.Log("\n TEST WITHOUT SUBSCRIPTION");
    21.  
    22.         var doText = new Subject<string>();
    23.  
    24.         GetObservableWithoutSubscription(new Data(doText, "INIT"))
    25.             .Concat()
    26.             .Subscribe(data => Debug.Log("Subscribe: " + data.Text));
    27.  
    28.         doText.OnNext("DO");
    29.     }
    30.  
    31.     /// Function that lazily evaluates and returns a
    32.     /// new observable after the previous has been completed.
    33.     /// Uses a subscription to update the nextData.
    34.     private static IEnumerable<IObservable<Data>> GetObservableWithSubscription(Data initialData)
    35.     {
    36.         var nextData = initialData;
    37.  
    38.         while (true)
    39.         {
    40.             var currentData = nextData;
    41.  
    42.             currentData.DataObservable
    43.                 .Subscribe(changedData => nextData = changedData);
    44.  
    45.             yield return currentData.DataObservable;
    46.         }
    47.     }
    48.  
    49.     /// Uses a Do method to update the nextData.
    50.     private static IEnumerable<IObservable<Data>> GetObservableWithoutSubscription(Data initialData)
    51.     {
    52.         var nextData = initialData;
    53.  
    54.         while (true)
    55.         {
    56.             var currentData = nextData;
    57.  
    58.             yield return currentData.DataObservable
    59.                 .Do(changedData => nextData = changedData);
    60.         }
    61.     }
    62.  
    63.     /// Represents a immutable data container that streams
    64.     /// a new instance containing updated data.
    65.     private class Data
    66.     {
    67.         public readonly string Text;
    68.  
    69.         public readonly IObservable<Data> DataObservable;
    70.  
    71.         public Data(IObservable<string> textObservable, string text)
    72.         {
    73.             /// The Concat extension method will call this function again
    74.             /// after a returned observable completes.
    75.             /// The First() method makes sure that the observable
    76.             /// completes after having observed the fist item.
    77.             this.Text = text;
    78.  
    79.             this.DataObservable = textObservable
    80.                 .First()
    81.                 .Select(nextText =>
    82.                 {
    83.                     Debug.Log("Do Expensive Computation: " + nextText);
    84.                     return nextText;
    85.                 } )
    86.                 .Select(nextText => new Data(textObservable, nextText));
    87.         }
    88.     }
    89. }
    90.  
    Result:
    I do not understand why the Data.DataObservable seems to be triggered twice as proven by the "Do Expensive Computation: SUBSCRIBE" log entry.

    The Data.DataObservable is only triggered once if I remove the subscription in the GetObservable function and replace it with a Do method to change the nextData reference in GetObservableWithoutSubscription.

    Is this intented behaviour?

    Cheers,
    Rob
     
    Last edited: Oct 31, 2016
  27. michal_gjk

    michal_gjk

    Joined:
    Aug 13, 2016
    Posts:
    69
    Hi.

    When using Observable.Interval(TimeSpan.FromMilliseconds(250)) what's the recommended way to ensure the subscription will get invoked during the next EndOfFrame after the timer expires ?

    With the way I understand it Intervals are normally triggered during Update(). Is that correct?
     
  28. NicolasVidal

    NicolasVidal

    Joined:
    Dec 18, 2013
    Posts:
    3
    Hi neuecc !

    First, thanks a lot for providing this Rx port for Unity, it's a life saver! :)
    I'm on a quest of doing the minimum heap allocation count possible during Gameplay and I was wondering something about the Repeat orperator.

    Let's imagine I've got a complicated transformation which will trigger OnNext only once per event, and I want it to be performed again on each OnNext. Using Repeat, this is how it looks like:

    Code (CSharp):
    1. ComplicatedAsyncTransformation<Unit, Unit>(Observable.Return(Unit.Default))
    2.                 .Repeat()
    3.                 .Subscribe();
    The thing is everytime Repeat is triggered (after the previous OnComplete), it triggers another subscription and hence a heap allocation.

    I managed to achieved the same desired behaviour with a Subject without any heap allocation after the initial subscription:

    Code (CSharp):
    1. var repeatPump = new Subject<Unit>();
    2.  
    3.             ComplicatedAsyncTransformation<Unit, Unit>(repeatPump)
    4.                 .Subscribe(_ => repeatPump.OnNext(Unit.Default));
    5.  
    6.             repeatPump.OnNext(Unit.Default);
    But I'm trying to use UniRx without requiring the use of Subjects for the most part.

    Do you see a way of doing that without using a Subject?

    Thanks a lot anyway !
     
  29. heroichippo1

    heroichippo1

    Joined:
    Mar 30, 2015
    Posts:
    36
    Last edited: Dec 23, 2016
  30. onesoftgames

    onesoftgames

    Joined:
    Dec 27, 2016
    Posts:
    6
    Hi! This is really awesome lib! Thank you very much!
    I want to build a simple chain of coroutine functions (execute in order).
    I run bellow code from github readme but i do not understand what SelectMany for and if this suitable in my case.
    So how can i do it? (I want AsyncA execute complete and then AsyncB, AsyncC and so on..)
    Sorry if it too simple question. I just get start with this lib!

    Code (CSharp):
    1. // two coroutines
    2.  
    3. IEnumerator AsyncA()
    4. {
    5.     Debug.Log("a start");
    6.     yield return new WaitForSeconds(1);
    7.     Debug.Log("a end");
    8. }
    9.  
    10. IEnumerator AsyncB()
    11. {
    12.     Debug.Log("b start");
    13.     yield return new WaitForEndOfFrame();
    14.     Debug.Log("b end");
    15. }
    16.  
    17. // main code
    18. // Observable.FromCoroutine converts IEnumerator to Observable<Unit>.
    19. // You can also use the shorthand, AsyncA().ToObservable()
    20.  
    21. // after AsyncA completes, run AsyncB as a continuous routine.
    22. // UniRx expands SelectMany(IEnumerator) as SelectMany(IEnumerator.ToObservable())
    23. var cancel = Observable.FromCoroutine(AsyncA)
    24.     .SelectMany(AsyncB)
    25.     .Subscribe();
    26.  
    27. // you can stop a coroutine by calling your subscription's Dispose.
    28. cancel.Dispose();
     
  31. garcia-raul

    garcia-raul

    Joined:
    Dec 8, 2014
    Posts:
    10
    Hi!, so I did this method with RxJava in Android that returns an Observable of a pojo like this:

    Code (csharp):
    1.  
    2.     public Observable<MyPojo> doSomething(final String param) {
    3.        return Observable.defer(new Func0<Observable<MyPojo>>() {
    4.             @Override
    5.             public Observable<PasswordResetResponse> call() {
    6.                 return Observable.just(processParam(param));
    7.             }
    8.          
    9.             private MyPojo processParam(final String paramL) {
    10.                 // Bussiness logic
    11.                 Pojo pojo = new Pojo();
    12.                 pojo.setParam(paramL);
    13.                 return new Pojo();
    14.             }
    15.        });
    16.     }
    17.  
    And then in my activity I consume it like this:
    Code (csharp):
    1.  
    2.     myInstance.doSomething("hello")
    3.     .subscribeOn(Schedulers.io()) // does the work on the io thread
    4.     .observeOn(AndroidSchedulers.mainThread()) // returns result to the main thread
    5.     .subscribe(new Observer<MyPojo>() {
    6.         @Override
    7.         public void onCompleted() {
    8.         }
    9.         @Override
    10.         public void onError(Throwable e) {
    11.         }
    12.         @Override
    13.         public void onNext(MyPojo s) {
    14.             Log.d(TAG, s.getParam());
    15.         }
    16.     });
    17.  
    Now I want to do this in Unity with C# and UniRx, how will this be?
    Thanks in advance.
     
  32. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    Guys ,

    I am new to uniRX but i have a problem understanding how i can make something possible on the observablewww.get


    Code (CSharp):
    1. ObservableWWW.Get (_urlHolder.url)
    2.                     .Subscribe (
    3.                     x => Debug.Log (x.Substring (0, 100)), // onSuccess
    4.                     ex => Debug.LogException (ex)); // onError
    5.  
    6.  
    The problem is that i cannot get the onSuccess "x.Substring" to be stored to a global variable. I have tried quite lots and i am failing to get the concept. Anyone can help?
     
  33. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    Code (CSharp):
    1. public void StartUrl ()
    2.  
    3.         {
    4.             if (_urlHolder.urlPurpose == UrlPurpose.rooting) {
    5.  
    6.                 ObservableWWW.Get (_urlHolder.url).SubscribeOnMainThread ()
    7.                     .Subscribe (
    8.                     x => _jsonString = x, // onSuccess
    9.                     ex => Debug.LogException (ex)); // onError
    10.                
    11.             } else {
    12.                 Debug.Log ("The urlHolder object instance does not have a generated url.");
    13.             }
    14.             Debug.Log (_jsonString);
    15.         }
    This is where the Debug.Log(_jsonString) is always empty. I cannot understand why though.
     
  34. MV10

    MV10

    Joined:
    Nov 6, 2015
    Posts:
    1,889
    Think of the "code" portion of onSuccess like any other function -- wrap it in braces like so:

    Code (csharp):
    1. (x) => { _jsonString = x; }
    You can even do multiple lines this way:

    Code (csharp):
    1. (x) => {
    2.     _jsonString = x;
    3.     Debug.Log(x);
    4. }
     
  35. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    Yeah i am familiar with lamdas like that. But i cannot get it why i cannot store the value to a global variable. Somehow i am failing to understand the proper way to do it.
     
  36. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    If I understand this correctly (I am very new to this kit myself) and ObservableWWW.Get is async then your Debug.Log (_jsonString); executes before the ObservableWWW.Get is done.

    What is happening is that you call ObservableWWW.Get. It sets up to do whatever it will do and returns so that the next lines of code can execute.

    To see what jsonString is you want to do what MV10 suggested and do the Debug.Log directly after where you set it. Of course for whatever it is you doing you will then make further calls to tell whatever needed to know that the json string is set and can be processed.



     
    CCSEducation and MV10 like this.
  37. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    I read this and now I am worried I am doing something wrong.

    I do this when a button is pressed

    Code (CSharp):
    1. Observable.Timer(System.TimeSpan.FromMilliseconds(500)).Subscribe(_ => { some code here });
    Is this bad since this button can be pressed over and over again (but not within the 500 ms timeout)? If I understand correctly then I am adding a bunch of timers which are not being used after 500ms and never removed/ cleaned up.

    How do I clean it up?
     
  38. MV10

    MV10

    Joined:
    Nov 6, 2015
    Posts:
    1,889
    Well, the assignment part should work fine as you wrote it. After all, that comes directly from the UniRx page. But what Leslie-Young wrote is also true:

    The way you wrote it, Debug.Log executes before the Subscribe ever runs to assign the value. The sequence looks like this at runtime:

    1. Define the Get call (it is not executed yet, just prepared)
    2. Define the Subscribe "event" when Get is completed
    3. Define the Error "event" if the Get fails
    4. Send the HTTP request
    5. Execute the Debug.Log
    6. Receive HTTP response
    7. Trigger Subscribe event (assign Get results)

    The way I wrote it, Debug.Log is part of what happens when Subscribe gets called:

    1. Define the Get call (it is not executed yet, just prepared)
    2. Define the Subscribe "event" when Get is completed
    3. Define the Error "event" if the Get fails
    4. Send the HTTP request
    5. Receive HTTP response
    6. Trigger Subscribe event (assign Get results, execute Debug.Log)

    Technically, we also need to know whether ObservableWWW.Get is hot or cold, which tells us whether it can return results immediately or it waits until it has a Subscriber. Unfortunately the UniRx and ReactiveX docs don't clearly indicate this, as far as I can tell. Maybe it can be controlled somehow? But in reality, this simple code would set up the Subscriber far more quickly than any "hot" network request could trigger Subscriber events, so it's probably not relevant. Still, it would be nice to know.
     
    CCSEducation likes this.
  39. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    Yes you are right @MV10
    It has to do with when anything is called. I was silly enough to have done some wrong call on the debug.log.

    Though i have now a new problem.

    Code (CSharp):
    1.  
    2. _responseHeaders = new Dictionary<string, string> ();
    3. _progressNotifier = new ScheduledNotifier<float> ();
    4. _progressNotifier.Subscribe (x => OnProgress (_www));
    5.  
    6. if (_urlHolder.urlPurpose == UrlPurpose.rooting) { // for rooting url.
    7.  
    8.      ObservableWWW.GetWWW (
    9.       "http://www.cultofmac.com",
    10.        headers: _responseHeaders,
    11.        progress: _progressNotifier)
    12.              
    13.        .Subscribe (
    14.        x => OnSuccess (x), // onSuccess
    15.        ex => OnError (ex, _urlHolder.url), // onError
    16.        () => OnComplete (_jsonString, _callBack)); // onComplete
    17.              
    18.              
    19. } else { // for the rest kind of urls.
    20.       Debug.Log ("The urlHolder object instance does not have a generated url.");
    21. }
    I just cannot find a way to post proper progress. If i just press a float on the progressNotifier then it is just a sum of the time that the get is taking to download the page. I wanted to be able to put there the WWW or at least handle the percentage of the page so that i can make a proper loading bar. Is there something that i am missing from the subscribe?
     
    Last edited: Mar 29, 2017
  40. MV10

    MV10

    Joined:
    Nov 6, 2015
    Posts:
    1,889
    I actually chose not to use UniRx so this is just a guess from the source code, but the Progress parameter can take a reference to an object that implements the IProgress<T> interface, which has a single Report(T value) method. So it looks like you'd have to derive from IProgress and implement the Report method, then pass that object to the GetWWW progress parameter. (I see no reason the same class calling GetWWW couldn't also implement IProgress and pass "this" to the parameter -- no need for a separate class, as long as that class is only running one GetWWW, or that single Report() implementation will work for any GetWWW the class executes.)
     
  41. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    Anyone know how I can stop this error message from happening when I stop play in editor?

    It is caused by this code. I guess the RepeatUntilDestroy(). TakeUntilDisable is just an attempt to prevent the error but did not work.

    Code (CSharp):
    1. private void Start()
    2. {
    3.     var eventTrigger = gameObject.GetOrAddComponent<ObservableEventTrigger>();
    4.  
    5.     eventTrigger.OnPointerClickAsObservable()
    6.         .Where(ev => ev.button == PointerEventData.InputButton.Right)
    7.         .TakeUntilDisable(this)
    8.         .RepeatUntilDestroy(gameObject)
    9.         .Subscribe(_ => Debug.Log("test"));
    10.  
     
  42. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    Are you using a singleton with this @Leslie-Young ? I usually got this kind of error messages when i was having a problem with singletons.

    On my problem by the way i found out what is the problem.
    Code (CSharp):
    1.  
    2.  
    3.         public void StartUrl ()
    4.         {
    5.             _responseHeaders = new Dictionary<string, string> ();
    6.  
    7.             _progressNotifier = new ScheduledNotifier<float> ();
    8.             _progressNotifier.Subscribe ((x) => OnProgress (x));
    9.  
    10.             if (_urlHolder.urlPurpose == UrlPurpose.rooting) { // for rooting url.
    11.                
    12.                 ObservableWWW.GetWWW (
    13.                     _urlHolder.url,
    14.                     headers: _responseHeaders,
    15.                     progress: _progressNotifier)
    16.                     .Subscribe (
    17.                     (x) => OnSuccess (x), // onSuccess
    18.                     (ex) => OnError (ex, _urlHolder.url), // onError
    19.                     () => OnComplete (_jsonString, _callBack)); // onComplete
    20.                
    21.                
    22.             } else { // for the rest kind of urls.
    23.                 Debug.Log ("The urlHolder object instance does not have a generated url.");
    24.             }
    25.         }
    My MAMP pro instance does not have proper headers it seems. So i was not getting any kind of progress about the data downloaded.

    I will have to check about it tomorrow.


    $output=$header."\n".$data;

    // create table header showing to download a xls (excel) file
    header("Content-type: application/octet-stream");
    header("Content-Disposition: attachment; filename=$export_filename");
    header("Cache-Control: public");
    header("Content-length: " . strlen($output); // tells file size
    header("Pragma: no-cache");
    header("Expires: 0");

    // output data
    echo $output;


    Info got from this post http://stackoverflow.com/questions/3561596/why-is-my-content-length-header-wrong/3563714#3563714
     
  43. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    This Start() is in an object held by a singleton but I do not think that is the problem, except if I am missing the point here. I do not use that mechanism where a singleton creates itself when it does not exist, in my game. All references are applied during Awake and set to null in OnDestroy again.

    Anyway, I moved the code to OnEnable since it works for this specific case where I can use RepeatUntilDisable(this) rather than RepeatUntilDestroy().

    Still curious why the error could happen cause I even got it with sample code from the github page. I created for example this simple script and placed it on an empty GameObject in a clean scene to be sure and get the error after I pressed Play and stopped.

    Code (CSharp):
    1. using UnityEngine;
    2. using UniRx;
    3. using UniRx.Triggers;
    4.  
    5. public class SomeTestObject : MonoBehaviour
    6. {
    7.     private void Start()
    8.     {
    9.         this.gameObject.OnMouseDownAsObservable()
    10.         .SelectMany(_ => this.gameObject.UpdateAsObservable())
    11.         .TakeUntil(this.gameObject.OnMouseUpAsObservable())
    12.         .Select(_ => Input.mousePosition)
    13.         .RepeatUntilDestroy(this) // safety way
    14.         .Subscribe(x => Debug.Log(x));
    15.     }
    16. }
    17.  
     
  44. CCSEducation

    CCSEducation

    Joined:
    Mar 8, 2017
    Posts:
    12
    I will try that as well to see why. It makes me curious to be honest.
     
  45. recursive

    recursive

    Joined:
    Jul 12, 2012
    Posts:
    669
    So if anyone is wondering what the status of nuecc and UniRx is, he put a post on the gihub issues page, looks like he's been busy and will have time to focus on UniRx again after his current project ships:

    https://github.com/neuecc/UniRx/issues/214
     
    zyzyx likes this.
  46. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    Does anyone know if you can and how to unsubsrcibe?

    I have UI elements which subscribe to properties of an object but I am reusing the UI objects. The objects being subscribed to will also not be destroyed necessarily.

    Think for example of a Card going into a player's hand. Now I reuse an existing Card UI to watch the properties on the card object. Later the card goes to the grave so the Card UI gets recycled and the actual card object do not get destroyed. It is just in another collection.

    Example code ...

    Code (CSharp):
    1. [SerializeField] private Text draftCost;
    2.  
    3. ....
    4.  
    5. public void SetCard(Card card)
    6. {
    7.    this.card = card;
    8.    if (this.card == null)
    9.    {
    10.        gameObject.SetActive(false);
    11.      
    12.        // unsubscribe here?
    13.        // ...
    14.  
    15.        return;
    16.    }
    17.  
    18.    // subscribing to card's proeprties
    19.    card.draftCost.SubscribeToText(draftCost);
     
  47. recursive

    recursive

    Joined:
    Jul 12, 2012
    Posts:
    669
    Under the hood, all Observer subscriptions implement IDisposable, allowing you to tear them down / cancel them and rebuild them if you need to.

    One of the handy things I've found is CompositeDisposable for recycled UI elements, I believe there are a few other special case ways to handle reusing subscriptions (or writing your own for performance only if absolutely necessary).

    CompositeDisposable allows you to manage a collection of disposables and dispose them all at once.
    The Clear() function will dispose whatever is currently part of the collection and remove the disposed elements.
    Dispose will dispose the elements and the CompositeDisposable itself (and any disposable added will be immediately disposed).

    If you just have the one subscription you can just store a reference to it's IDisposable and call Dispose() when you are done.

    Using CompositeDisposable:
    Code (CSharp):
    1. public class Card
    2. {
    3.     public IObservable<int> draftCost;
    4. }
    5.  
    6. public class Example : MonoBehaviour
    7. {
    8.     private readonly CompositeDisposable _subscriptions = new CompositeDisposable();
    9.  
    10.     [SerializeField] private Text _draftCost;
    11.  
    12.     private Card _card;
    13.  
    14.     public void Clear()
    15.     {
    16.         _subscriptions.Clear();
    17.     }
    18.  
    19.     public void SetCard(Card card)
    20.     {
    21.         if (_card == card) return;
    22.        
    23.         Clear();
    24.  
    25.         _card = card;
    26.  
    27.         if (_card == null) return;
    28.  
    29.         _card
    30.             .draftCost
    31.             .SubscribeToText(_draftCost)
    32.             .AddTo(_subscriptions);
    33.     }
    34.  
    35.     private void OnDestroy()
    36.     {
    37.         _subscriptions.Dispose();
    38.     }
    39. }
    Using just IDisposable:
    Code (CSharp):
    1. public class Card
    2. {
    3.     public IObservable<int> draftCost;
    4. }
    5.  
    6. public class Example : MonoBehaviour
    7. {
    8.     private IDisposable _subscription;
    9.  
    10.     [SerializeField]
    11.     private Text _draftCost;
    12.  
    13.     private Card _card;
    14.  
    15.     public void Clear()
    16.     {
    17.         if (_subscription == null) return;
    18.  
    19.         _subscription.Dispose();
    20.         _subscription = null;
    21.     }
    22.  
    23.     public void SetCard(Card card)
    24.     {
    25.         if (_card == card) return;
    26.  
    27.         Clear();
    28.  
    29.         _card = card;
    30.  
    31.         if (_card == null) return;
    32.  
    33.         _subscription = _card
    34.             .draftCost
    35.             .SubscribeToText(_draftCost);
    36.     }
    37.  
    38.     private void OnDestroy()
    39.     {
    40.         Clear();
    41.     }
    42. }

    If you dig through the docs and the source code, you can find more example usages.
     
  48. Leslie-Young

    Leslie-Young

    Joined:
    Dec 24, 2008
    Posts:
    1,148
    Thanks, CompositeDisposable looks to be what I needed to know about.
    I am still so stuck in the "Add/Remove" callback functions thinking.
     
  49. recursive

    recursive

    Joined:
    Jul 12, 2012
    Posts:
    669
    Yeah, it takes a bit to wrap your head around it but there's multiple built-in options for subscription lifecycle management.
    There's overloads of the AddTo() extention for IObserver<> that tie it's lifecycle to a MonoBehaviour or GameObject (will get cleaned up when OnDestroy() would be called, and ways of controlling subscriptions with OnEnable/Disable too.
     
  50. JakeShields

    JakeShields

    Joined:
    Feb 23, 2014
    Posts:
    13
    Cannot await UnityEngine.WWW and IEnumerator in Unity version 2017.1.0b3

    Compiler Error Message:

    [Error] Cannot await `UnityEngine.WWW' expression
    [Error] Cannot await `System.Collections.IEnumerator' expression

    Can anyone help me?