defmodule IBClient do
require Logger
use GenServer
# Host and port handed to :gen_tcp.connect/3 when the client connects.
@ip {127, 0, 0, 1}
@port 4001

# Every value `conn_state` takes while the :tick handler walks the client
# through connection setup, listed in the order the steps run. `:connecting`
# is retained so existing references to this type keep working.
@type connection_state() ::
        :disconnected
        | :connecting
        | :send_api_version
        | :waiting_on_version_resp
        | :sending_start_api
        | :waiting_on_start_api_resp
        | :transition_to_non_blocking
        | :connected

# State a freshly started client begins with. The socket, server details and
# next id are filled in as the connection steps complete.
@initial_state %{
  socket: nil,
  conn_state: :disconnected,
  server_api: nil,
  server_connect_time: nil,
  next_id: 0,
  inbound_handler: nil
}
@doc """
Asynchronously hands `message` to the client process `pid`.

The message is delivered as a `{:message, message}` cast, and the return value
is whatever `GenServer.cast/2` returns.
"""
def send_message(pid, message), do: GenServer.cast(pid, {:message, message})
@doc """
Starts an unlinked, unnamed IBClient process seeded with the module's initial
state.

Returns the result of `GenServer.start/2`.
"""
def start do
  Logger.debug("Starting IBClient...")
  __MODULE__ |> GenServer.start(@initial_state)
end
@doc """
Starts the client linked to the calling process and registered under the
module's name.

The argument is accepted so the function can be used where a one-argument
`start_link` is expected, but it is ignored: the process always starts from
the module's initial state.

Returns the result of `GenServer.start_link/3`.
"""
def start_link(_opt) do
  Logger.info("Starting IBClient as a supervised process...")
  GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
end
# GenServer callback: queues the first :tick for this process so the
# connection steps start running, then accepts the given state unchanged.
def init(initial) do
  _ = send(self(), :tick)
  {:ok, initial}
end
# Drives the connection setup one step per :tick message.
#
# The step to run is picked from the current `conn_state`, and each step
# returns the state to carry forward. While the resulting `conn_state` is
# anything other than :connected, another :tick is scheduled 1000 ms later.
def handle_info(:tick, state) do
  advanced =
    case Map.get(state, :conn_state) do
      :disconnected -> connect_to_server(state)
      :send_api_version -> send_version(state)
      :waiting_on_version_resp -> wait_for_api_version_response(state)
      :sending_start_api -> start_api(state)
      :waiting_on_start_api_resp -> wait_for_start_api_resp(state)
      :transition_to_non_blocking -> transition_to_nonblocking(state)
      :connected -> state
    end

  case Map.get(advanced, :conn_state) do
    :connected ->
      :ok

    _still_in_progress ->
      Logger.info("Sending tick...")
      Process.send_after(self(), :tick, 1000)
  end

  {:noreply, advanced}
end
# Only logs that a :start_api message arrived; the state is returned untouched.
def handle_info(:start_api, client_state) do
  Logger.debug("Starting API...")
  {:noreply, client_state}
end
# Writes a fixed, pre-encoded request to the socket held in the state and
# crashes (match error) if the send does not return :ok.
# NOTE(review): the leading 4 bytes encode the value 8 while 10 bytes follow
# them — confirm the intended framing against the wire protocol.
def handle_info(:req_account_summary, state) do
  request = <<0, 0, 0, 8, "62", 0, 2, 0, 2, 0, 0, 0, 0>>
  socket = Map.get(state, :socket)
  :ok = :gen_tcp.send(socket, request)
  {:noreply, state}
end
# Builds the ReqIds outbound message, encodes it with IBMessage.to_ib_api/1
# and writes it to the socket held in the state. A failed send crashes the
# process through the `:ok =` match.
def handle_info(:req_ids, state) do
  request = IBMessage.to_ib_api(Ibex.Messages.Outbound.ReqIds.t())
  :ok = state |> Map.get(:socket) |> :gen_tcp.send(request)
  {:noreply, state}
end
# Handles bytes delivered by the socket in active mode.
#
# TCP is a byte stream, so one delivery may hold several length-prefixed
# frames, or only part of one. Incoming bytes are appended to whatever was
# left over from the previous delivery (kept under :recv_buffer in the
# state). Every complete frame — a 4-byte big-endian length followed by that
# many payload bytes — is split on NUL bytes and sent to Ibex.Inbound.Handler
# as its own {:dispatch, fields} message. Bytes belonging to a frame that is
# still incomplete stay in the buffer until more data arrives.
def handle_info({:tcp, _, data}, state) do
  # Map.get/Map.put rather than the update syntax: the key is absent until
  # the first delivery.
  buffered = Map.get(state, :recv_buffer, <<>>) <> data

  frames =
    buffered
    |> Stream.unfold(fn
      <<len::size(32), payload::binary-size(len), rest::binary>> -> {payload, rest}
      _incomplete -> nil
    end)
    |> Enum.to_list()

  Enum.each(frames, fn payload ->
    Logger.debug("Data is: #{inspect(payload)}. Length is: #{inspect(byte_size(payload))}")
    send(Ibex.Inbound.Handler, {:dispatch, String.split(payload, <<0>>)})
  end)

  # Each frame consumed its payload plus the 4-byte length prefix.
  consumed = Enum.reduce(frames, 0, fn payload, total -> total + 4 + byte_size(payload) end)
  leftover = binary_part(buffered, consumed, byte_size(buffered) - consumed)

  {:noreply, Map.put(state, :recv_buffer, leftover)}
