// data-pulse-provider.ts
  1. import {
  2. LibrarySymbolInfo,
  3. SubscribeBarsCallback,
  4. } from '../../../charting_library/datafeed-api';
  5. import {
  6. GetBarsResult,
  7. HistoryProvider,
  8. } from './history-provider';
  9. import {
  10. getErrorMessage,
  11. logMessage,
  12. } from './helpers';
  /**
   * State kept for a single real-time bars subscription.
   */
  interface DataSubscriber {
    /** Callback invoked with each bar that is broadcast to this subscriber. */
    listener: SubscribeBarsCallback;
    /** Symbol the subscription was created for. */
    symbolInfo: LibrarySymbolInfo;
    /** Resolution string the subscription was created with. */
    resolution: string;
    /** Time of the last bar passed to `listener`; `null` until the first bar is delivered. */
    lastBarTime: number | null;
  }
  /**
   * Dictionary of active subscriptions, keyed by the listener GUID
   * that was supplied when subscribing.
   */
  interface DataSubscribers {
    [listenerGuid: string]: DataSubscriber;
  }
  /**
   * Periodically polls a history provider for the most recent bars of every
   * subscribed symbol/resolution pair and forwards new or updated bars to the
   * subscriber's callback.
   */
  export class DataPulseProvider {
    private readonly _subscribers: DataSubscribers = {};
    private _requestsPending: number = 0;
    private readonly _historyProvider: HistoryProvider;

    /**
     * @param historyProvider - Source used to fetch bars on every pulse.
     * @param updateFrequency - Delay passed to `setInterval` between pulses.
     */
    public constructor(historyProvider: HistoryProvider, updateFrequency: number) {
      this._historyProvider = historyProvider;
      setInterval(this._updateData.bind(this), updateFrequency);
    }

    /**
     * Registers a subscriber under `listenerGuid`.
     * If the GUID is already registered the call is logged and ignored.
     *
     * @param symbolInfo - Symbol to request bars for.
     * @param resolution - Resolution to request bars with.
     * @param newDataCallback - Invoked with every bar broadcast to this subscriber.
     * @param listenerGuid - Unique key identifying the subscription.
     */
    public subscribeBars(symbolInfo: LibrarySymbolInfo, resolution: string, newDataCallback: SubscribeBarsCallback, listenerGuid: string): void {
      if (this._hasSubscriber(listenerGuid)) {
        logMessage(`DataPulseProvider: already has subscriber with id=${listenerGuid}`);
        return;
      }

      this._subscribers[listenerGuid] = {
        lastBarTime: null,
        listener: newDataCallback,
        resolution: resolution,
        symbolInfo: symbolInfo,
      };

      logMessage(`DataPulseProvider: subscribed for #${listenerGuid} - {${symbolInfo.name}, ${resolution}}`);
    }

    /**
     * Removes the subscription registered under `listenerGuid` (a no-op for the
     * dictionary if it does not exist; the log line is written either way).
     */
    public unsubscribeBars(listenerGuid: string): void {
      delete this._subscribers[listenerGuid];
      logMessage(`DataPulseProvider: unsubscribed for #${listenerGuid}`);
    }

    /**
     * Own-property check that does not rely on a method looked up on the
     * dictionary itself, so a GUID such as "hasOwnProperty" cannot shadow it.
     */
    private _hasSubscriber(listenerGuid: string): boolean {
      return Object.prototype.hasOwnProperty.call(this._subscribers, listenerGuid);
    }

    /**
     * Timer callback: starts one update request per subscriber, unless requests
     * from a previous pulse are still pending.
     */
    private _updateData(): void {
      if (this._requestsPending > 0) {
        return;
      }

      this._requestsPending = 0;
      for (const listenerGuid in this._subscribers) { // tslint:disable-line:forin
        this._requestsPending += 1;

        // A synchronous throw while starting the request must be routed through the
        // same rejection path; otherwise the pending counter is never decremented
        // and every later pulse would be skipped.
        let update: Promise<void>;
        try {
          update = this._updateDataForSubscriber(listenerGuid);
        } catch (error) {
          update = Promise.reject(error);
        }

        update
          .then(() => {
            this._requestsPending -= 1;
            logMessage(`DataPulseProvider: data for #${listenerGuid} updated successfully, pending=${this._requestsPending}`);
          })
          .catch((reason?: string | Error) => {
            this._requestsPending -= 1;
            logMessage(`DataPulseProvider: data for #${listenerGuid} updated with error=${getErrorMessage(reason)}, pending=${this._requestsPending}`);
          });
      }
    }

    /**
     * Requests the most recent bars for one subscriber and hands the result to
     * `_onSubscriberDataReceived`.
     */
    private _updateDataForSubscriber(listenerGuid: string): Promise<void> {
      const subscriptionRecord = this._subscribers[listenerGuid];

      // Current time in whole seconds.
      const rangeEndTime = Math.floor(Date.now() / 1000);

      // BEWARE: please note we really need 2 bars, not only the last one
      // (see the explanation in `_onSubscriberDataReceived`).
      // `10` is a `large enough` value to work around holidays.
      const rangeStartTime = rangeEndTime - periodLengthSeconds(subscriptionRecord.resolution, 10);

      return this._historyProvider.getBars(subscriptionRecord.symbolInfo, subscriptionRecord.resolution, rangeStartTime, rangeEndTime)
        .then((result: GetBarsResult) => {
          this._onSubscriberDataReceived(listenerGuid, result);
        });
    }

    /**
     * Broadcasts freshly received bars to the subscriber.
     *
     * @throws Error when a new bar has started but the result holds fewer than
     * 2 bars, so the previous bar cannot be re-broadcast.
     */
    private _onSubscriberDataReceived(listenerGuid: string, result: GetBarsResult): void {
      // means the subscription was cancelled while waiting for data
      if (!this._hasSubscriber(listenerGuid)) {
        logMessage(`DataPulseProvider: Data comes for already unsubscribed subscription #${listenerGuid}`);
        return;
      }

      const bars = result.bars;
      if (bars.length === 0) {
        return;
      }

      const lastBar = bars[bars.length - 1];
      const subscriptionRecord = this._subscribers[listenerGuid];

      // Ignore results whose latest bar is older than what was already delivered.
      if (subscriptionRecord.lastBarTime !== null && lastBar.time < subscriptionRecord.lastBarTime) {
        return;
      }

      const isNewBar = subscriptionRecord.lastBarTime !== null && lastBar.time > subscriptionRecord.lastBarTime;

      // Pulse updating may miss some trades data (i.e., if pulse period = 10 seconds and a new bar starts 5 seconds after the last update,
      // the old bar's last 5 seconds of trades will be lost). Thus, at first we should broadcast the old bar's update when it's ready.
      if (isNewBar) {
        if (bars.length < 2) {
          throw new Error('Not enough bars in history for proper pulse update. Need at least 2.');
        }

        const previousBar = bars[bars.length - 2];
        subscriptionRecord.listener(previousBar);
      }

      subscriptionRecord.lastBarTime = lastBar.time;
      subscriptionRecord.listener(lastBar);
    }
  }
  /**
   * Returns the length, in seconds, of `requiredPeriodsCount` periods of the
   * given resolution.
   *
   * Resolutions ending in `D`, `W` or `M`, with an optional integer multiplier
   * (`D`, `1D`, `2D`, `3W`, `12M`, ...), are measured in days, using 7 days per
   * week and 31 days per month. Any other value is parsed as a number of minutes.
   *
   * @param resolution - Resolution string.
   * @param requiredPeriodsCount - Number of periods the range must cover.
   * @returns The range length in seconds; `NaN` if `resolution` is neither a
   * day/week/month resolution nor parseable as an integer.
   */
  function periodLengthSeconds(resolution: string, requiredPeriodsCount: number): number {
    const secondsPerMinute = 60;
    const secondsPerDay = 24 * 60 * 60;

    const dayBased = /^(\d*)([DWM])$/.exec(resolution);
    if (dayBased !== null) {
      // An omitted multiplier ("D", "W", "M") means a single unit.
      const multiplier = dayBased[1] ? parseInt(dayBased[1], 10) : 1;
      const unit = dayBased[2];
      const daysPerUnit = unit === 'D' ? 1 : unit === 'W' ? 7 : 31;
      return multiplier * daysPerUnit * requiredPeriodsCount * secondsPerDay;
    }

    // Intraday resolution given in minutes; computed directly in seconds to
    // avoid a round-trip through fractional days.
    return requiredPeriodsCount * parseInt(resolution, 10) * secondsPerMinute;
  }