You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

447 lines
12 KiB
C++

/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2019 Wim Taymans */
/* SPDX-License-Identifier: MIT */
#include <algorithm>
#include <cerrno>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <span>
#include <string>
#include <spa/pod/builder.h>
#include <spa/param/latency-utils.h>
#include <pipewire/pipewire.h>
#include <pipewire/filter.h>
#include "decoder.hpp"
#include "file_wrapper.hpp"
#include "options.hpp"
using namespace std::string_literals;
// Number of input ports the filter exposes.
static constexpr unsigned INPUTS = 2;
// Port name of each input. Main() also passes these with the first six
// characters (the "input_" prefix) skipped as the port's audio channel name.
static constexpr const char* INPUT_NAMES[INPUTS] = { "input_FL", "input_FR" };
namespace
{
/// State shared between Main(), the PipeWire callbacks and the option
/// handlers.
///
/// Every pointer and array member carries an explicit initializer so that a
/// default-constructed Data is well-defined too: CreateOutputs() depends on
/// unused out_by_channel entries being nullptr.
struct Data
{
  // Main loop; assigned by Main() and used by the quit handlers.
  pw_main_loop* loop = nullptr;
  // The filter node; assigned by Main().
  pw_filter* filter = nullptr;
  // Input port handles as returned by pw_filter_add_port().
  std::array<void*, INPUTS> in{};
  // Output port handles in the order reported by the decoder; only the first
  // n_outputs entries are meaningful.
  std::array<void*, Channel::N_CHANNELS> out{};
  // Output port handle per channel, nullptr when the channel has no port.
  std::array<void*, Channel::N_CHANNELS> out_by_channel{};
  // Number of valid entries in `out`.
  std::uint32_t n_outputs = 0;
  // Latency last published by Process(); starts at the maximum value so the
  // first comparison in Process() normally triggers an update.
  std::uint32_t old_latency = UINT32_MAX;
  // Optional dump files for the interleaved input/output sample streams.
  File in_dump, out_dump;
  // Options handled by this frontend (decoder options are queried from `dec`).
  std::vector<Option> opts;
  Decoder dec;
  // When true the decoder's block size is requested as the node's quantum.
  bool force_quantum = true;
  // When true Process() resets the decoder and its buffers before decoding.
  bool reset_pending = true;
  // Factor applied to every input sample.
  float preamp = 1;
  // Interleaved input samples collected until a full block can be decoded.
  std::vector<float> in_buf;
  // Next write position inside in_buf.
  std::uint32_t in_buf_pos = 0;
  // Decoded interleaved samples not yet copied to the output ports.
  std::span<const float> out_buf;
  // Bytes read by Input() that do not form a complete line yet.
  std::vector<char> cmd_buf;