end
# The socket was closed: stop the server normally.
def handle_info({:tcp_closed, _}, state), do: {:stop, :normal, state}

# Active-mode sockets report errors as {:tcp_error, socket, reason}. Log the
# reason and stop, exactly as for a closed socket.
def handle_info({:tcp_error, _socket, reason}, state) do
  Logger.error("TCP error: #{inspect(reason)}")
  {:stop, :normal, state}
end

# Two-element form kept so any existing sender of this message still stops
# the server as before.
def handle_info({:tcp_error, _}, state), do: {:stop, :normal, state}
# Requests contract details for `ticker` with security type "STK".
#
# A request id is taken from next_id/1 (which also advances the counter in the
# state), the ReqContractDetails message is encoded with
# IBMessage.to_ib_api/1 and written to the socket. A failed send crashes the
# process through the `:ok =` match.
def handle_info({:req_stock_contract_details, ticker}, state) do
  {req_id, updated} = next_id(state)

  request =
    IBMessage.to_ib_api(
      Ibex.Messages.Outbound.ReqContractDetails.t(to_string(req_id), ticker, "STK")
    )

  :ok = :gen_tcp.send(Map.get(updated, :socket), request)
  {:noreply, updated}
end
# Sends a matching-symbols request for `symbol`.
#
# A request id is taken from next_id/1 (which also advances the counter in the
# state), the ReqMatchingSymbols message is encoded with
# IBMessage.to_ib_api/1 and written to the socket. A failed send crashes the
# process through the `:ok =` match.
def handle_info({:symbol_search, symbol}, state) do
  {req_id, updated} = next_id(state)

  request =
    IBMessage.to_ib_api(Ibex.Messages.Outbound.ReqMatchingSymbols.t(to_string(req_id), symbol))

  :ok = :gen_tcp.send(Map.get(updated, :socket), request)
  {:noreply, updated}
end
@doc """
Catch-all for `handle_info/2`.

Any message not handled by an earlier clause is logged at debug level and
otherwise ignored, so an unexpected message never crashes the server. For
two-element tuples only the first element is logged; for every other message
the whole term is logged.
"""
def handle_info({other, _}, state) do
  Logger.debug("Received #{inspect(other)}")
  {:noreply, state}
end

def handle_info(other, state) do
  Logger.debug("Received #{inspect(other)}")
  {:noreply, state}
end
# Writes a message queued through send_message/2 to the socket stored in the
# state. A failed send crashes the process through the `:ok =` match.
def handle_cast({:message, payload}, %{socket: sock} = client) do
  :ok = :gen_tcp.send(sock, payload)
  {:noreply, client}
end
@doc """
Handles a `{:disconnect, _}` request by calling `disconnect/2` with the reason
`:user_requested`, then returns `{:noreply, state}`.
"""
# NOTE(review): the tuple returned by disconnect/2 is discarded here, so this
# function never asks the server to stop — confirm that is intended.
@spec handle({:disconnect, any}, any) :: {:noreply, any}
def handle({:disconnect, _}, state) do
  _discarded = disconnect(state, :user_requested)
  {:noreply, state}
end
@doc """
Logs that the client disconnected for `reason` and returns
`{:stop, :normal, state}`.

`reason` may be any term. Atoms and binaries are written to the log as they
are; every other term (for example an `{:error, _}` tuple) is rendered with
`inspect/1`, so logging can never raise.
"""
def disconnect(state, reason) do
  # Plain interpolation raises Protocol.UndefinedError for terms without a
  # String.Chars implementation, such as tuples.
  reason_text =
    if is_atom(reason) or is_binary(reason) do
      to_string(reason)
    else
      inspect(reason)
    end

  Logger.info("Disconnected: #{reason_text}")
  {:stop, :normal, state}
end
# Opens a passive, binary-mode TCP connection to @ip:@port.
#
# On success the socket is stored and `conn_state` moves to
# :send_api_version. On failure the error is logged and the state is
# returned unchanged.
defp connect_to_server(state) do
  attempt = :gen_tcp.connect(@ip, @port, [:binary, active: false])

  case attempt do
    {:error, reason} ->
      Logger.error("Failed to connect to server: #{inspect(reason)}")
      state

    {:ok, socket} ->
      %{state | conn_state: :send_api_version, socket: socket}
  end
