/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Meta.Voice;
using Meta.WitAi.Configuration;
using Meta.WitAi.Data;
using Meta.WitAi.Data.Configuration;
using Meta.WitAi.Json;
using Meta.WitAi.Requests;
using UnityEngine;
using UnityEngine.Networking;
#if UNITY_EDITOR
using UnityEditor;
#endif
namespace Meta.WitAi
{
///
/// Manages a single request lifecycle when sending/receiving data from Wit.ai.
///
/// Note: This is not intended to be instantiated directly. Requests should be created with the
/// WitRequestFactory
///
public class WitRequest : VoiceServiceRequest
{
#region PARAMETERS
///
/// The wit Configuration to be used with this request
///
public WitConfiguration Configuration { get; private set; }
///
/// The request timeout in ms
///
public int Timeout { get; private set; } = 1000;
///
/// Encoding settings for audio based requests
///
public AudioEncoding AudioEncoding { get; set; }
[Obsolete("Deprecated for AudioEncoding")]
public AudioEncoding audioEncoding
{
get => AudioEncoding;
set => AudioEncoding = value;
}
///
/// Endpoint to be used for this request
///
public string Path { get; private set; }
///
/// Final portion of the endpoint Path
///
public string Command { get; private set; }
///
/// Whether a post command should be called
///
public bool IsPost { get; private set; }
///
/// Key value pair that is sent as a query param in the Wit.ai uri
///
[Obsolete("Deprecated for Options.QueryParams")]
public VoiceServiceRequestOptions.QueryParam[] queryParams
{
get
{
List results = new List();
foreach (var key in Options?.QueryParams?.Keys)
{
VoiceServiceRequestOptions.QueryParam p = new VoiceServiceRequestOptions.QueryParam()
{
key = key,
value = Options?.QueryParams[key]
};
results.Add(p);
}
return results.ToArray();
}
}
public byte[] postData;
public string postContentType;
public string forcedHttpMethodType = null;
#endregion PARAMETERS
#region REQUEST
///
/// Returns true if the request is being performed
///
public bool IsRequestStreamActive => IsActive || IsInputStreamReady;
///
/// Returns true if the response had begun
///
public bool HasResponseStarted { get; private set; }
///
/// Returns true if the response had begun
///
public bool IsInputStreamReady { get; private set; }
public AudioDurationTracker audioDurationTracker;
private HttpWebRequest _request;
private Stream _writeStream;
private object _streamLock = new object();
private int _bytesWritten;
private string _stackTrace;
private DateTime _requestStartTime;
private ConcurrentQueue _writeBuffer = new ConcurrentQueue();
#endregion REQUEST
#region RESULTS
///
/// The current status of the request
///
public string StatusDescription { get; private set; }
///
/// Simply return the Path to be called
///
public override string ToString() => Path;
///
/// Last response data parsed
///
private WitResponseNode _lastResponseData;
#endregion RESULTS
#region EVENTS
///
/// Provides an opportunity to provide custom headers for the request just before it is
/// executed.
///
public event OnProvideCustomHeadersEvent onProvideCustomHeaders;
public delegate Dictionary OnProvideCustomHeadersEvent();
///
/// Callback called when the server is ready to receive data from the WitRequest's input
/// stream. See WitRequest.Write()
///
public event Action onInputStreamReady;
///
/// Returns the raw string response that was received before converting it to a JSON object.
///
/// NOTE: This response comes back on a different thread. Do not attempt ot set UI control
/// values or other interactions from this callback. This is intended to be used for demo
/// and test UI, not for regular use.
///
public Action onRawResponse;
///
/// Provides an opportunity to customize the url just before a request executed
///
[Obsolete("Deprecated for WitVRequest.OnProvideCustomUri")]
public OnCustomizeUriEvent onCustomizeUri;
public delegate Uri OnCustomizeUriEvent(UriBuilder uriBuilder);
///
/// Allows customization of the request before it is sent out.
///
/// Note: This is for devs who are routing requests to their servers
/// before sending data to Wit.ai. This allows adding any additional
/// headers, url modifications, or customization of the request.
///
public static PreSendRequestDelegate onPreSendRequest;
public delegate void PreSendRequestDelegate(ref Uri src_uri, out Dictionary headers);
///
/// Returns a partial utterance from an in process request
///
/// NOTE: This response comes back on a different thread.
///
[Obsolete("Deprecated for Events.OnPartialTranscription")]
public event Action onPartialTranscription;
///
/// Returns a full utterance from a completed request
///
/// NOTE: This response comes back on a different thread.
///
[Obsolete("Deprecated for Events.OnFullTranscription")]
public event Action onFullTranscription;
///
/// Callback called when a response is received from the server off a partial transcription
///
[Obsolete("Deprecated for Events.OnPartialResponse")]
public event Action onPartialResponse;
///
/// Callback called when a response is received from the server
///
[Obsolete("Deprecated for Events.OnComplete")]
public event Action onResponse;
#endregion EVENTS
#region INITIALIZATION
///
/// Initialize wit request with configuration & path to endpoint
///
///
///
///
public WitRequest(WitConfiguration newConfiguration, string newPath,
WitRequestOptions newOptions, VoiceServiceRequestEvents newEvents)
: base(NLPRequestInputType.Audio, newOptions, newEvents)
{
// Set Configuration & path
Configuration = newConfiguration;
Path = newPath;
// Finalize
_initialized = true;
SetState(VoiceRequestState.Initialized);
}
///
/// Only set state if initialized
///
private bool _initialized = false;
protected override void SetState(VoiceRequestState newState)
{
if (_initialized)
{
base.SetState(newState);
}
}
///
/// Finalize initialization
///
protected override void OnInit()
{
// Determine configuration setting
Timeout = Configuration == null ? Timeout : Configuration.timeoutMS;
// Set request settings
Command = Path.Split('/').First();
IsPost = WitEndpointConfig.GetEndpointConfig(Configuration).Speech == this.Command
|| WitEndpointConfig.GetEndpointConfig(Configuration).Dictation == this.Command;
// Finalize bases
base.OnInit();
}
#endregion INITIALIZATION
#region AUDIO
// Handle audio activation
protected override void HandleAudioActivation()
{
SetAudioInputState(VoiceAudioInputState.On);
}
// Handle audio deactivation
protected override void HandleAudioDeactivation()
{
// If transmitting,
if (State == VoiceRequestState.Transmitting)
{
CloseRequestStream();
}
// Call deactivated
SetAudioInputState(VoiceAudioInputState.Off);
}
#endregion
#region REQUEST
// Errors that prevent request submission
protected override string GetSendError()
{
// No configuration found
if (Configuration == null)
{
return "Configuration is not set. Cannot start request.";
}
// Cannot start without client access token
if (string.IsNullOrEmpty(Configuration.GetClientAccessToken()))
{
return "Client access token is not defined. Cannot start request.";
}
// Cannot perform without input stream delegate
if (onInputStreamReady == null)
{
return "No input stream delegate found";
}
// Base
return base.GetSendError();
}
// Simple getter for final uri
private Uri GetUri()
{
// Get query parameters
Dictionary queryParams = new Dictionary(Options.QueryParams);
// Get uri using override
var uri = WitVRequest.GetWitUri(Configuration, Path, queryParams);
#pragma warning disable CS0618
if (onCustomizeUri != null)
{
#pragma warning disable CS0618
uri = onCustomizeUri(new UriBuilder(uri));
}
// Return uri
return uri;
}
// Simple getter for final uri
private Dictionary GetHeaders()
{
// Get default headers
Dictionary headers = WitVRequest.GetWitHeaders(Configuration, Options?.RequestId, false);
// Append additional headers
if (onProvideCustomHeaders != null)
{
foreach (OnProvideCustomHeadersEvent e in onProvideCustomHeaders.GetInvocationList())
{
Dictionary customHeaders = e();
if (customHeaders != null)
{
foreach (var key in customHeaders.Keys)
{
headers[key] = customHeaders[key];
}
}
}
}
// Return headers
return headers;
}
///
/// Start the async request for data from the Wit.ai servers
///
protected override void HandleSend()
{
// Begin
HasResponseStarted = false;
// Generate results
StatusCode = 0;
StatusDescription = "Starting request";
_bytesWritten = 0;
_requestStartTime = DateTime.UtcNow;
_stackTrace = "-";
// Get uri & headers
var uri = GetUri();
var headers = GetHeaders();
// Allow overrides
onPreSendRequest?.Invoke(ref uri, out headers);
#if UNITY_WEBGL && !UNITY_EDITOR
StartUnityRequest(uri, headers);
#else
#if UNITY_WEBGL && UNITY_EDITOR
if (IsPost)
{
VLog.W("Voice input is not supported in WebGL this functionality is fully enabled at edit time, but may not work at runtime.");
}
#endif
StartThreadedRequest(uri, headers);
#endif
}
#endregion REQUEST
#region HTTP REQUEST
///
/// Performs a threaded http request
///
private void StartThreadedRequest(Uri uri, Dictionary headers)
{
// Create http web request
_request = WebRequest.Create(uri.AbsoluteUri) as HttpWebRequest;
// Off to not wait for a response indefinitely
_request.KeepAlive = false;
// Configure request method, content type & chunked
if (forcedHttpMethodType != null)
{
_request.Method = forcedHttpMethodType;
}
if (null != postContentType)
{
if (forcedHttpMethodType == null) {
_request.Method = "POST";
}
_request.ContentType = postContentType;
_request.ContentLength = postData.Length;
}
if (IsPost)
{
_request.Method = string.IsNullOrEmpty(forcedHttpMethodType) ? "POST" : forcedHttpMethodType;
_request.ContentType = AudioEncoding.ToString();
_request.SendChunked = true;
}
// Apply user agent
if (headers.ContainsKey(WitConstants.HEADER_USERAGENT))
{
_request.UserAgent = headers[WitConstants.HEADER_USERAGENT];
headers.Remove(WitConstants.HEADER_USERAGENT);
}
// Apply all other headers
foreach (var key in headers.Keys)
{
_request.Headers[key] = headers[key];
}
// Apply timeout
_request.Timeout = Timeout;
// Begin calling on main thread if needed
WatchMainThreadCallbacks();
// Perform http post or put
if (_request.Method == "POST" || _request.Method == "PUT")
{
var getRequestTask = _request.BeginGetRequestStream(HandleWriteStream, _request);
ThreadPool.RegisterWaitForSingleObject(getRequestTask.AsyncWaitHandle,
HandleTimeoutTimer, _request, Timeout, true);
}
// Move right to response
else
{
StartResponse();
}
}
// Start response
private void StartResponse()
{
if (_request == null)
{
if (StatusCode == 0)
{
StatusCode = WitConstants.ERROR_CODE_GENERAL;
StatusDescription = $"Request canceled prior to start";
}
HandleNlpResponse(null, StatusDescription);
return;
}
var asyncResult = _request.BeginGetResponse(HandleResponse, _request);
ThreadPool.RegisterWaitForSingleObject(asyncResult.AsyncWaitHandle, HandleTimeoutTimer, _request, Timeout, true);
}
// Handle timeout callback
private void HandleTimeoutTimer(object state, bool timeout)
{
// Ignore false or too late
if (!timeout)
{
return;
}
// No longer active
StatusCode = WitConstants.ERROR_CODE_TIMEOUT;
StatusDescription = $"Request timed out after {(DateTime.UtcNow - _requestStartTime).Seconds:0.00} seconds";
// Clean up the current request if it is still going
if (null != _request)
{
_request.Abort();
}
// Close any open stream resources and clean up streaming state flags
CloseActiveStream();
// Complete
MainThreadCallback(() => HandleNlpResponse(null, StatusDescription));
}
// Write stream
private void HandleWriteStream(IAsyncResult ar)
{
try
{
// Start response stream
StartResponse();
// Get write stream
var stream = _request.EndGetRequestStream(ar);
// Got write stream
_bytesWritten = 0;
// Immediate post
if (postData != null && postData.Length > 0)
{
Debug.Log("Wrote directly");
_bytesWritten += postData.Length;
stream.Write(postData, 0, postData.Length);
stream.Close();
}
// Wait for input stream
else
{
// Request stream is ready to go
IsInputStreamReady = true;
_writeStream = stream;
// Call input stream ready delegate
if (onInputStreamReady != null)
{
MainThreadCallback(() => onInputStreamReady(this));
}
}
}
catch (WebException e)
{
// Ignore cancelation errors & if error already occured
if (e.Status == WebExceptionStatus.RequestCanceled || StatusCode != 0)
{
return;
}
// Write stream error
_stackTrace = e.StackTrace;
StatusCode = (int) e.Status;
StatusDescription = e.Message;
VLog.W(e);
MainThreadCallback(() => HandleNlpResponse(null, StatusDescription));
}
catch (Exception e)
{
// Call an error if have not done so yet
if (StatusCode != 0)
{
return;
}
// Non web error occured
_stackTrace = e.StackTrace;
StatusCode = WitConstants.ERROR_CODE_GENERAL;
StatusDescription = e.Message;
VLog.W(e);
MainThreadCallback(() => HandleNlpResponse(null, StatusDescription));
}
}
///
/// Write request data to the Wit.ai post's body input stream
///
/// Note: If the stream is not open (IsActive) this will throw an IOException.
/// Data will be written synchronously. This should not be called from the main thread.
///
///
///
///
public void Write(byte[] data, int offset, int length)
{
// Ignore without write stream
if (!IsInputStreamReady || data == null || length == 0)
{
return;
}
try
{
_writeStream.Write(data, offset, length);
_bytesWritten += length;
if (audioDurationTracker != null)
{
audioDurationTracker.AddBytes(length);
}
}
catch (ObjectDisposedException e)
{
// Handling edge case where stream is closed remotely
// This problem occurs when the Web server resets or closes the connection after
// the client application sends the HTTP header.
// https://support.microsoft.com/en-us/topic/fix-you-receive-a-system-objectdisposedexception-exception-when-you-try-to-access-a-stream-object-that-is-returned-by-the-endgetrequeststream-method-in-the-net-framework-2-0-bccefe57-0a61-517a-5d5f-2dce0cc63265
VLog.W($"Stream already disposed. It is likely the server reset the connection before streaming started.\n{e}");
// This prevents a very long holdup on _writeStream.Close
_writeStream = null;
}
catch (IOException e)
{
VLog.W(e.Message);
}
catch (Exception e)
{
VLog.E(e);
}
// Perform a cancellation if still waiting for a post
if (WaitingForPost())
{
MainThreadCallback(() => Cancel("Stream was closed with no data written."));
}
}
// Handles response from server
private void HandleResponse(IAsyncResult asyncResult)
{
// Begin response
HasResponseStarted = true;
string stringResponse = "";
try
{
// Get response
CheckStatus();
using (var response = _request.EndGetResponse(asyncResult))
{
// Got response
CheckStatus();
HttpWebResponse httpResponse = response as HttpWebResponse;
// Apply status & description
StatusCode = (int) httpResponse.StatusCode;
StatusDescription = httpResponse.StatusDescription;
// Get stream
using (var responseStream = httpResponse.GetResponseStream())
{
using (var responseReader = new StreamReader(responseStream))
{
string chunk;
while ((chunk = ReadToDelimiter(responseReader, WitConstants.ENDPOINT_JSON_DELIMITER)) != null)
{
stringResponse = chunk;
ProcessStringResponse(stringResponse);
}
}
}
}
}
catch (JSONParseException e)
{
_stackTrace = e.StackTrace;
StatusCode = WitConstants.ERROR_CODE_INVALID_DATA_FROM_SERVER;
StatusDescription = "Server returned invalid data.";
VLog.W(e);
}
catch (WebException e)
{
if (e.Status != WebExceptionStatus.RequestCanceled)
{
// Apply status & error
_stackTrace = e.StackTrace;
StatusCode = (int) e.Status;
StatusDescription = e.Message;
VLog.W(e);
// Attempt additional parse
if (e.Response is HttpWebResponse errorResponse)
{
StatusCode = (int) errorResponse.StatusCode;
try
{
using (var errorStream = errorResponse.GetResponseStream())
{
if (errorStream != null)
{
using (StreamReader errorReader = new StreamReader(errorStream))
{
stringResponse = errorReader.ReadToEnd();
if (!string.IsNullOrEmpty(stringResponse))
{
ProcessStringResponses(stringResponse);
}
}
}
}
}
catch (JSONParseException)
{
// Response wasn't encoded error, ignore it.
}
catch (Exception errorResponseError)
{
// We've already caught that there is an error, we'll ignore any errors
// reading error response data and use the status/original error for validation
VLog.W(errorResponseError);
_stackTrace = e.StackTrace;
}
}
}
}
catch (Exception e)
{
_stackTrace = e.StackTrace;
StatusCode = WitConstants.ERROR_CODE_GENERAL;
StatusDescription = e.Message;
VLog.W(e);
}
// Close request stream if possible
CloseRequestStream();
// Confirm valid response
if (null != _lastResponseData)
{
var error = _lastResponseData["error"];
if (!string.IsNullOrEmpty(error))
{
// Get code if possible
var code = _lastResponseData["code"];
if (code != null)
{
StatusCode = code.AsInt;
}
// Use general error if code is not provided
if (StatusCode == (int)HttpStatusCode.OK)
{
StatusCode = WitConstants.ERROR_CODE_GENERAL;
}
// Set error & description
if (string.IsNullOrEmpty(StatusDescription))
{
StatusDescription = $"Error: {code}\n{error}";
}
}
}
// Invalid response
else if (StatusCode == (int)HttpStatusCode.OK)
{
StatusCode = WitConstants.ERROR_CODE_NO_DATA_FROM_SERVER;
StatusDescription = $"Server did not return a valid json response.";
#if UNITY_EDITOR
StatusDescription += $"\nActual Response\n{stringResponse}";
#endif
}
// Done
HasResponseStarted = false;
MainThreadCallback(() =>
{
// Send partial data if not previously sent
if (!_lastResponseData.HasResponse())
{
ResponseData = _lastResponseData;
}
// Apply error if needed
if (null != _lastResponseData)
{
var error = _lastResponseData["error"];
if (!string.IsNullOrEmpty(error))
{
StatusDescription += $"\n{error}";
}
}
// Call completion delegate
HandleNlpResponse(_lastResponseData, StatusCode == (int)HttpStatusCode.OK ? string.Empty : $"{StatusDescription}\n\nStackTrace:\n{_stackTrace}\n\n");
});
}
// Check status
private void CheckStatus()
{
if (StatusCode == 0) return;
switch (StatusCode)
{
case WitConstants.ERROR_CODE_ABORTED:
throw new WebException("Request was aborted", WebExceptionStatus.RequestCanceled);
default:
throw new WebException("Status changed before response was received.", (WebExceptionStatus) StatusCode);
}
}
// Read stream until delimiter is hit
private string ReadToDelimiter(StreamReader reader, string delimiter)
{
// Allocate all vars
StringBuilder results = new StringBuilder();
int delLength = delimiter.Length;
int i;
bool found;
char nextChar;
// Iterate each byte in the stream
while (reader != null && !reader.EndOfStream)
{
// Continue until found
if (reader.Peek() == 0)
{
continue;
}
// Append next character
nextChar = (char)reader.Read();
results.Append(nextChar);
// Continue until long as delimiter
if (results.Length < delLength)
{
continue;
}
// Check if string builder ends with delimiter
found = true;
for (i=0;i onRawResponse?.Invoke(stringResponse));
}
// Decode full response
WitResponseNode responseNode = WitResponseNode.Parse(stringResponse);
bool hasResponse = responseNode.HasResponse();
bool isFinal = responseNode.GetIsFinal();
string transcription = responseNode.GetTranscription();
_lastResponseData = responseNode;
// Apply on main thread
MainThreadCallback(() =>
{
// Set transcription
if (!string.IsNullOrEmpty(transcription) && (!hasResponse || isFinal))
{
ApplyTranscription(transcription, isFinal);
}
// Set response
if (hasResponse)
{
ResponseData = responseNode;
}
});
}
// On text change callback
protected override void OnTranscriptionChanged()
{
if (!IsFinalTranscription)
{
onPartialTranscription?.Invoke(Transcription);
}
else
{
onFullTranscription?.Invoke(Transcription);
}
base.OnTranscriptionChanged();
}
// On response data change callback
protected override void OnResponseDataChanged()
{
onPartialResponse?.Invoke(this);
base.OnResponseDataChanged();
}
// Check if data has been written to post stream while still receiving data
private bool WaitingForPost()
{
return IsPost && _bytesWritten == 0 && StatusCode == 0;
}
// Close active stream & then abort if possible
private void CloseRequestStream()
{
// Cancel due to no audio if not an error
if (WaitingForPost())
{
Cancel("Request was closed with no audio captured.");
}
// Close
else
{
CloseActiveStream();
}
}
// Close stream
private void CloseActiveStream()
{
IsInputStreamReady = false;
lock (_streamLock)
{
if (null != _writeStream)
{
try
{
_writeStream.Close();
}
catch (Exception e)
{
VLog.W($"Write Stream - Close Failed\n{e}");
}
_writeStream = null;
}
}
}
// Perform a cancellation/abort
protected override void HandleCancel()
{
// Close stream
CloseActiveStream();
// Apply abort code
if (StatusCode == 0)
{
StatusCode = WitConstants.ERROR_CODE_ABORTED;
StatusDescription = Results.Message;
}
// Abort request
if (null != _request)
{
_request.Abort();
_request = null;
}
}
// Add response callback & log for abort
protected override void OnComplete()
{
base.OnComplete();
// Close write stream if still existing
if (null != _writeStream)
{
CloseActiveStream();
}
// Abort request if still existing
if (null != _request)
{
_request.Abort();
_request = null;
}
// Finalize response
onResponse?.Invoke(this);
onResponse = null;
}
#endregion HTTP REQUEST
#region CALLBACKS
// Check performing
private CoroutineUtility.CoroutinePerformer _performer = null;
// All actions
private ConcurrentQueue _mainThreadCallbacks = new ConcurrentQueue();
// Called from background thread
private void MainThreadCallback(Action action)
{
if (action == null)
{
return;
}
_mainThreadCallbacks.Enqueue(action);
}
// While active, perform any sent callbacks
private void WatchMainThreadCallbacks()
{
// Ignore if already performing
if (_performer != null)
{
return;
}
// Check callbacks every frame (editor or runtime)
_performer = CoroutineUtility.StartCoroutine(PerformMainThreadCallbacks());
}
// Every frame check for callbacks & perform any found
private System.Collections.IEnumerator PerformMainThreadCallbacks()
{
// While checking, continue
while (HasMainThreadCallbacks())
{
// Wait for frame
if (Application.isPlaying && !Application.isBatchMode)
{
yield return new WaitForEndOfFrame();
}
// Wait for a tick
else
{
yield return null;
}
// Perform if possible
while (_mainThreadCallbacks.Count > 0 && _mainThreadCallbacks.TryDequeue(out var result))
{
result();
}
}
_performer = null;
}
// If active or performing callbacks
private bool HasMainThreadCallbacks()
{
return IsActive || _mainThreadCallbacks.Count > 0;
}
#endregion
}
}