  void TryFlush(std::uint32_t size);
  void Process(struct spa_io_position* position);
  void CreateOutputs();
  void ParseArgsFun(std::string_view k, std::string_view v);
  void Input(int fd);
  void ProcessLine(std::string_view sv) noexcept;
};
}
/// Interleaves the planar buffers in `ins` into `out`, scaling each sample by
/// `mul`.
///
/// @param dump  if it tests true, the interleaved result is also written to it
/// @param ins   one source pointer per channel; a null pointer yields zeros.
///              Non-null pointers are advanced past the samples consumed, so
///              consecutive calls continue where the previous one stopped.
/// @param out   destination for the interleaved samples
/// @param n     number of interleaved samples to produce (frames * channels)
/// @param mul   gain applied to every sample
template <typename T, std::size_t InN, typename U>
static void Mux(File& dump, std::span<T*, InN> ins, T* out, std::size_t n,
                U mul)
{
  const std::size_t stride = ins.size();
  const bool every_input_present =
    std::none_of(ins.begin(), ins.end(), [](T* p) { return p == nullptr; });

  T* dst = out;
  if (every_input_present)
  {
    // No per-sample null check needed.
    for (std::size_t done = 0; done < n; done += stride)
      for (T*& src : ins)
        *dst++ = *src++ * mul;
  }
  else
  {
    for (std::size_t done = 0; done < n; done += stride)
      for (T*& src : ins)
        *dst++ = src ? *src++ * mul : 0;
  }

  if (dump) fwrite(out, sizeof(T), n, dump);
}
/// Splits the interleaved samples at `inp` into the planar buffers in `outs`.
///
/// @param dump  if it tests true, the interleaved input is also written to it
/// @param inp   interleaved source samples
/// @param outs  one destination pointer per channel; samples for a null
///              destination are skipped. Non-null pointers are advanced past
///              the samples written, so consecutive calls append.
/// @param n     number of interleaved samples to consume (frames * channels)
template <typename T, std::size_t OutN>
static void Demux(
  File& dump, const T* inp, std::span<T*, OutN> outs, std::size_t n)
{
  // Element size follows T (the original hard-coded sizeof(float)).
  if (dump) fwrite(inp, sizeof(T), n, dump);

  const std::size_t stride = outs.size();
  if (std::all_of(outs.begin(), outs.end(), [](T* t) { return t != nullptr; }))
  {
    // fast path, all outputs exist
    for (std::size_t i = 0; i < n; i += stride)
      for (T*& dst : outs)
        *dst++ = *inp++;
  }
  else
  {
    for (std::size_t i = 0; i < n; i += stride)
      for (T*& dst : outs)
      {
        if (dst) *dst++ = *inp;
        ++inp; // the source always advances, even when the sample is dropped
      }
  }
}
/// Fetches the DSP buffer of the first `n` ports.
///
/// @param ports      port handles as returned by pw_filter_add_port()
/// @param n          number of leading entries of `ports` to query; values
///                   larger than N are clamped to N
/// @param n_samples  number of samples requested from each port
/// @return {true if at least one port returned a buffer, buffer per port}.
///         Entries for ports without a buffer, and entries at index >= n, are
///         nullptr.
template <std::size_t N>
static std::pair<bool, std::array<float*, N>> GetBufs(
  std::array<void*, N> ports, std::uint32_t n, std::uint32_t n_samples)
{
  // Value-initialized: the array is copied into the result, and copying
  // indeterminate pointers would be undefined behaviour.
  std::array<float*, N> res{};
  bool has = false;
  const std::size_t count = std::min<std::size_t>(n, N);
  for (std::size_t i = 0; i < count; ++i)
  {
    res[i] = static_cast<float*>(pw_filter_get_dsp_buffer(ports[i], n_samples));
    if (res[i]) has = true;
  }
  return {has, res};
}
// Body of the filter's process event: collects the input ports' samples into
// the interleaved in_buf, runs the decoder whenever a full block has been
// gathered, and copies decoded samples (out_buf) to the output ports. Output
// that cannot be filled from decoded data is zeroed.
void Data::Process(struct spa_io_position* position)
{
// Number of samples per port to handle in this cycle.
std::uint32_t n_samples = position->clock.duration;
// clock.rate expressed as a float (num / denom).
// NOTE(review): if the clock rate is a fraction such as 1/48000 this is the
// sample period rather than the sample rate in Hz - confirm which of the two
// Decoder::Decode expects.
auto rate = float(position->clock.rate.num) / float(position->clock.rate.denom);
std::uint32_t block_size = dec.GetBlockSize();
// Size of one decode block in interleaved samples; a block size of 0 means
// the cycle length is used instead.
std::uint32_t buf_size = INPUTS * (block_size ? block_size : n_samples);
pw_log_trace("do process %" PRIu32, n_samples);
// Reported latency: the decoder's delay, plus the extra samples that have to
// be buffered when a cycle is shorter than one decode block.
auto latency = dec.GetDelay();
if (n_samples < block_size) latency += block_size - n_samples;
// Only publish the process latency when it changed since the last cycle.
if (latency != old_latency)
{
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
auto li = SPA_PROCESS_LATENCY_INFO_INIT(.rate = latency);
const struct spa_pod* params[1] = {
spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &li),
};
pw_filter_update_params(filter, nullptr, params, 1);
old_latency = latency;
}
auto [has_in, ins] = GetBufs(in, INPUTS, n_samples);
auto [has_out, outs] = GetBufs(out, n_outputs, n_samples);
// No port has a buffer at all: skip the cycle and reset the decoder before
// the next one that does have data.
if (!has_in && !has_out)
{
pw_log_trace("Nothing to do");
reset_pending = true;
return;
}
// No input and no decoded samples left over: just output silence.
if (!has_in && out_buf.empty())
{
pw_log_trace("Generating silence");
for (unsigned i = 0; i < n_outputs; ++i)
if (outs[i]) std::memset(outs[i], 0, sizeof(float) * n_samples);
return;
}
if (reset_pending)
{
dec.Reset();
in_buf_pos = 0;
out_buf = {};
reset_pending = false;
}
in_buf.resize(buf_size);
// The block size may have shrunk since the last cycle; restart the block in
// that case.
if (in_buf_pos > buf_size) in_buf_pos = 0;
// Both counters are in interleaved samples (samples per port * port count).
auto to_read = INPUTS * n_samples, to_write = n_outputs * n_samples;
// Copies as much pending decoded output as still fits into this cycle's
// output buffers. Demux advances the pointers in `outs`, so repeated calls
// append.
auto write_out = [&]()
{
auto now_write = std::min<std::uint32_t>(to_write, out_buf.size());
if (!now_write) return;
Demux(out_dump, out_buf.data(), std::span{outs}.subspan(0, n_outputs),
now_write);
out_buf = out_buf.subspan(now_write);
to_write -= now_write;
};
// First flush what the previous cycle left in out_buf.
write_out();
while (to_read)
{
// Read until either the input is exhausted or the decode block is full.
auto now_read = std::min<std::uint32_t>(to_read, buf_size - in_buf_pos);
Mux(in_dump, std::span{ins}, in_buf.data() + in_buf_pos, now_read, preamp);
to_read -= now_read, in_buf_pos += now_read;
if (in_buf_pos == buf_size)
{
// Any decoded samples not written yet are replaced by the new block.
if (!out_buf.empty()) pw_log_warn("Discarding non-empty buffer!");
pw_log_trace("Calling decode");
out_buf = dec.Decode(rate, in_buf);
in_buf_pos = 0;
}
write_out();
}
// Not enough decoded data for the whole cycle: zero the rest of each output
// buffer (the pointers in `outs` already point past the written samples).
if (to_write)
{
// Convert from interleaved samples back to samples per port.
to_write /= n_outputs;
pw_log_debug("Memsetting remaining %" PRIu32 " samples", to_write);
for (unsigned i = 0; i < n_outputs; ++i)
if (outs[i]) std::memset(outs[i], 0, sizeof(float) * to_write);
}
}
/// Reads pending command bytes from `fd` and executes every complete line.
///
/// Up to READ_SIZE bytes are appended to cmd_buf. Each '\n'-terminated line
/// is passed to ProcessLine() without the terminator. A trailing partial line
/// stays in cmd_buf until a later call completes it; on end-of-file
/// (read() returning 0) it is executed as-is. Read errors are ignored and
/// leave cmd_buf unchanged.
///
/// NOTE(review): nothing here stops watching `fd` after end-of-file; confirm
/// the event loop does not keep invoking this callback once read() returns 0.
void Data::Input(int fd)
{
  constexpr std::size_t READ_SIZE = 4096;
  const auto off = cmd_buf.size();
  cmd_buf.resize(off + READ_SIZE);
  // noexcept from here
  const auto rd = read(fd, cmd_buf.data() + off, READ_SIZE);
  if (rd < 0) { cmd_buf.resize(off); return; }
  cmd_buf.resize(off + rd);

  auto st = cmd_buf.begin();
  while (st != cmd_buf.end())
  {
    const auto end = std::find(st, cmd_buf.end(), '\n');
    if (end != cmd_buf.end())
    {
      ProcessLine({st, end});
      st = end + 1;
    }
    else if (rd == 0)
    {
      // End of input: run whatever is left even without a newline.
      ProcessLine({st, end});
      st = end;
    }
    else
      // Incomplete line: keep it buffered until more data arrives. Without
      // this the loop would never terminate, as `st` would not advance.
      break;
  }
  cmd_buf.erase(cmd_buf.begin(), st);
}
/// Applies one key/value option.
///
/// The frontend's own options are tried first, then the options exposed by
/// the decoder.
///
/// @throws ParseError if neither option set handles `k`
void Data::ParseArgsFun(std::string_view k, std::string_view v)
{
  if (!HandleOption(opts, k, v))
  {
    auto decoder_opts = dec.GetOptions();
    if (!HandleOption(decoder_opts, k, v))
      throw ParseError("Unknown option "s + std::string{k});
  }
}
void Data::ProcessLine(std::string_view sv) noexcept
{
try
{
std::vector<std::string_view> parts;
for (std::size_t st = 0; st < sv.size(); )
if (sv[st] == ' ' || sv[st] == '\t') ++st;
else if (auto p = sv.find_first_of(" \t", st); p != std::string_view::npos)
{
parts.push_back(sv.substr(st, p-st));
st = p+1;
}
else
{
parts.push_back(sv.substr(st));
break;
}
ParseOptions(parts, std::bind_front(&Data::ParseArgsFun, this));
out_buf = {}; // FIXME: better way
// update quantum
{
auto quantum = std::to_string(force_quantum ? dec.GetBlockSize() : 0);
spa_dict_item upd[] = {
SPA_DICT_ITEM_INIT(PW_KEY_NODE_FORCE_QUANTUM, quantum.c_str()),
};
auto ud = SPA_DICT_INIT(upd, std::size(upd));
pw_filter_update_properties(filter, nullptr, &ud);
}
CreateOutputs();
}
catch (Exit) {} // ignore Exit from help
catch (const std::exception& e)
{ std::cerr << "Failed to execute command: " << e.what() << std::endl; }
}
/// Synchronises the filter's output ports with the decoder's channel list.
///
/// A port is added for every channel that has none yet, `out` is filled with
/// the port handles in decoder order, and ports of channels the decoder no
/// longer reports are removed.
void Data::CreateOutputs()
{
  auto channels = dec.GetChannels();
  n_outputs = channels.size();

  // Marks the channels that appear in the decoder's current list.
  std::array<bool, Channel::N_CHANNELS> wanted{};
  for (std::uint32_t idx = 0; idx < n_outputs; ++idx)
  {
    auto channel = channels[idx];
    wanted[channel] = true;

    auto& port = out_by_channel[channel];
    if (!port)
    {
      auto channel_name = Channel::NAMES_PIPEWIRE[channel];
      const auto port_name = "output_"s + channel_name;
      port = pw_filter_add_port(
        filter,
        PW_DIRECTION_OUTPUT,
        PW_FILTER_PORT_FLAG_MAP_BUFFERS,
        0, // sizeof(Port)
        pw_properties_new(
          PW_KEY_FORMAT_DSP, "32 bit float mono audio",
          PW_KEY_AUDIO_CHANNEL, channel_name,
          PW_KEY_PORT_NAME, port_name.c_str(),
          nullptr),
        nullptr, 0);
    }
    out[idx] = port;
  }

  // Drop the ports whose channel is not produced any more.
  for (unsigned ch = 0; ch < Channel::N_CHANNELS; ++ch)
  {
    auto& port = out_by_channel[ch];
    if (wanted[ch] || !port) continue;
    pw_filter_remove_port(port);
    port = nullptr;
  }
}
/// Event table registered with the filter. Only the process event is set; it
/// forwards to Data::Process() on the Data instance passed as user data.
static const struct pw_filter_events filter_events =
{
  .version = PW_VERSION_FILTER_EVENTS,
  .process = [](void* userdata, struct spa_io_position* position)
  {
    auto* self = static_cast<Data*>(userdata);
    self->Process(position);
  },
};
static void DoQuit(void *userdata, int signal_number)
{
(void) signal_number;
pw_main_loop_quit(reinterpret_cast<Data*>(userdata)->loop);
}
/// Builds a File for `sv` using mode "wb". An empty name yields a
/// default-constructed File.
static File OpenFile(std::string_view sv)
{
  if (!sv.empty())
  {
    // string_view is not guaranteed to be NUL-terminated; copy it first.
    const std::string path{sv};
    return {path.c_str(), "wb"};
  }
  return {};
}
int Main(int argc, char *argv[])
{
Data data = {};
data.opts = {
{
"help", "", "Show help",
[&](std::string_view)
{
std::cerr << "Usage " << argv[0] << " [--options]\n";
PrintHelp(data.opts);
data.dec.PrintHelp();
throw Exit{0};
},
},
{
"dump_input", "FILENAME", "Dump input to file",
[&data](std::string_view sv) { data.in_dump = OpenFile(sv); },
},
{
"dump_output", "FILENAME", "Dump output to file",
[&data](std::string_view sv) { data.out_dump = OpenFile(sv); },
},
{
"force_quantum", "BOOL",
"Force Pipewire to use the decoder's block size as quantum",
[&data](std::string_view sv) { data.force_quantum = FromString<bool>(sv); },
},
{
"preamp", "FLOAT",
"Multiply all input values by this (default: 1)",
[&data](std::string_view sv) { data.preamp = FromString<float>(sv); },
},
};
ParseArgs(argc, argv, std::bind_front(&Data::ParseArgsFun, &data));
data.opts.emplace_back(
"reset", "", "Reset the current decoder",
[&](auto) { data.reset_pending = true; });
data.opts.emplace_back(
"quit", "", "Quits the application",
[&](auto) { pw_main_loop_quit(data.loop); });
pw_init(&argc, &argv);
/* make a main loop. If you already have another main loop, you can add
* the fd of this pipewire mainloop to it. */
data.loop = pw_main_loop_new(nullptr);
auto loop = pw_main_loop_get_loop(data.loop);
pw_loop_add_signal(loop, SIGINT, DoQuit, &data);
pw_loop_add_signal(loop, SIGTERM, DoQuit, &data);
pw_loop_add_io(
loop, STDIN_FILENO, SPA_IO_IN, false,
[](void* userdata, int fd, uint32_t mask)
{
(void) mask;
reinterpret_cast<Data*>(userdata)->Input(fd);
}, &data);
/* Create a simple filter, the simple filter manages the core and remote
* objects for you if you don't need to deal with them.
*
* Pass your events and a user_data pointer as the last arguments. This
* will inform you about the filter state. The most important event
* you need to listen to is the process event where you need to process
* the data.
*/
data.filter = pw_filter_new_simple(
loop,
"surroundize-pipewire",
pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Filter",
PW_KEY_MEDIA_ROLE, "DSP",
PW_KEY_MEDIA_CLASS, "Stream/Output/Audio",
PW_KEY_APP_NAME, "surroundize-pipewire",
PW_KEY_NODE_FORCE_QUANTUM,
std::to_string(data.force_quantum ? data.dec.GetBlockSize() : 0).c_str(),
nullptr),
&filter_events,
&data);
for (unsigned i = 0; i < INPUTS; ++i)
data.in[i] = pw_filter_add_port(
data.filter,
PW_DIRECTION_INPUT,
PW_FILTER_PORT_FLAG_MAP_BUFFERS,
0, // sizeof(Port)
pw_properties_new(
PW_KEY_FORMAT_DSP, "32 bit float mono audio",
PW_KEY_AUDIO_CHANNEL, INPUT_NAMES[i] + 6,
PW_KEY_PORT_NAME, INPUT_NAMES[i],
nullptr),
nullptr, 0);
data.CreateOutputs();
/* Now connect this filter. We ask that our process function is
* called in a realtime thread. */
if (pw_filter_connect(data.filter,
PW_FILTER_FLAG_NONE, //PW_FILTER_FLAG_RT_PROCESS,
nullptr, 0) < 0)
{
std::cerr << "can't connect\n";
return -1;
}
/* and wait while we let things run */
pw_main_loop_run(data.loop);
pw_filter_destroy(data.filter);
pw_main_loop_destroy(data.loop);
pw_deinit();
return 0;
}