pipewire.cpp (12684B)
1 /* PipeWire */ 2 /* SPDX-FileCopyrightText: Copyright © 2019 Wim Taymans */ 3 /* SPDX-License-Identifier: MIT */ 4 5 #include <algorithm> 6 #include <cerrno> 7 #include <csignal> 8 #include <cstdint> 9 #include <cstdio> 10 #include <cstring> 11 #include <iostream> 12 #include <span> 13 #include <string> 14 15 #include <spa/pod/builder.h> 16 #include <spa/param/latency-utils.h> 17 18 #include <pipewire/pipewire.h> 19 #include <pipewire/filter.h> 20 21 #include "decoder.hpp" 22 #include "file_wrapper.hpp" 23 #include "options.hpp" 24 25 using namespace std::string_literals; 26 27 static constexpr unsigned INPUTS = 2; 28 static constexpr const char* INPUT_NAMES[INPUTS] = { "input_FL", "input_FR" }; 29 30 namespace 31 { 32 struct Data 33 { 34 pw_main_loop* loop; 35 pw_filter* filter; 36 std::array<void*, INPUTS> in; 37 std::array<void*, Channel::N_CHANNELS> out; 38 std::array<void*, Channel::N_CHANNELS> out_by_channel; 39 std::uint32_t n_outputs = 0, old_latency = -1; 40 File in_dump, out_dump; 41 42 std::vector<Option> opts; 43 Decoder dec; 44 bool force_quantum = true; 45 bool reset_pending = true; 46 float preamp = 1; 47 float old_rate = -1; 48 std::vector<float> in_buf; 49 std::uint32_t in_buf_pos = 0; 50 51 std::span<const float> out_buf; 52 53 std::vector<char> cmd_buf; 54 55 void TryFlush(std::uint32_t size); 56 void Process(struct spa_io_position* position); 57 58 void CreateOutputs(); 59 void ParseArgsFun(std::string_view k, std::string_view v); 60 void Input(int fd); 61 void ProcessLine(std::string_view sv) noexcept; 62 }; 63 } 64 65 template <typename T, std::size_t InN, typename U> 66 static void Mux(File& dump, std::span<T*, InN> ins, T* out, std::size_t n, 67 U mul) 68 { 69 auto outp = out; 70 if (std::all_of(ins.begin(), ins.end(), [](T* t) { return t; })) 71 // fast path, all inputs exist 72 for (std::size_t i = 0; i < n; i += ins.size()) 73 for (std::size_t j = 0; j < ins.size(); ++j) 74 *outp++ = *ins[j]++ * mul; 75 else 76 for (std::size_t i = 0; i < n; i += ins.size()) 77 for (std::size_t j = 0; j < ins.size(); ++j) 78 *outp++ = ins[j] ? *ins[j]++ * mul : 0; 79 80 if (dump) fwrite(out, sizeof(T), n, dump); 81 } 82 83 template <typename T, std::size_t OutN> 84 static void Demux( 85 File& dump, const T* inp, std::span<T*, OutN> outs, std::size_t n) 86 { 87 if (dump) fwrite(inp, sizeof(float), n, dump); 88 89 if (std::all_of(outs.begin(), outs.end(), [](T* t) { return t; })) 90 // fast path, all outputs exist 91 for (std::size_t i = 0; i < n; i += outs.size()) 92 for (std::size_t j = 0; j < outs.size(); ++j) 93 *outs[j]++ = *inp++; 94 else 95 for (std::size_t i = 0; i < n; i += outs.size()) 96 for (std::size_t j = 0; j < outs.size(); ++j) 97 { 98 if (outs[j]) *outs[j]++ = *inp; 99 ++inp; 100 } 101 } 102 103 template <std::size_t N> 104 static std::pair<bool, std::array<float*, N>> GetBufs( 105 std::array<void*, N> ports, std::uint32_t n, std::uint32_t n_samples) 106 { 107 std::array<float*, N> res; 108 bool has = false; 109 for (std::size_t i = 0; i < n; ++i) 110 { 111 res[i] = static_cast<float*>(pw_filter_get_dsp_buffer(ports[i], n_samples)); 112 if (res[i]) has = true; 113 } 114 return {has, res}; 115 } 116 117 void Data::Process(struct spa_io_position* position) 118 { 119 std::uint32_t n_samples = position->clock.duration; 120 auto rate = float(position->clock.rate.num) / float(position->clock.rate.denom); 121 std::uint32_t block_size = dec.GetBlockSize(); 122 std::uint32_t buf_size = INPUTS * (block_size ? block_size : n_samples); 123 pw_log_trace("do process %" PRIu32, n_samples); 124 125 auto latency = dec.GetDelay(); 126 if (n_samples < block_size) latency += block_size - n_samples; 127 if (latency != old_latency) 128 { 129 uint8_t buffer[1024]; 130 struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); 131 auto li = SPA_PROCESS_LATENCY_INFO_INIT(.rate = int(latency)); 132 const struct spa_pod* params[1] = { 133 spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, &li), 134 }; 135 pw_filter_update_params(filter, nullptr, params, 1); 136 old_latency = latency; 137 } 138 139 auto [has_in, ins] = GetBufs(in, INPUTS, n_samples); 140 auto [has_out, outs] = GetBufs(out, n_outputs, n_samples); 141 142 if (!has_in && !has_out) 143 { 144 pw_log_trace("Nothing to do"); 145 reset_pending = true; 146 return; 147 } 148 if (!has_in && out_buf.empty()) 149 { 150 pw_log_trace("Generating silence"); 151 for (unsigned i = 0; i < n_outputs; ++i) 152 if (outs[i]) std::memset(outs[i], 0, sizeof(float) * n_samples); 153 return; 154 } 155 156 if (reset_pending || rate != old_rate) 157 { 158 dec.Init(rate); 159 old_rate = rate; 160 in_buf_pos = 0; 161 out_buf = {}; 162 reset_pending = false; 163 } 164 165 in_buf.resize(buf_size); 166 if (in_buf_pos > buf_size) in_buf_pos = 0; 167 auto to_read = INPUTS * n_samples, to_write = n_outputs * n_samples; 168 169 auto write_out = [&]() 170 { 171 auto now_write = std::min(to_write, std::uint32_t(out_buf.size())); 172 if (!now_write) return; 173 Demux(out_dump, out_buf.data(), std::span{outs}.subspan(0, n_outputs), 174 now_write); 175 out_buf = out_buf.subspan(now_write); 176 to_write -= now_write; 177 }; 178 179 write_out(); 180 while (to_read) 181 { 182 auto now_read = std::min<std::uint32_t>(to_read, buf_size - in_buf_pos); 183 Mux(in_dump, std::span{ins}, in_buf.data() + in_buf_pos, now_read, preamp); 184 to_read -= now_read, in_buf_pos += now_read; 185 186 if (in_buf_pos == buf_size) 187 { 188 if (!out_buf.empty()) pw_log_warn("Discarding non-empty buffer!"); 189 pw_log_trace("Calling decode"); 190 out_buf = dec.Decode(in_buf); 191 in_buf_pos = 0; 192 } 193 194 write_out(); 195 } 196 197 if (to_write) 198 { 199 to_write /= n_outputs; 200 pw_log_debug("Memsetting remaining %" PRIu32 " samples", to_write); 201 for (unsigned i = 0; i < n_outputs; ++i) 202 if (outs[i]) std::memset(outs[i], 0, sizeof(float) * to_write); 203 } 204 } 205 206 void Data::Input(int fd) 207 { 208 constexpr std::size_t READ_SIZE = 4096; 209 auto off = cmd_buf.size(); 210 cmd_buf.resize(off + READ_SIZE); 211 // noexcept from here 212 auto rd = read(fd, cmd_buf.data() + off, READ_SIZE); 213 if (rd < 0) { cmd_buf.resize(off); return; } 214 cmd_buf.resize(off + rd); 215 216 auto st = cmd_buf.begin(); 217 while (st < cmd_buf.end()) 218 if (auto end = std::find(st, cmd_buf.end(), '\n'); end != cmd_buf.end()) 219 { 220 ProcessLine({st, end}); 221 st = end+1; 222 } 223 else if (rd == 0) 224 { 225 ProcessLine({st, cmd_buf.end()}); 226 st = cmd_buf.end(); 227 } 228 cmd_buf.erase(cmd_buf.begin(), st); 229 } 230 231 void Data::ParseArgsFun(std::string_view k, std::string_view v) 232 { 233 if (HandleOption(opts, k, v)) return; 234 auto dopt = dec.GetOptions(); 235 if (HandleOption(dopt, k, v)) return; 236 throw ParseError("Unknown option "s + std::string{k}); 237 } 238 239 void Data::ProcessLine(std::string_view sv) noexcept 240 { 241 try 242 { 243 std::vector<std::string_view> parts; 244 for (std::size_t st = 0; st < sv.size(); ) 245 if (sv[st] == ' ' || sv[st] == '\t') ++st; 246 else if (auto p = sv.find_first_of(" \t", st); p != std::string_view::npos) 247 { 248 parts.push_back(sv.substr(st, p-st)); 249 st = p+1; 250 } 251 else 252 { 253 parts.push_back(sv.substr(st)); 254 break; 255 } 256 257 ParseOptions(parts, std::bind_front(&Data::ParseArgsFun, this)); 258 259 out_buf = {}; // FIXME: better way 260 // update quantum 261 { 262 auto quantum = std::to_string(force_quantum ? dec.GetBlockSize() : 0); 263 spa_dict_item upd[] = { 264 SPA_DICT_ITEM_INIT(PW_KEY_NODE_FORCE_QUANTUM, quantum.c_str()), 265 }; 266 auto ud = SPA_DICT_INIT(upd, std::size(upd)); 267 pw_filter_update_properties(filter, nullptr, &ud); 268 } 269 270 CreateOutputs(); 271 } 272 catch (Exit) {} // ignore Exit from help 273 catch (const std::exception& e) 274 { std::cerr << "Failed to execute command: " << e.what() << std::endl; } 275 } 276 277 void Data::CreateOutputs() 278 { 279 auto outputs = dec.GetChannels(); 280 n_outputs = outputs.size(); 281 282 std::array<bool, Channel::N_CHANNELS> used{}; 283 for (std::uint32_t i = 0; i < n_outputs; ++i) 284 { 285 auto ch = outputs[i]; 286 used[ch] = true; 287 if (!out_by_channel[ch]) 288 { 289 auto pw_name = Channel::NAMES_PIPEWIRE[ch]; 290 out_by_channel[ch] = pw_filter_add_port( 291 filter, 292 PW_DIRECTION_OUTPUT, 293 PW_FILTER_PORT_FLAG_MAP_BUFFERS, 294 0, // sizeof(Port) 295 pw_properties_new( 296 PW_KEY_FORMAT_DSP, "32 bit float mono audio", 297 PW_KEY_AUDIO_CHANNEL, pw_name, 298 PW_KEY_PORT_NAME, ("output_"s + pw_name).c_str(), 299 nullptr), 300 nullptr, 0); 301 } 302 out[i] = out_by_channel[ch]; 303 } 304 305 for (unsigned ch = 0; ch < Channel::N_CHANNELS; ++ch) 306 if (!used[ch] && out_by_channel[ch]) 307 { 308 pw_filter_remove_port(out_by_channel[ch]); 309 out_by_channel[ch] = nullptr; 310 } 311 } 312 313 static const struct pw_filter_events filter_events = 314 { 315 .version = PW_VERSION_FILTER_EVENTS, 316 .process = [](void* userdata, struct spa_io_position* position) 317 { reinterpret_cast<Data*>(userdata)->Process(position); }, 318 }; 319 320 static void DoQuit(void *userdata, int signal_number) 321 { 322 (void) signal_number; 323 pw_main_loop_quit(reinterpret_cast<Data*>(userdata)->loop); 324 } 325 326 static File OpenFile(std::string_view sv) 327 { 328 if (sv.empty()) return {}; 329 else return {std::string{sv}.c_str(), "wb"}; 330 } 331 332 int Main(int argc, char *argv[]) 333 { 334 Data data = {}; 335 data.opts = { 336 { 337 "help", "", "Show help", 338 [&](std::string_view) 339 { 340 std::cerr << "Usage " << argv[0] << " [--options]\n"; 341 PrintHelp(data.opts); 342 data.dec.PrintHelp(); 343 throw Exit{0}; 344 }, 345 }, 346 { 347 "dump_input", "FILENAME", "Dump input to file", 348 [&data](std::string_view sv) { data.in_dump = OpenFile(sv); }, 349 }, 350 { 351 "dump_output", "FILENAME", "Dump output to file", 352 [&data](std::string_view sv) { data.out_dump = OpenFile(sv); }, 353 }, 354 { 355 "force_quantum", "BOOL", 356 "Force Pipewire to use the decoder's block size as quantum", 357 [&data](std::string_view sv) { data.force_quantum = FromString<bool>(sv); }, 358 }, 359 { 360 "preamp", "FLOAT", 361 "Multiply all input values by this (default: 1)", 362 [&data](std::string_view sv) { data.preamp = FromString<float>(sv); }, 363 }, 364 }; 365 366 ParseArgs(argc, argv, std::bind_front(&Data::ParseArgsFun, &data)); 367 368 data.opts.emplace_back( 369 "reset", "", "Reset the current decoder", 370 [&](auto) { data.reset_pending = true; }); 371 data.opts.emplace_back( 372 "quit", "", "Quits the application", 373 [&](auto) { pw_main_loop_quit(data.loop); }); 374 375 pw_init(&argc, &argv); 376 377 /* make a main loop. If you already have another main loop, you can add 378 * the fd of this pipewire mainloop to it. */ 379 data.loop = pw_main_loop_new(nullptr); 380 auto loop = pw_main_loop_get_loop(data.loop); 381 382 pw_loop_add_signal(loop, SIGINT, DoQuit, &data); 383 pw_loop_add_signal(loop, SIGTERM, DoQuit, &data); 384 pw_loop_add_io( 385 loop, STDIN_FILENO, SPA_IO_IN, false, 386 [](void* userdata, int fd, uint32_t mask) 387 { 388 (void) mask; 389 reinterpret_cast<Data*>(userdata)->Input(fd); 390 }, &data); 391 392 /* Create a simple filter, the simple filter manages the core and remote 393 * objects for you if you don't need to deal with them. 394 * 395 * Pass your events and a user_data pointer as the last arguments. This 396 * will inform you about the filter state. The most important event 397 * you need to listen to is the process event where you need to process 398 * the data. 399 */ 400 data.filter = pw_filter_new_simple( 401 loop, 402 "surroundize-pipewire", 403 pw_properties_new( 404 PW_KEY_MEDIA_TYPE, "Audio", 405 PW_KEY_MEDIA_CATEGORY, "Filter", 406 PW_KEY_MEDIA_ROLE, "DSP", 407 PW_KEY_MEDIA_CLASS, "Stream/Output/Audio", 408 PW_KEY_APP_NAME, "surroundize-pipewire", 409 PW_KEY_NODE_FORCE_QUANTUM, 410 std::to_string(data.force_quantum ? data.dec.GetBlockSize() : 0).c_str(), 411 PW_KEY_NODE_PASSIVE, "true", 412 nullptr), 413 &filter_events, 414 &data); 415 416 for (unsigned i = 0; i < INPUTS; ++i) 417 data.in[i] = pw_filter_add_port( 418 data.filter, 419 PW_DIRECTION_INPUT, 420 PW_FILTER_PORT_FLAG_MAP_BUFFERS, 421 0, // sizeof(Port) 422 pw_properties_new( 423 PW_KEY_FORMAT_DSP, "32 bit float mono audio", 424 PW_KEY_AUDIO_CHANNEL, INPUT_NAMES[i] + 6, 425 PW_KEY_PORT_NAME, INPUT_NAMES[i], 426 nullptr), 427 nullptr, 0); 428 429 data.CreateOutputs(); 430 431 /* Now connect this filter. We ask that our process function is 432 * called in a realtime thread. */ 433 if (pw_filter_connect(data.filter, 434 PW_FILTER_FLAG_NONE, //PW_FILTER_FLAG_RT_PROCESS, 435 nullptr, 0) < 0) 436 { 437 std::cerr << "can't connect\n"; 438 return -1; 439 } 440 441 /* and wait while we let things run */ 442 pw_main_loop_run(data.loop); 443 444 pw_filter_destroy(data.filter); 445 pw_main_loop_destroy(data.loop); 446 pw_deinit(); 447 448 return 0; 449 }