end
# Writes the API version handshake to the socket.
#
# On success `conn_state` moves to :waiting_on_version_resp. On failure the
# error is logged and the state is returned unchanged.
defp send_version(state) do
  Logger.info("Sending API version to server...")
  handshake = <<"API", 0, 0, 0, 0, 9, "v100..177">>
  outcome = state |> Map.get(:socket) |> :gen_tcp.send(handshake)

  case outcome do
    {:error, reason} ->
      Logger.error("Failed to send API version string to server: #{inspect(reason)}")
      state

    :ok ->
      %{state | conn_state: :waiting_on_version_resp}
  end
end
# Writes the pre-encoded START_API message to the socket.
#
# On success `conn_state` moves to :waiting_on_start_api_resp. On failure the
# error is logged and the state is returned unchanged.
defp start_api(state) do
  Logger.info("Sending START_API message...")
  request = <<0, 0, 0, 8, 55, 49, 0, 50, 0, 50, 0, 0>>
  outcome = state |> Map.get(:socket) |> :gen_tcp.send(request)

  case outcome do
    {:error, reason} ->
      Logger.error("Failed to send StartApi message: #{inspect(reason)}")
      state

    :ok ->
      %{state | conn_state: :waiting_on_start_api_resp}
  end
end
# Blocks until the reply to the API version handshake arrives and records it.
#
# wait_for_data/1 returns a three-element tuple; only its last element (the
# list of fields) is used here. When exactly three fields come back, the first
# is stored as `server_api` and the second as `server_connect_time`. Any other
# shape is logged and the state is left as it was. In both cases `conn_state`
# then moves on to :sending_start_api.
defp wait_for_api_version_response(state) do
  Logger.info("Waiting for API version response...")
  {_length, _code, data} = wait_for_data(state)

  state =
    case data do
      [version, connect_time, _] ->
        Logger.info(
          "Extracting Server API version from fields. Version: #{version}. Time: #{connect_time}"
        )

        %{state | server_api: version, server_connect_time: connect_time}

      _ ->
        Logger.info(
          "Response from API Version Request had an unexpected number of fields: #{inspect(data)}"
        )

        state
    end

  %{state | conn_state: :sending_start_api}
end
# Blocks until the first two messages that follow START_API have been read.
#
# The first message is skipped. The second is split on NUL bytes and its
# second field is stored as `next_id`. `conn_state` then moves to
# :transition_to_non_blocking.
#
# Each message is read as an exact length-prefixed frame, so a message split
# across TCP segments is still read whole, and bytes that follow the second
# message stay on the socket for later reads instead of being dropped.
# Any receive error crashes the process through the `{:ok, _} =` matches.
defp wait_for_start_api_resp(state) do
  Logger.info("Waiting for START_API response...")
  socket = state[:socket]

  _first_message = recv_frame(socket)
  fields = socket |> recv_frame() |> String.split(<<0>>)
  next_valid_id = Enum.at(fields, 1)

  Logger.debug("Next valid order ID is #{next_valid_id}")

  %{
    state
    | next_id: String.to_integer(next_valid_id),
      conn_state: :transition_to_non_blocking
  }
end

# Reads one frame from a passive socket: a 4-byte big-endian length followed
# by exactly that many payload bytes. Returns the payload.
defp recv_frame(socket) do
  {:ok, <<len::size(32)>>} = :gen_tcp.recv(socket, 4)

  if len == 0 do
    # recv/2 with length 0 means "whatever is available", so an empty frame
    # must not trigger another read.
    <<>>
  else
    {:ok, payload} = :gen_tcp.recv(socket, len)
    payload
  end
end
# Blocks on the passive socket until some data is available, then returns
# whatever Ibex.Messages.Inbound.parse/1 makes of it. A receive error crashes
# the process through the `{:ok, _} =` match.
defp wait_for_data(state) do
  Logger.info("Waiting synchronously for data...")
  {:ok, received} = state |> Map.get(:socket) |> :gen_tcp.recv(0)
  Ibex.Messages.Inbound.parse(received)
end
# Switches the socket to active mode so incoming data is delivered to the
# process as messages.
#
# On success `conn_state` becomes :connected. If the option cannot be set
# (for example because the socket is already closed), the error is logged and
# the state is returned unchanged, so the client never reports :connected
# without actually receiving socket messages.
defp transition_to_nonblocking(state) do
  Logger.info("Transitioning to non-blocking mode...")

  case :inet.setopts(state[:socket], active: true) do
    :ok ->
      %{state | conn_state: :connected}

    {:error, reason} ->
      Logger.error("Failed to switch socket to active mode: #{inspect(reason)}")
      state
  end
end
# Hands out the id currently stored under :next_id and returns it together
# with a state whose counter has been advanced by one.
defp next_id(state) do
  current = Map.get(state, :next_id)
  {current, %{state | next_id: current + 1}}
end
end