Parallel Programming with .NET and C# NetStandard 2.0 client for the [Ripple WebSocket APIs]

ripple dot net


This time I’m going to share a piece of code which will help you learn more about Parallel Programming with .NET and C# NetStandard 2.0 client for the [Ripple WebSocket APIs].

To understand the concept better, I recommend you read this great article: https://blogs.msdn.microsoft.com/pfxteam/2012/02/02/await-synchronizationcontext-and-console-apps-part-3/

Here is a quote from the article:

C# and Visual Basic support two flavors of async methods: ones that return tasks (either Task or Task<T>) and ones that return void. The former use the returned Task to represent the completion of the async method. In the case of an “async void” method, however, there is no returned Task to represent the method’s processing. Instead, “async void” methods interact with the current SynchronizationContext to alert the context to the async method’s execution status. Before entering the body of the async method, if there is a current SynchronizationContext, it is retrieved and its OperationStarted method is called. And after the async method has completed, that same context has its OperationCompleted method called. Further, if an exception goes unhandled in the body of the async void method, the throwing of that exception is Post to the SynchronizationContext, so that the exception escapes back to the context for it to handle as it pleases.

What you’ll need?

  • RippleDotNet (Download here: https://github.com/chriswill/RippleDotNet)
  • Ripple.NetCore (Download from NuGet)
  • Visual Studio 2017
  • AsycPump Class (Provides a pump that supports running asynchronous methods on the current thread)

Before continuing I strongly suggest to have a look at Task-based Asynchronous Pattern (TAP) : https://www.microsoft.com/en-us/download/details.aspx?id=19957

And the code which will help you to get balance:

Using Statements (those statements do much more than just getting account balances):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
using System.Threading.Tasks;
using System.Windows.Forms;
using RippleDotNet.Extensions;
using RippleDotNet.Json.Converters;
using RippleDotNet.Model.Account;
using RippleDotNet.Model.Ledger.Objects;
using RippleDotNet.Model.Ledger;
using RippleDotNet.Model.Server;
using RippleDotNet.Requests.Account;
using RippleDotNet.Requests.Ledger;
using RippleDotNet.Requests.Transaction;
using RippleDotNet.Responses.Transaction.Interfaces;
using RippleDotNet.Responses.Transaction.TransactionTypes;
using RippleDotNet;
using RippleDotNet.Model.Transaction.Interfaces;
using RippleDotNet.Model;
using RippleDotNet.Model.Transaction;
using RippleDotNet.Model.Transaction.TransactionTypes;
using Ripple.Core.Types;
using Ripple.TxSigning;
using Newtonsoft.Json.Linq;
using Currency = RippleDotNet.Model.Currency;
using System.Threading;
using System.Collections.Concurrent;

The main code based on the Ripple.Net library:

1
2
3
4
5
6
7
8
        public async Task GetBalance()
        {
            IRippleClient client = new RippleClient("wss://s1.ripple.com");
            client.Connect();
            AccountInfo accountInfo = await client.AccountInfo(textBox2.Text);
            textBox1.Text = accountInfo.AccountData.Balance.ValueAsXrp.Value.ToString();
            client.Disconnect();
        }

And now we need AsyncPump Class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
public static class AsyncPump
    {
        /// <summary>Runs the specified asynchronous method.</summary>
        /// <param name="asyncMethod">The asynchronous method to execute.</param>
        public static void Run(Action asyncMethod)
        {
            if (asyncMethod == null) throw new ArgumentNullException("asyncMethod");

            var prevCtx = SynchronizationContext.Current;
            try
            {
                // Establish the new context
                var syncCtx = new SingleThreadSynchronizationContext(true);
                SynchronizationContext.SetSynchronizationContext(syncCtx);

                // Invoke the function
                syncCtx.OperationStarted();
                asyncMethod();
                syncCtx.OperationCompleted();

                // Pump continuations and propagate any exceptions
                syncCtx.RunOnCurrentThread();
            }
            finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
        }

        /// <summary>Runs the specified asynchronous method.</summary>
        /// <param name="asyncMethod">The asynchronous method to execute.</param>
        public static void Run(Func<Task> asyncMethod)
        {
            if (asyncMethod == null) throw new ArgumentNullException("asyncMethod");

            var prevCtx = SynchronizationContext.Current;
            try
            {
                // Establish the new context
                var syncCtx = new SingleThreadSynchronizationContext(false);
                SynchronizationContext.SetSynchronizationContext(syncCtx);

                // Invoke the function and alert the context to when it completes
                var t = asyncMethod();
                if (t == null) throw new InvalidOperationException("No task provided.");
                t.ContinueWith(delegate { syncCtx.Complete(); }, TaskScheduler.Default);

                // Pump continuations and propagate any exceptions
                syncCtx.RunOnCurrentThread();
                t.GetAwaiter().GetResult();
            }
            finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
        }

        /// <summary>Runs the specified asynchronous method.</summary>
        /// <param name="asyncMethod">The asynchronous method to execute.</param>
        public static T Run<T>(Func<Task<T>> asyncMethod)
        {
            if (asyncMethod == null) throw new ArgumentNullException("asyncMethod");

            var prevCtx = SynchronizationContext.Current;
            try
            {
                // Establish the new context
                var syncCtx = new SingleThreadSynchronizationContext(false);
                SynchronizationContext.SetSynchronizationContext(syncCtx);

                // Invoke the function and alert the context to when it completes
                var t = asyncMethod();
                if (t == null) throw new InvalidOperationException("No task provided.");
                t.ContinueWith(delegate { syncCtx.Complete(); }, TaskScheduler.Default);

                // Pump continuations and propagate any exceptions
                syncCtx.RunOnCurrentThread();
                return t.GetAwaiter().GetResult();
            }
            finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
        }

        /// <summary>Provides a SynchronizationContext that's single-threaded.</summary>
        private sealed class SingleThreadSynchronizationContext : SynchronizationContext
        {
            /// <summary>The queue of work items.</summary>
            private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> m_queue =
                new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();
            /// <summary>The processing thread.</summary>
            private readonly Thread m_thread = Thread.CurrentThread;
            /// <summary>The number of outstanding operations.</summary>
            private int m_operationCount = 0;
            /// <summary>Whether to track operations m_operationCount.</summary>
            private readonly bool m_trackOperations;

            /// <summary>Initializes the context.</summary>
            /// <param name="trackOperations">Whether to track operation count.</param>
            internal SingleThreadSynchronizationContext(bool trackOperations)
            {
                m_trackOperations = trackOperations;
            }

            /// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
            /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
            /// <param name="state">The object passed to the delegate.</param>
            public override void Post(SendOrPostCallback d, object state)
            {
                if (d == null) throw new ArgumentNullException("d");
                m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
            }

            /// <summary>Not supported.</summary>
            public override void Send(SendOrPostCallback d, object state)
            {
                throw new NotSupportedException("Synchronously sending is not supported.");
            }

            /// <summary>Runs an loop to process all queued work items.</summary>
            public void RunOnCurrentThread()
            {
                foreach (var workItem in m_queue.GetConsumingEnumerable())
                    workItem.Key(workItem.Value);
            }

            /// <summary>Notifies the context that no more work will arrive.</summary>
            public void Complete() { m_queue.CompleteAdding(); }

            /// <summary>Invoked when an async operation is started.</summary>
            public override void OperationStarted()
            {
                if (m_trackOperations)
                    Interlocked.Increment(ref m_operationCount);
            }

            /// <summary>Invoked when an async operation is completed.</summary>
            public override void OperationCompleted()
            {
                if (m_trackOperations &&
                    Interlocked.Decrement(ref m_operationCount) == 0)
                    Complete();
            }
        }
    }

and finally run the AsycPump:

1
 AsyncPump.Run(() => GetBalance());

Did you like this post