You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
447 lines
12 KiB
C++
447 lines
12 KiB
C++
/* PipeWire */
|
|
/* SPDX-FileCopyrightText: Copyright © 2019 Wim Taymans */
|
|
/* SPDX-License-Identifier: MIT */
|
|
|
|
#include <algorithm>
|
|
#include <cerrno>
|
|
#include <csignal>
|
|
#include <cstdint>
|
|
#include <cstdio>
|
|
#include <cstring>
|
|
#include <iostream>
|
|
#include <span>
|
|
#include <string>
|
|
|
|
#include <spa/pod/builder.h>
|
|
#include <spa/param/latency-utils.h>
|
|
|
|
#include <pipewire/pipewire.h>
|
|
#include <pipewire/filter.h>
|
|
|
|
#include "decoder.hpp"
|
|
#include "file_wrapper.hpp"
|
|
#include "options.hpp"
|
|
|
|
using namespace std::string_literals;

// Number of input ports the filter exposes.
static constexpr unsigned INPUTS{2};

// PipeWire port name of every input. Main() skips the first six characters
// ("input_") of each entry to obtain the matching audio channel name.
static constexpr const char* INPUT_NAMES[INPUTS]{
  "input_FL",
  "input_FR",
};
|
|
|
|
namespace
|
|
{
|
|
struct Data
|
|
{
|
|
pw_main_loop* loop;
|
|
pw_filter* filter;
|
|
std::array<void*, INPUTS> in;
|
|
std::array<void*, Channel::N_CHANNELS> out;
|
|
std::array<void*, Channel::N_CHANNELS> out_by_channel;
|
|
std::uint32_t n_outputs = 0, old_latency = -1;
|
|
File in_dump, out_dump;
|
|
|
|
std::vector<Option> opts;
|
|
Decoder dec;
|
|
bool force_quantum = true;
|
|
bool reset_pending = true;
|
|
float preamp = 1;
|
|
std::vector<float> in_buf;
|
|
std::uint32_t in_buf_pos = 0;
|
|
|
|
std::span<const float> out_buf;
|
|
|
|
std::vector<char> cmd_buf;
|
|
|
|
void TryFlush(std::uint32_t size);
|
|
void Process(struct spa_io_position* position);
|
|
|
|
void CreateOutputs();
|
|
void ParseArgsFun(std::string_view k, std::string_view v);
|
|
void Input(int fd);
|
|
void ProcessLine(std::string_view sv) noexcept;
|
|
};
|
|
}
|
|
|
|
template <typename T, std::size_t InN, typename U>
|
|
static void Mux(File& dump, std::span<T*, InN> ins, T* out, std::size_t n,
|
|
U mul)
|
|
{
|
|
auto outp = out;
|
|
if (std::all_of(ins.begin(), ins.end(), [](T* t) { return t; }))
|
|
// fast path, all inputs exist
|
|
for (std::size_t i = 0; i < n; i += ins.size())
|
|
for (std::size_t j = 0; j < ins.size(); ++j)
|
|
*outp++ = *ins[j]++ * mul;
|
|
else
|
|
for (std::size_t i = 0; i < n; i += ins.size())
|
|
for (std::size_t j = 0; j < ins.size(); ++j)
|
|
*outp++ = ins[j] ? *ins[j]++ * mul : 0;
|
|
|
|
if (dump) fwrite(out, sizeof(T), n, dump);
|
|
}
|
|
|
|
template <typename T, std::size_t OutN>
|
|
static void Demux(
|
|
File& dump, const T* inp, std::span<T*, OutN> outs, std::size_t n)
|
|
{
|
|
if (dump) fwrite(inp, sizeof(float), n, dump);
|
|
|
|
if (std::all_of(outs.begin(), outs.end(), [](T* t) { return t; }))
|
|
// fast path, all outputs exist
|
|
for (std::size_t i = 0; i < n; i += outs.size())
|
|
for (std::size_t j = 0; j < outs.size(); ++j)
|
|
*outs[j]++ = *inp++;
|
|
else
|
|
for (std::size_t i = 0; i < n; i += outs.size())
|
|
for (std::size_t j = 0; j < outs.size(); ++j)
|
|
{
|
|
if (outs[j]) *outs[j]++ = *inp;
|
|
++inp;
|
|
}
|
|
}
|
|
|
|
// Fetches the DSP buffer of the first `n` ports.
//
// ports:     port handles to query.
// n:         how many leading entries of `ports` to query (must not exceed N).
// n_samples: sample count passed on to pw_filter_get_dsp_buffer().
//
// Returns {true if at least one buffer is non-null, the buffer pointers}.
// Entries at index >= n are null; they used to be left uninitialized and were
// then copied as indeterminate values on return.
template <std::size_t N>
static std::pair<bool, std::array<float*, N>> GetBufs(
  std::array<void*, N> ports, std::uint32_t n, std::uint32_t n_samples)
{
  std::array<float*, N> res{};
  bool has = false;
  for (std::size_t i = 0; i < n; ++i)
  {
    res[i] = static_cast<float*>(pw_filter_get_dsp_buffer(ports[i], n_samples));
    if (res[i]) has = true;
  }
  return {has, res};
}
|
|
|
|
void Data::Process(struct spa_io_position* position)
|
|
{
|
|
std::uint32_t n_samples = position->clock.duration;
|
|
auto rate = float(position->clock.rate.num) / float(position->clock.rate.denom);
|
|
std::uint32_t block_size = dec.GetBlockSize();
|
|
std::uint32_t buf_size = INPUTS * (block_size ? block_size : n_samples);
|
|
pw_log_trace("do process %" PRIu32, n_samples);
|
|
|
|
auto latency = dec.GetDelay();
|
|
if (n_samples < block_size) latency += block_size - n_samples;
|
|
if (latency != old_latency)
|
|
{
|
|
uint8_t buffer[1024];
|
|
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
|
|
auto li = SPA_PROCESS_LATENCY_INFO_INIT(.rate = latency);
|
|
const struct spa_pod* params[1] = {
|
|
spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &li),
|
|
};
|
|
pw_filter_update_params(filter, nullptr, params, 1);
|
|
old_latency = latency;
|
|
}
|
|
|
|
auto [has_in, ins] = GetBufs(in, INPUTS, n_samples);
|
|
auto [has_out, outs] = GetBufs(out, n_outputs, n_samples);
|
|
|
|
if (!has_in && !has_out)
|
|
{
|
|
pw_log_trace("Nothing to do");
|
|
reset_pending = true;
|
|
return;
|
|
}
|
|
if (!has_in && out_buf.empty())
|
|
{
|
|
pw_log_trace("Generating silence");
|
|
for (unsigned i = 0; i < n_outputs; ++i)
|
|
if (outs[i]) std::memset(outs[i], 0, sizeof(float) * n_samples);
|
|
return;
|
|
}
|
|
|
|
if (reset_pending)
|
|
{
|
|
dec.Reset();
|
|
in_buf_pos = 0;
|
|
out_buf = {};
|
|
reset_pending = false;
|
|
}
|
|
|
|
in_buf.resize(buf_size);
|
|
if (in_buf_pos > buf_size) in_buf_pos = 0;
|
|
auto to_read = INPUTS * n_samples, to_write = n_outputs * n_samples;
|
|
|
|
auto write_out = [&]()
|
|
{
|
|
auto now_write = std::min<std::uint32_t>(to_write, out_buf.size());
|
|
if (!now_write) return;
|
|
Demux(out_dump, out_buf.data(), std::span{outs}.subspan(0, n_outputs),
|
|
now_write);
|
|
out_buf = out_buf.subspan(now_write);
|
|
to_write -= now_write;
|
|
};
|
|
|
|
write_out();
|
|
while (to_read)
|
|
{
|
|
auto now_read = std::min<std::uint32_t>(to_read, buf_size - in_buf_pos);
|
|
Mux(in_dump, std::span{ins}, in_buf.data() + in_buf_pos, now_read, preamp);
|
|
to_read -= now_read, in_buf_pos += now_read;
|
|
|
|
if (in_buf_pos == buf_size)
|
|
{
|
|
if (!out_buf.empty()) pw_log_warn("Discarding non-empty buffer!");
|
|
pw_log_trace("Calling decode");
|
|
out_buf = dec.Decode(rate, in_buf);
|
|
in_buf_pos = 0;
|
|
}
|
|
|
|
write_out();
|
|
}
|
|
|
|
if (to_write)
|
|
{
|
|
to_write /= n_outputs;
|
|
pw_log_debug("Memsetting remaining %" PRIu32 " samples", to_write);
|
|
for (unsigned i = 0; i < n_outputs; ++i)
|
|
if (outs[i]) std::memset(outs[i], 0, sizeof(float) * to_write);
|
|
}
|
|
}
|
|
|
|
void Data::Input(int fd)
|
|
{
|
|
constexpr std::size_t READ_SIZE = 4096;
|
|
auto off = cmd_buf.size();
|
|
cmd_buf.resize(off + READ_SIZE);
|
|
// noexcept from here
|
|
auto rd = read(fd, cmd_buf.data() + off, READ_SIZE);
|
|
if (rd < 0) { cmd_buf.resize(off); return; }
|
|
cmd_buf.resize(off + rd);
|
|
|
|
auto st = cmd_buf.begin();
|
|
while (st < cmd_buf.end())
|
|
if (auto end = std::find(st, cmd_buf.end(), '\n'); end != cmd_buf.end())
|
|
{
|
|
ProcessLine({st, end});
|
|
st = end+1;
|
|
}
|
|
else if (rd == 0)
|
|
{
|
|
ProcessLine({st, cmd_buf.end()});
|
|
st = cmd_buf.end();
|
|
}
|
|
cmd_buf.erase(cmd_buf.begin(), st);
|
|
}
|
|
|
|
void Data::ParseArgsFun(std::string_view k, std::string_view v)
|
|
{
|
|
if (HandleOption(opts, k, v)) return;
|
|
auto dopt = dec.GetOptions();
|
|
if (HandleOption(dopt, k, v)) return;
|
|
throw ParseError("Unknown option "s + std::string{k});
|
|
}
|
|
|
|
void Data::ProcessLine(std::string_view sv) noexcept
|
|
{
|
|
try
|
|
{
|
|
std::vector<std::string_view> parts;
|
|
for (std::size_t st = 0; st < sv.size(); )
|
|
if (sv[st] == ' ' || sv[st] == '\t') ++st;
|
|
else if (auto p = sv.find_first_of(" \t", st); p != std::string_view::npos)
|
|
{
|
|
parts.push_back(sv.substr(st, p-st));
|
|
st = p+1;
|
|
}
|
|
else
|
|
{
|
|
parts.push_back(sv.substr(st));
|
|
break;
|
|
}
|
|
|
|
ParseOptions(parts, std::bind_front(&Data::ParseArgsFun, this));
|
|
|
|
out_buf = {}; // FIXME: better way
|
|
// update quantum
|
|
{
|
|
auto quantum = std::to_string(force_quantum ? dec.GetBlockSize() : 0);
|
|
spa_dict_item upd[] = {
|
|
SPA_DICT_ITEM_INIT(PW_KEY_NODE_FORCE_QUANTUM, quantum.c_str()),
|
|
};
|
|
auto ud = SPA_DICT_INIT(upd, std::size(upd));
|
|
pw_filter_update_properties(filter, nullptr, &ud);
|
|
}
|
|
|
|
CreateOutputs();
|
|
}
|
|
catch (Exit) {} // ignore Exit from help
|
|
catch (const std::exception& e)
|
|
{ std::cerr << "Failed to execute command: " << e.what() << std::endl; }
|
|
}
|
|
|
|
void Data::CreateOutputs()
|
|
{
|
|
auto outputs = dec.GetChannels();
|
|
n_outputs = outputs.size();
|
|
|
|
std::array<bool, Channel::N_CHANNELS> used{};
|
|
for (std::uint32_t i = 0; i < n_outputs; ++i)
|
|
{
|
|
auto ch = outputs[i];
|
|
used[ch] = true;
|
|
if (!out_by_channel[ch])
|
|
{
|
|
auto pw_name = Channel::NAMES_PIPEWIRE[ch];
|
|
out_by_channel[ch] = pw_filter_add_port(
|
|
filter,
|
|
PW_DIRECTION_OUTPUT,
|
|
PW_FILTER_PORT_FLAG_MAP_BUFFERS,
|
|
0, // sizeof(Port)
|
|
pw_properties_new(
|
|
PW_KEY_FORMAT_DSP, "32 bit float mono audio",
|
|
PW_KEY_AUDIO_CHANNEL, pw_name,
|
|
PW_KEY_PORT_NAME, ("output_"s + pw_name).c_str(),
|
|
nullptr),
|
|
nullptr, 0);
|
|
}
|
|
out[i] = out_by_channel[ch];
|
|
}
|
|
|
|
for (unsigned ch = 0; ch < Channel::N_CHANNELS; ++ch)
|
|
if (!used[ch] && out_by_channel[ch])
|
|
{
|
|
pw_filter_remove_port(out_by_channel[ch]);
|
|
out_by_channel[ch] = nullptr;
|
|
}
|
|
}
|
|
|
|
static const struct pw_filter_events filter_events =
|
|
{
|
|
.version = PW_VERSION_FILTER_EVENTS,
|
|
.process = [](void* userdata, struct spa_io_position* position)
|
|
{ reinterpret_cast<Data*>(userdata)->Process(position); },
|
|
};
|
|
|
|
static void DoQuit(void *userdata, int signal_number)
|
|
{
|
|
(void) signal_number;
|
|
pw_main_loop_quit(reinterpret_cast<Data*>(userdata)->loop);
|
|
}
|
|
|
|
// Opens `sv` for binary writing ("wb"). An empty name yields a
// default-constructed File.
static File OpenFile(std::string_view sv)
{
  if (!sv.empty())
  {
    // string_view is not guaranteed to be NUL-terminated, hence the copy.
    const std::string path{sv};
    return {path.c_str(), "wb"};
  }
  return {};
}
|
|
|
|
int Main(int argc, char *argv[])
|
|
{
|
|
Data data = {};
|
|
data.opts = {
|
|
{
|
|
"help", "", "Show help",
|
|
[&](std::string_view)
|
|
{
|
|
std::cerr << "Usage " << argv[0] << " [--options]\n";
|
|
PrintHelp(data.opts);
|
|
data.dec.PrintHelp();
|
|
throw Exit{0};
|
|
},
|
|
},
|
|
{
|
|
"dump_input", "FILENAME", "Dump input to file",
|
|
[&data](std::string_view sv) { data.in_dump = OpenFile(sv); },
|
|
},
|
|
{
|
|
"dump_output", "FILENAME", "Dump output to file",
|
|
[&data](std::string_view sv) { data.out_dump = OpenFile(sv); },
|
|
},
|
|
{
|
|
"force_quantum", "BOOL",
|
|
"Force Pipewire to use the decoder's block size as quantum",
|
|
[&data](std::string_view sv) { data.force_quantum = FromString<bool>(sv); },
|
|
},
|
|
{
|
|
"preamp", "FLOAT",
|
|
"Multiply all input values by this (default: 1)",
|
|
[&data](std::string_view sv) { data.preamp = FromString<float>(sv); },
|
|
},
|
|
};
|
|
|
|
ParseArgs(argc, argv, std::bind_front(&Data::ParseArgsFun, &data));
|
|
|
|
data.opts.emplace_back(
|
|
"reset", "", "Reset the current decoder",
|
|
[&](auto) { data.reset_pending = true; });
|
|
data.opts.emplace_back(
|
|
"quit", "", "Quits the application",
|
|
[&](auto) { pw_main_loop_quit(data.loop); });
|
|
|
|
pw_init(&argc, &argv);
|
|
|
|
/* make a main loop. If you already have another main loop, you can add
|
|
* the fd of this pipewire mainloop to it. */
|
|
data.loop = pw_main_loop_new(nullptr);
|
|
auto loop = pw_main_loop_get_loop(data.loop);
|
|
|
|
pw_loop_add_signal(loop, SIGINT, DoQuit, &data);
|
|
pw_loop_add_signal(loop, SIGTERM, DoQuit, &data);
|
|
pw_loop_add_io(
|
|
loop, STDIN_FILENO, SPA_IO_IN, false,
|
|
[](void* userdata, int fd, uint32_t mask)
|
|
{
|
|
(void) mask;
|
|
reinterpret_cast<Data*>(userdata)->Input(fd);
|
|
}, &data);
|
|
|
|
/* Create a simple filter, the simple filter manages the core and remote
|
|
* objects for you if you don't need to deal with them.
|
|
*
|
|
* Pass your events and a user_data pointer as the last arguments. This
|
|
* will inform you about the filter state. The most important event
|
|
* you need to listen to is the process event where you need to process
|
|
* the data.
|
|
*/
|
|
data.filter = pw_filter_new_simple(
|
|
loop,
|
|
"surroundize-pipewire",
|
|
pw_properties_new(
|
|
PW_KEY_MEDIA_TYPE, "Audio",
|
|
PW_KEY_MEDIA_CATEGORY, "Filter",
|
|
PW_KEY_MEDIA_ROLE, "DSP",
|
|
PW_KEY_MEDIA_CLASS, "Stream/Output/Audio",
|
|
PW_KEY_APP_NAME, "surroundize-pipewire",
|
|
PW_KEY_NODE_FORCE_QUANTUM,
|
|
std::to_string(data.force_quantum ? data.dec.GetBlockSize() : 0).c_str(),
|
|
nullptr),
|
|
&filter_events,
|
|
&data);
|
|
|
|
for (unsigned i = 0; i < INPUTS; ++i)
|
|
data.in[i] = pw_filter_add_port(
|
|
data.filter,
|
|
PW_DIRECTION_INPUT,
|
|
PW_FILTER_PORT_FLAG_MAP_BUFFERS,
|
|
0, // sizeof(Port)
|
|
pw_properties_new(
|
|
PW_KEY_FORMAT_DSP, "32 bit float mono audio",
|
|
PW_KEY_AUDIO_CHANNEL, INPUT_NAMES[i] + 6,
|
|
PW_KEY_PORT_NAME, INPUT_NAMES[i],
|
|
nullptr),
|
|
nullptr, 0);
|
|
|
|
data.CreateOutputs();
|
|
|
|
/* Now connect this filter. We ask that our process function is
|
|
* called in a realtime thread. */
|
|
if (pw_filter_connect(data.filter,
|
|
PW_FILTER_FLAG_NONE, //PW_FILTER_FLAG_RT_PROCESS,
|
|
nullptr, 0) < 0)
|
|
{
|
|
std::cerr << "can't connect\n";
|
|
return -1;
|
|
}
|
|
|
|
/* and wait while we let things run */
|
|
pw_main_loop_run(data.loop);
|
|
|
|
pw_filter_destroy(data.filter);
|
|
pw_main_loop_destroy(data.loop);
|
|
pw_deinit();
|
|
|
|
return 0;
|
|
}
|