* Re: [PATCH v6 03/20] perf record: Introduce thread local variable

From: Riccardo Mancini @ 2021-06-03 22:56 UTC
To: Alexey Bayduraev
Cc: Jiri Olsa, Namhyung Kim, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi,

thank you very much for your work on adding threading capabilities to
perf record.
I did some testing on your entire patchset, especially checking for
memory issues using ASan. This is just the first of a couple of emails
pointing out some issues I found. I will also do additional tests in
the future.

On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:

SNIP

> @@ -2220,18 +2275,20 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
>  		goto out_child;
>  	}
>
> -	if (!quiet)
> -		fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
> -
>  	if (target__none(&rec->opts.target))
>  		record__synthesize_workload(rec, true);
>
>  out_child:
> +	record__stop_threads(rec, &waking);
> +out_free_threads:
>  	record__free_thread_data(rec);
>  	evlist__finalize_ctlfd(rec->evlist);
>  	record__mmap_read_all(rec, true);
>  	record__aio_mmap_read_sync(rec);

record__mmap_read_all() should be moved before record__free_thread_data(),
since it uses the thread_data that has just been freed.
Furthermore, record__mmap_read_all() should also be moved before the
out_free_threads label, since it must not be called unless
record__start_threads() succeeded; otherwise thread is NULL and it
segfaults (this happens if there is an error somewhere else in perf,
for example).

In my tests the following order works, but it should be double-checked
for possible side effects of this order change:

out_child:
	record__stop_threads(rec, &waking);
	record__mmap_read_all(rec, true);
out_free_threads:
	record__free_thread_data(rec);
	evlist__finalize_ctlfd(rec->evlist);
	record__aio_mmap_read_sync(rec);

Thanks,
Riccardo

> +	if (!quiet)
> +		fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
> +
>  	if (rec->session->bytes_transferred && rec->session->bytes_compressed) {
>  		ratio = (float)rec->session->bytes_transferred/(float)rec->session->bytes_compressed;
>  		session->header.env.comp_ratio = ratio + 0.5;

SNIP
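Expressed as a delta on top of the v6 hunk quoted above, the reordering
Riccardo suggests would look roughly like this (illustrative only; context
abbreviated, not an actual posted patch):

 out_child:
 	record__stop_threads(rec, &waking);
+	record__mmap_read_all(rec, true);
 out_free_threads:
 	record__free_thread_data(rec);
 	evlist__finalize_ctlfd(rec->evlist);
-	record__mmap_read_all(rec, true);
 	record__aio_mmap_read_sync(rec);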
* Re: [PATCH v6 03/20] perf record: Introduce thread local variable

From: Namhyung Kim @ 2021-06-09 22:54 UTC
To: Riccardo Mancini
Cc: Alexey Bayduraev, Jiri Olsa, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi Riccardo,

On Thu, Jun 3, 2021 at 3:56 PM Riccardo Mancini <rickyman7@gmail.com> wrote:
>
> Hi,
>
> thank you very much for your work on adding threading capabilities to
> perf record.
> I did some testing on your entire patchset, especially checking for
> memory issues using ASan. This is just the first of a couple of emails
> pointing out some issues I found. I will also do additional tests in
> the future.
>
> On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:
> SNIP
> > @@ -2220,18 +2275,20 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
> >  		goto out_child;
> >  	}
> >
> > -	if (!quiet)
> > -		fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
> > -
> >  	if (target__none(&rec->opts.target))
> >  		record__synthesize_workload(rec, true);
> >
> >  out_child:
> > +	record__stop_threads(rec, &waking);
> > +out_free_threads:
> >  	record__free_thread_data(rec);
> >  	evlist__finalize_ctlfd(rec->evlist);
> >  	record__mmap_read_all(rec, true);
> >  	record__aio_mmap_read_sync(rec);
>
> record__mmap_read_all() should be moved before record__free_thread_data(),
> since it uses the thread_data that has just been freed.
> Furthermore, record__mmap_read_all() should also be moved before the
> out_free_threads label, since it must not be called unless
> record__start_threads() succeeded; otherwise thread is NULL and it
> segfaults (this happens if there is an error somewhere else in perf,
> for example).
>
> In my tests the following order works, but it should be double-checked
> for possible side effects of this order change:
>
> out_child:
> 	record__stop_threads(rec, &waking);
> 	record__mmap_read_all(rec, true);
> out_free_threads:
> 	record__free_thread_data(rec);
> 	evlist__finalize_ctlfd(rec->evlist);
> 	record__aio_mmap_read_sync(rec);

I wonder how it worked before... maybe we should place
record__free_thread_data() far below.

Thanks,
Namhyung
* Re: [PATCH v6 05/20] perf record: Start threads in the beginning of trace streaming

From: Riccardo Mancini @ 2021-06-03 23:01 UTC
To: Alexey Bayduraev
Cc: Jiri Olsa, Namhyung Kim, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi,

On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:
> Start thread in detached state because its management is implemented
> via messaging to avoid any scaling issues. Block signals prior to
> thread start so only the main tool thread would be notified on external
> async signals during data collection. Thread affinity mask is used to
> assign eligible cpus for the thread to run. Wait and sync on thread
> start using thread ack pipe.
>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> ---
>  tools/perf/builtin-record.c | 106 +++++++++++++++++++++++++++++++++++-
>  1 file changed, 105 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index 838c1f779849..88fad12cbe5b 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
>  	perf_mmap__put(map);
>  }
>
> +static void *record__thread(void *arg)
> +{
> +	enum thread_msg msg = THREAD_MSG__READY;
> +	bool terminate = false;
> +	struct fdarray *pollfd;
> +	int err, ctlfd_pos;
> +
> +	thread = arg;
> +	thread->tid = syscall(SYS_gettid);
> +
> +	err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> +	if (err == -1)
> +		pr_err("threads[%d]: failed to notify on start: %s", thread->tid, strerror(errno));
> +
> +	pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
> +
> +	pollfd = &thread->pollfd;
> +	ctlfd_pos = thread->ctlfd_pos;
> +
> +	for (;;) {
> +		unsigned long long hits = thread->samples;
> +
> +		if (record__mmap_read_all(thread->rec, false) < 0 || terminate)
> +			break;
> +
> +		if (hits == thread->samples) {
> +
> +			err = fdarray__poll(pollfd, -1);
> +			/*
> +			 * Propagate error, only if there's any. Ignore positive
> +			 * number of returned events and interrupt error.
> +			 */
> +			if (err > 0 || (err < 0 && errno == EINTR))
> +				err = 0;
> +			thread->waking++;
> +
> +			if (fdarray__filter(pollfd, POLLERR | POLLHUP,
> +					    record__thread_munmap_filtered, NULL) == 0)
> +				break;
> +		}
> +
> +		if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
> +			terminate = true;
> +			close(thread->pipes.msg[0]);
> +			pollfd->entries[ctlfd_pos].fd = -1;
> +			pollfd->entries[ctlfd_pos].events = 0;
> +		}
> +
> +		pollfd->entries[ctlfd_pos].revents = 0;
> +	}
> +	record__mmap_read_all(thread->rec, true);
> +
> +	err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> +	if (err == -1)
> +		pr_err("threads[%d]: failed to notify on termination: %s",
> +		       thread->tid, strerror(errno));
> +
> +	return NULL;
> +}
> +
>  static void record__init_features(struct record *rec)
>  {
>  	struct perf_session *session = rec->session;
> @@ -1886,13 +1946,57 @@ static int record__terminate_thread(struct thread_data *thread_data)
>
>  static int record__start_threads(struct record *rec)
>  {
> +	int t, tt, ret = 0, nr_threads = rec->nr_threads;
>  	struct thread_data *thread_data = rec->thread_data;
> +	sigset_t full, mask;
> +	pthread_t handle;
> +	pthread_attr_t attrs;
> +
> +	sigfillset(&full);
> +	if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> +		pr_err("Failed to block signals on threads start: %s\n", strerror(errno));
> +		return -1;
> +	}
> +
> +	pthread_attr_init(&attrs);
> +	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> +	for (t = 1; t < nr_threads; t++) {
> +		enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> +		pthread_attr_setaffinity_np(&attrs,
> +					    MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
> +					    (cpu_set_t *)(thread_data[t].mask->affinity.bits));
> +
> +		if (pthread_create(&handle, &attrs, record__thread, &thread_data[t])) {
> +			for (tt = 1; tt < t; tt++)
> +				record__terminate_thread(&thread_data[t]);
> +			pr_err("Failed to start threads: %s\n", strerror(errno));
> +			ret = -1;
> +			goto out_err;
> +		}
> +
> +		if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
> +			pr_debug2("threads[%d]: sent %s\n", rec->thread_data[t].tid,
> +				  thread_msg_tags[msg]);
> +	}
> +
> +	if (nr_threads > 1) {
> +		sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask->affinity),
> +				  (cpu_set_t *)thread_data[0].mask->affinity.bits);
> +	}
>
>  	thread = &thread_data[0];
>
>  	pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
>
> -	return 0;
> +out_err:
> +	if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> +		pr_err("Failed to unblock signals on threads start: %s\n", strerror(errno));
> +		ret = -1;
> +	}
> +
> +	return ret;
> }

ASan complains about a memory leak of attrs, since pthread_attr_destroy()
is missing. It could be added just after the out_err label.

Thanks,
Riccardo

>
>  static int record__stop_threads(struct record *rec, unsigned long *waking)
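A minimal sketch of that fix, applied to the exit path of the quoted
record__start_threads() (illustrative; only the out_err path changes,
pthread_attr_destroy() is the standard POSIX counterpart of
pthread_attr_init()):

out_err:
	/* release the attribute object on both success and error paths */
	pthread_attr_destroy(&attrs);
	if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
		pr_err("Failed to unblock signals on threads start: %s\n", strerror(errno));
		ret = -1;
	}

	return ret;
}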
* Re: [PATCH v6 10/20] perf record: Introduce --threads=<spec> command line option

From: Riccardo Mancini @ 2021-06-03 23:14 UTC
To: Alexey Bayduraev
Cc: Jiri Olsa, Namhyung Kim, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi,

the parameter provided to --threads is not checked: if an invalid value
is given, perf crashes.

On Wed, 2021-05-26 at 13:53 +0300, Alexey Bayduraev wrote:
> Provide --threads option in perf record command line interface.
> The option can have a value in the form of masks that specify
> cpus to be monitored with data streaming threads and its layout
> in system topology. The masks can be filtered using cpu mask
> provided via -C option.
>
> The specification value can be user defined list of masks. Masks
> separated by colon define cpus to be monitored by one thread and
> affinity mask of that thread is separated by slash. For example:
> <cpus mask 1>/<affinity mask 1>:<cpu mask 2>/<affinity mask 2>
> specifies parallel threads layout that consists of two threads
> with corresponding assigned cpus to be monitored.
>
> The specification value can be a string e.g. "cpu", "core" or
> "socket" meaning creation of data streaming thread for every
> cpu or core or socket to monitor distinct cpus or cpus grouped
> by core or socket.
>
> The option provided with no or empty value defaults to per-cpu
> parallel threads layout creating data streaming thread for every
> cpu being monitored.
>
> Feature design and implementation are based on prototypes [1], [2].
>
> [1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
> [2] https://lore.kernel.org/lkml/20180913125450.21342-1-jolsa@kernel.org/
>
> Suggested-by: Jiri Olsa <jolsa@kernel.org>
> Suggested-by: Namhyung Kim <namhyung@kernel.org>
> Acked-by: Andi Kleen <ak@linux.intel.com>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> ---
>  tools/perf/builtin-record.c | 347 +++++++++++++++++++++++++++++++++++-
>  tools/perf/util/record.h    |   1 +
>  2 files changed, 346 insertions(+), 2 deletions(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index e118efe21ba7..a54d72475629 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -51,6 +51,7 @@
>  #include "util/evlist-hybrid.h"
>  #include "asm/bug.h"
>  #include "perf.h"
> +#include "cputopo.h"
>
>  #include <errno.h>
>  #include <inttypes.h>
> @@ -122,6 +123,20 @@ static const char *thread_msg_tags[THREAD_MSG__MAX] = {
>  	"UNDEFINED", "READY"
>  };
>
> +enum thread_spec {
> +	THREAD_SPEC__UNDEFINED = 0,
> +	THREAD_SPEC__CPU,
> +	THREAD_SPEC__CORE,
> +	THREAD_SPEC__SOCKET,
> +	THREAD_SPEC__NUMA,
> +	THREAD_SPEC__USER,
> +	THREAD_SPEC__MAX,
> +};
> +
> +static const char *thread_spec_tags[THREAD_SPEC__MAX] = {
> +	"undefined", "cpu", "core", "socket", "numa", "user"
> +};
> +
>  struct record {
>  	struct perf_tool	tool;
>  	struct record_opts	opts;
> @@ -2721,6 +2736,70 @@ static void record__thread_mask_free(struct thread_mask *mask)
>  	record__mmap_cpu_mask_free(&mask->affinity);
>  }
>
> +static int record__thread_mask_or(struct thread_mask *dest, struct thread_mask *src1,
> +				  struct thread_mask *src2)
> +{
> +	if (src1->maps.nbits != src2->maps.nbits ||
> +	    dest->maps.nbits != src1->maps.nbits ||
> +	    src1->affinity.nbits != src2->affinity.nbits ||
> +	    dest->affinity.nbits != src1->affinity.nbits)
> +		return -EINVAL;
> +
> +	bitmap_or(dest->maps.bits, src1->maps.bits,
> +		  src2->maps.bits, src1->maps.nbits);
> +	bitmap_or(dest->affinity.bits, src1->affinity.bits,
> +		  src2->affinity.bits, src1->affinity.nbits);
> +
> +	return 0;
> +}
> +
> +static int record__thread_mask_intersects(struct thread_mask *mask_1, struct thread_mask *mask_2)
> +{
> +	int res1, res2;
> +
> +	if (mask_1->maps.nbits != mask_2->maps.nbits ||
> +	    mask_1->affinity.nbits != mask_2->affinity.nbits)
> +		return -EINVAL;
> +
> +	res1 = bitmap_intersects(mask_1->maps.bits, mask_2->maps.bits,
> +				 mask_1->maps.nbits);
> +	res2 = bitmap_intersects(mask_1->affinity.bits, mask_2->affinity.bits,
> +				 mask_1->affinity.nbits);
> +	if (res1 || res2)
> +		return 1;
> +
> +	return 0;
> +}
> +
> +static int record__parse_threads(const struct option *opt, const char *str, int unset)
> +{
> +	int s;
> +	struct record_opts *opts = opt->value;
> +
> +	if (unset || !str || !strlen(str)) {
> +		opts->threads_spec = THREAD_SPEC__CPU;
> +	} else {
> +		for (s = 1; s < THREAD_SPEC__MAX; s++) {
> +			if (s == THREAD_SPEC__USER) {
> +				opts->threads_user_spec = strdup(str);
> +				opts->threads_spec = THREAD_SPEC__USER;
> +				break;
> +			}
> +			if (!strncasecmp(str, thread_spec_tags[s], strlen(thread_spec_tags[s]))) {
> +				opts->threads_spec = s;
> +				break;
> +			}
> +		}
> +	}
> +
> +	pr_debug("threads_spec: %s", thread_spec_tags[opts->threads_spec]);
> +	if (opts->threads_spec == THREAD_SPEC__USER)
> +		pr_debug("=[%s]", opts->threads_user_spec);
> +	pr_debug("\n");
> +
> +	return 0;
> +}
> +
>  static int parse_output_max_size(const struct option *opt,
>  				 const char *str, int unset)
>  {
> @@ -3164,6 +3243,9 @@ static struct option __record_options[] = {
>  		      "\t\t\t  Optionally send control command completion ('ack\\n') to ack-fd descriptor.\n"
>  		      "\t\t\t  Alternatively, ctl-fifo / ack-fifo will be opened and used as ctl-fd / ack-fd.",
>  		      parse_control_option),
> +	OPT_CALLBACK_OPTARG(0, "threads", &record.opts, NULL, "spec",
> +			    "write collected trace data into several data files using parallel threads",
> +			    record__parse_threads),
>  	OPT_END()
>  };
>
> @@ -3177,6 +3259,17 @@ static void record__mmap_cpu_mask_init(struct mmap_cpu_mask *mask, struct perf_c
>  		set_bit(cpus->map[c], mask->bits);
>  }
>
> +static void record__mmap_cpu_mask_init_spec(struct mmap_cpu_mask *mask, char *mask_spec)
> +{
> +	struct perf_cpu_map *cpus;
> +
> +	cpus = perf_cpu_map__new(mask_spec);
> +	if (cpus) {
> +		record__mmap_cpu_mask_init(mask, cpus);
> +		perf_cpu_map__put(cpus);
> +	}
> +}
> +
>  static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr_bits)
>  {
>  	int t, ret;
> @@ -3196,6 +3289,229 @@ static int record__alloc_thread_masks(struct record *rec, int nr_threads, int nr
>
>  	return 0;
>  }
> +
> +static int record__init_thread_cpu_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> +	int t, ret, nr_cpus = perf_cpu_map__nr(cpus);
> +
> +	ret = record__alloc_thread_masks(rec, nr_cpus, cpu__max_cpu());
> +	if (ret)
> +		return ret;
> +
> +	rec->nr_threads = nr_cpus;
> +	pr_debug("threads: nr_threads=%d\n", rec->nr_threads);
> +
> +	for (t = 0; t < rec->nr_threads; t++) {
> +		set_bit(cpus->map[t], rec->thread_masks[t].maps.bits);
> +		pr_debug("thread_masks[%d]: maps mask [%d]\n", t, cpus->map[t]);
> +		set_bit(cpus->map[t], rec->thread_masks[t].affinity.bits);
> +		pr_debug("thread_masks[%d]: affinity mask [%d]\n", t, cpus->map[t]);
> +	}
> +
> +	return 0;
> +}
> +
> +static int record__init_thread_masks_spec(struct record *rec, struct perf_cpu_map *cpus,
> +					  char **maps_spec, char **affinity_spec, u32 nr_spec)
> +{
> +	u32 s;
> +	int ret = 0, nr_threads = 0;
> +	struct mmap_cpu_mask cpus_mask;
> +	struct thread_mask thread_mask, full_mask, *prev_masks;
> +
> +	ret = record__mmap_cpu_mask_alloc(&cpus_mask, cpu__max_cpu());
> +	if (ret)
> +		goto out;
> +	record__mmap_cpu_mask_init(&cpus_mask, cpus);
> +	ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
> +	if (ret)
> +		goto out_free_cpu_mask;
> +	ret = record__thread_mask_alloc(&full_mask, cpu__max_cpu());
> +	if (ret)
> +		goto out_free_thread_mask;
> +	record__thread_mask_clear(&full_mask);
> +
> +	for (s = 0; s < nr_spec; s++) {
> +		record__thread_mask_clear(&thread_mask);
> +
> +		record__mmap_cpu_mask_init_spec(&thread_mask.maps, maps_spec[s]);
> +		record__mmap_cpu_mask_init_spec(&thread_mask.affinity, affinity_spec[s]);
> +
> +		if (!bitmap_and(thread_mask.maps.bits, thread_mask.maps.bits,
> +				cpus_mask.bits, thread_mask.maps.nbits) ||
> +		    !bitmap_and(thread_mask.affinity.bits, thread_mask.affinity.bits,
> +				cpus_mask.bits, thread_mask.affinity.nbits))
> +			continue;
> +
> +		ret = record__thread_mask_intersects(&thread_mask, &full_mask);
> +		if (ret)
> +			goto out_free_full_mask;
> +		record__thread_mask_or(&full_mask, &full_mask, &thread_mask);
> +
> +		prev_masks = rec->thread_masks;
> +		rec->thread_masks = realloc(rec->thread_masks,
> +					    (nr_threads + 1) * sizeof(struct thread_mask));
> +		if (!rec->thread_masks) {
> +			pr_err("Failed to allocate thread masks\n");
> +			rec->thread_masks = prev_masks;
> +			ret = -ENOMEM;
> +			goto out_free_full_mask;
> +		}
> +		rec->thread_masks[nr_threads] = thread_mask;
> +		if (verbose) {
> +			pr_debug("thread_masks[%d]: addr=", nr_threads);
> +			mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].maps, "maps");
> +			pr_debug("thread_masks[%d]: addr=", nr_threads);
> +			mmap_cpu_mask__scnprintf(&rec->thread_masks[nr_threads].affinity,
> +						 "affinity");
> +		}
> +		nr_threads++;
> +		ret = record__thread_mask_alloc(&thread_mask, cpu__max_cpu());
> +		if (ret)
> +			goto out_free_full_mask;
> +	}
> +

Here it can be checked that nr_threads is > 0; otherwise, print an error
message explaining the reason and return an error. nr_threads can be 0 if
the user inserts out-of-range CPUs in the spec, e.g. "--threads=15/17" on
a 4-CPU machine. (A sketch follows this message.)

> +	rec->nr_threads = nr_threads;
> +	pr_debug("threads: nr_threads=%d\n", rec->nr_threads);
> +
> +out_free_full_mask:
> +	record__thread_mask_free(&full_mask);
> +out_free_thread_mask:
> +	record__thread_mask_free(&thread_mask);
> +out_free_cpu_mask:
> +	record__mmap_cpu_mask_free(&cpus_mask);
> +out:
> +	return ret;
> +}
> +
> +static int record__init_thread_core_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> +	int ret;
> +	struct cpu_topology *topo;
> +
> +	topo = cpu_topology__new();
> +	if (!topo)
> +		return -EINVAL;
> +
> +	ret = record__init_thread_masks_spec(rec, cpus, topo->thread_siblings,
> +					     topo->thread_siblings, topo->thread_sib);
> +	cpu_topology__delete(topo);
> +
> +	return ret;
> +}
> +
> +static int record__init_thread_socket_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> +	int ret;
> +	struct cpu_topology *topo;
> +
> +	topo = cpu_topology__new();
> +	if (!topo)
> +		return -EINVAL;
> +
> +	ret = record__init_thread_masks_spec(rec, cpus, topo->core_siblings,
> +					     topo->core_siblings, topo->core_sib);
> +	cpu_topology__delete(topo);
> +
> +	return ret;
> +}
> +
> +static int record__init_thread_numa_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> +	u32 s;
> +	int ret;
> +	char **spec;
> +	struct numa_topology *topo;
> +
> +	topo = numa_topology__new();
> +	if (!topo)
> +		return -EINVAL;
> +	spec = zalloc(topo->nr * sizeof(char *));
> +	if (!spec) {
> +		ret = -ENOMEM;
> +		goto out_delete_topo;
> +	}
> +	for (s = 0; s < topo->nr; s++)
> +		spec[s] = topo->nodes[s].cpus;
> +
> +	ret = record__init_thread_masks_spec(rec, cpus, spec, spec, topo->nr);
> +
> +	zfree(&spec);
> +
> +out_delete_topo:
> +	numa_topology__delete(topo);
> +
> +	return ret;
> +}
> +
> +static int record__init_thread_user_masks(struct record *rec, struct perf_cpu_map *cpus)
> +{
> +	int t, ret;
> +	u32 s, nr_spec = 0;
> +	char **maps_spec = NULL, **affinity_spec = NULL, **prev_spec;
> +	char *spec, *spec_ptr, *user_spec, *mask, *mask_ptr;
> +
> +	for (t = 0, user_spec = (char *)rec->opts.threads_user_spec; ; t++, user_spec = NULL) {
> +		spec = strtok_r(user_spec, ":", &spec_ptr);
> +		if (spec == NULL)
> +			break;
> +		pr_debug(" spec[%d]: %s\n", t, spec);
> +		mask = strtok_r(spec, "/", &mask_ptr);
> +		if (mask == NULL)
> +			break;
> +		pr_debug("  maps mask: %s\n", mask);
> +		prev_spec = maps_spec;
> +		maps_spec = realloc(maps_spec, (nr_spec + 1) * sizeof(char *));
> +		if (!maps_spec) {
> +			pr_err("Failed to realloc maps_spec\n");
> +			maps_spec = prev_spec;
> +			ret = -ENOMEM;
> +			goto out_free_all_specs;
> +		}
> +		maps_spec[nr_spec] = strdup(mask);
> +		if (!maps_spec[nr_spec]) {
> +			pr_err("Failed to alloc maps_spec[%d]\n", nr_spec);
> +			ret = -ENOMEM;
> +			goto out_free_all_specs;
> +		}
> +		mask = strtok_r(NULL, "/", &mask_ptr);
> +		if (mask == NULL)

I think this should be a parse failure and return an error, not just
skip to the next one. Furthermore, maps_spec[nr_spec] should be freed
before breaking/exiting. (See the second sketch after this message.)

Thanks,
Riccardo

> +			break;
> +		pr_debug("  affinity mask: %s\n", mask);
> +		prev_spec = affinity_spec;
> +		affinity_spec = realloc(affinity_spec, (nr_spec + 1) * sizeof(char *));
> +		if (!affinity_spec) {
> +			pr_err("Failed to realloc affinity_spec\n");
> +			affinity_spec = prev_spec;
> +			free(maps_spec[nr_spec]);
> +			ret = -ENOMEM;
> +			goto out_free_all_specs;
> +		}
> +		affinity_spec[nr_spec] = strdup(mask);
> +		if (!affinity_spec[nr_spec]) {
> +			pr_err("Failed to alloc affinity_spec[%d]\n", nr_spec);
> +			free(maps_spec[nr_spec]);
> +			ret = -ENOMEM;
> +			goto out_free_all_specs;
> +		}
> +		nr_spec++;
> +	}
> +
> +	ret = record__init_thread_masks_spec(rec, cpus, maps_spec, affinity_spec, nr_spec);
> +
> +out_free_all_specs:
> +	for (s = 0; s < nr_spec; s++) {
> +		if (maps_spec)
> +			free(maps_spec[s]);
> +		if (affinity_spec)
> +			free(affinity_spec[s]);
> +	}
> +	free(affinity_spec);
> +	free(maps_spec);
> +
> +	return ret;
> +}
> +
>  static int record__init_thread_default_masks(struct record *rec, struct perf_cpu_map *cpus)
>  {
>  	int ret;
> @@ -3213,9 +3529,33 @@ static int record__init_thread_default_masks(struct record *rec, struct perf_cpu
>
>  static int record__init_thread_masks(struct record *rec)
>  {
> +	int ret = 0;
>  	struct perf_cpu_map *cpus = rec->evlist->core.cpus;
>
> -	return record__init_thread_default_masks(rec, cpus);
> +	if (!record__threads_enabled(rec))
> +		return record__init_thread_default_masks(rec, cpus);
> +
> +	switch (rec->opts.threads_spec) {
> +	case THREAD_SPEC__CPU:
> +		ret = record__init_thread_cpu_masks(rec, cpus);
> +		break;
> +	case THREAD_SPEC__CORE:
> +		ret = record__init_thread_core_masks(rec, cpus);
> +		break;
> +	case THREAD_SPEC__SOCKET:
> +		ret = record__init_thread_socket_masks(rec, cpus);
> +		break;
> +	case THREAD_SPEC__NUMA:
> +		ret = record__init_thread_numa_masks(rec, cpus);
> +		break;
> +	case THREAD_SPEC__USER:
> +		ret = record__init_thread_user_masks(rec, cpus);
> +		break;
> +	default:
> +		break;
> +	}
> +
> +	return ret;
> }
>
>  static int record__fini_thread_masks(struct record *rec)
> @@ -3466,7 +3806,10 @@ int cmd_record(int argc, const char **argv)
>
>  	err = record__init_thread_masks(rec);
>  	if (err) {
> -		pr_err("record__init_thread_masks failed, error %d\n", err);
> +		if (err > 0)
> +			pr_err("ERROR: parallel data streaming masks (--threads) intersect.\n");
> +		else
> +			pr_err("record__init_thread_masks failed, error %d\n", err);
>  		goto out;
>  	}
>
> diff --git a/tools/perf/util/record.h b/tools/perf/util/record.h
> index 4d68b7e27272..3da156498f47 100644
> --- a/tools/perf/util/record.h
> +++ b/tools/perf/util/record.h
> @@ -78,6 +78,7 @@ struct record_opts {
>  	int	      ctl_fd_ack;
>  	bool	      ctl_fd_close;
>  	int	      threads_spec;
> +	const char    *threads_user_spec;
>  };
>
>  extern const char * const *record_usage;
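Minimal sketches of the two checks suggested above (illustrative; the
error messages and the -EINVAL return values are placeholders, not taken
from any posted patch). First, in record__init_thread_masks_spec(), right
after the spec loop:

	/* no thread masks survived the cpus_mask filtering */
	if (!nr_threads) {
		pr_err("Failed to parse thread maps: no monitored CPUs left after filtering\n");
		ret = -EINVAL;
		goto out_free_full_mask;
	}

	rec->nr_threads = nr_threads;

Second, in record__init_thread_user_masks(), turning a missing affinity
mask into a hard parse error instead of a silent break:

		mask = strtok_r(NULL, "/", &mask_ptr);
		if (mask == NULL) {
			pr_err("Invalid thread spec: missing affinity mask\n");
			free(maps_spec[nr_spec]);	/* not yet covered by out_free_all_specs */
			ret = -EINVAL;
			goto out_free_all_specs;
		}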
* Re: [PATCH v6 16/20] perf session: Introduce decompressor into trace reader object

From: Riccardo Mancini @ 2021-06-03 23:22 UTC
To: Alexey Bayduraev
Cc: Jiri Olsa, Namhyung Kim, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi,

On Wed, 2021-05-26 at 13:53 +0300, Alexey Bayduraev wrote:
> Introduce decompressor into trace reader object so that decompression
> could be executed on per data file basis separately for every data
> file located in data directory.
>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> ---
>  tools/perf/util/session.c | 51 +++++++++++++++++++++++++++++----------
>  tools/perf/util/session.h |  1 +
>  2 files changed, 39 insertions(+), 13 deletions(-)
>
> diff --git a/tools/perf/util/session.c b/tools/perf/util/session.c
> index 335c073bae87..6c9a682eb291 100644
> --- a/tools/perf/util/session.c
> +++ b/tools/perf/util/session.c
> @@ -73,6 +73,9 @@ struct reader {
>  	u64		 data_offset;
>  	reader_cb_t	 process;
>  	bool		 in_place_update;
> +	struct zstd_data zstd_data;
> +	struct decomp	 *decomp;
> +	struct decomp	 *decomp_last;
>  	struct reader_state state;
>  };
>
> @@ -85,7 +88,10 @@ static int perf_session__process_compressed_event(struct perf_session *session,
>  	size_t decomp_size, src_size;
>  	u64 decomp_last_rem = 0;
>  	size_t mmap_len, decomp_len = session->header.env.comp_mmap_len;
> -	struct decomp *decomp, *decomp_last = session->decomp_last;
> +	struct decomp *decomp, *decomp_last = session->active_reader ?
> +		session->active_reader->decomp_last : session->decomp_last;
> +	struct zstd_data *zstd_data = session->active_reader ?
> +		&session->active_reader->zstd_data : &session->zstd_data;
>
>  	if (decomp_last) {
>  		decomp_last_rem = decomp_last->size - decomp_last->head;
> @@ -113,7 +119,7 @@ static int perf_session__process_compressed_event(struct perf_session *session,
>  	src = (void *)event + sizeof(struct perf_record_compressed);
>  	src_size = event->pack.header.size - sizeof(struct perf_record_compressed);
>
> -	decomp_size = zstd_decompress_stream(&(session->zstd_data), src, src_size,
> +	decomp_size = zstd_decompress_stream(zstd_data, src, src_size,
>  				&(decomp->data[decomp_last_rem]), decomp_len - decomp_last_rem);
>  	if (!decomp_size) {
>  		munmap(decomp, mmap_len);
> @@ -123,12 +129,22 @@ static int perf_session__process_compressed_event(struct perf_session *session,
>
>  	decomp->size += decomp_size;
>
> -	if (session->decomp == NULL) {
> -		session->decomp = decomp;
> -		session->decomp_last = decomp;
> +	if (session->active_reader) {
> +		if (session->active_reader->decomp == NULL) {
> +			session->active_reader->decomp = decomp;
> +			session->active_reader->decomp_last = decomp;
> +		} else {
> +			session->active_reader->decomp_last->next = decomp;
> +			session->active_reader->decomp_last = decomp;
> +		}
>  	} else {
> -		session->decomp_last->next = decomp;
> -		session->decomp_last = decomp;
> +		if (session->decomp == NULL) {
> +			session->decomp = decomp;
> +			session->decomp_last = decomp;
> +		} else {
> +			session->decomp_last->next = decomp;
> +			session->decomp_last = decomp;
> +		}
>  	}
>
>  	pr_debug("decomp (B): %zd to %zd\n", src_size, decomp_size);
> @@ -319,11 +335,10 @@ static void perf_session__delete_threads(struct perf_session *session)
>  	machine__delete_threads(&session->machines.host);
>  }
>
> -static void perf_session__release_decomp_events(struct perf_session *session)
> +static void perf_decomp__release_events(struct decomp *next)
>  {
> -	struct decomp *next, *decomp;
> +	struct decomp *decomp;
>  	size_t mmap_len;
> -	next = session->decomp;
>  	do {
>  		decomp = next;
>  		if (decomp == NULL)
> @@ -336,6 +351,8 @@ static void perf_session__release_decomp_events(struct perf_session *session)
>
>  void perf_session__delete(struct perf_session *session)
>  {
> +	int r;
> +
>  	if (session == NULL)
>  		return;
>  	auxtrace__free(session);
> @@ -343,10 +360,12 @@ void perf_session__delete(struct perf_session *session)
>  	perf_session__destroy_kernel_maps(session);
>  	perf_session__delete_threads(session);
>  	if (session->readers) {
> +		for (r = 0; r < session->nr_readers; r++)
> +			perf_decomp__release_events(session->readers[r].decomp);
>  		zfree(&session->readers);
>  		session->nr_readers = 0;
>  	}
> -	perf_session__release_decomp_events(session);
> +	perf_decomp__release_events(session->decomp);
>  	perf_env__exit(&session->header.env);
>  	machines__exit(&session->machines);
>  	if (session->data)
> @@ -2157,7 +2176,8 @@ static int __perf_session__process_decomp_events(struct perf_session *session)
>  {
>  	s64 skip;
>  	u64 size;
> -	struct decomp *decomp = session->decomp_last;
> +	struct decomp *decomp = session->active_reader ?
> +		session->active_reader->decomp_last : session->decomp_last;
>
>  	if (!decomp)
>  		return 0;
> @@ -2214,6 +2234,9 @@ reader__process_events(struct reader *rd, struct perf_session *session,
>
>  	memset(mmaps, 0, sizeof(st->mmaps));
>
> +	if (zstd_init(&rd->zstd_data, 0))
> +		return -1;
> +
>  	mmap_prot  = PROT_READ;
>  	mmap_flags = MAP_SHARED;

zstd_fini() is never called on zstd_data.

Thanks,
Riccardo

> @@ -2257,12 +2280,13 @@ reader__process_events(struct reader *rd, struct perf_session *session,
>  		goto remap;
>  	}
>
> +	session->active_reader = rd;
>  	size = event->header.size;
>
>  	skip = -EINVAL;
>
>  	if (size < sizeof(struct perf_event_header) ||
> -	    (skip = rd->process(session, event, st->file_pos, rd->path)) < 0) {
> +	    (skip = perf_session__process_event(session, event, st->file_pos, rd->path)) < 0) {
>  		pr_err("%#" PRIx64 " [%s] [%#x]: failed to process type: %d [%s]\n",
>  		       st->file_offset + st->head, rd->path, event->header.size,
>  		       event->header.type, strerror(-skip));
> @@ -2289,6 +2313,7 @@ reader__process_events(struct reader *rd, struct perf_session *session,
>  		goto more;
>
> out:
> +	session->active_reader = NULL;
>  	return err;
> }
>
> diff --git a/tools/perf/util/session.h b/tools/perf/util/session.h
> index 2815d00b5467..e0a8712f8770 100644
> --- a/tools/perf/util/session.h
> +++ b/tools/perf/util/session.h
> @@ -44,6 +44,7 @@ struct perf_session {
>  	struct decomp	*decomp_last;
>  	struct reader	*readers;
>  	int		nr_readers;
> +	struct reader	*active_reader;
>  };
>
>  struct decomp {
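A minimal sketch of one possible fix, pairing the zstd_init() above with
a zstd_fini() on the function's exit path (illustrative only; it assumes
the out label is the only exit reached after a successful zstd_init(),
and that no later user of rd->zstd_data outlives reader__process_events()):

out:
	session->active_reader = NULL;
	zstd_fini(&rd->zstd_data);	/* release the decompression context */
	return err;
}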
* Re: [PATCH v6 20/20] perf session: Load data directory files for analysis

From: Riccardo Mancini @ 2021-06-03 23:28 UTC
To: Alexey Bayduraev
Cc: Jiri Olsa, Namhyung Kim, Alexander Shishkin, Peter Zijlstra,
    Ingo Molnar, linux-kernel, Andi Kleen, Adrian Hunter,
    Alexander Antonov, Alexei Budankov, linux-perf-users, Ian Rogers,
    Arnaldo Carvalho de Melo

Hi,

On Wed, 2021-05-26 at 13:53 +0300, Alexey Bayduraev wrote:
> Load data directory files and provide basic raw dump and aggregated
> analysis support of data directories in report mode, still with no
> memory consumption optimizations.
>
> Design and implementation are based on the prototype [1], [2].
>
> [1] git clone https://git.kernel.org/pub/scm/linux/kernel/git/jolsa/perf.git -b perf/record_threads
> [2] https://lore.kernel.org/lkml/20180913125450.21342-1-jolsa@kernel.org/
>
> Suggested-by: Jiri Olsa <jolsa@kernel.org>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> ---
>  tools/perf/util/session.c | 129 ++++++++++++++++++++++++++++++++++++++
>  1 file changed, 129 insertions(+)
>
> diff --git a/tools/perf/util/session.c b/tools/perf/util/session.c
> index 041601810b85..dd4ef9749cd0 100644
> --- a/tools/perf/util/session.c
> +++ b/tools/perf/util/session.c
> @@ -65,6 +65,7 @@ struct reader_state {
>  	u64	data_size;
>  	u64	head;
>  	bool	eof;
> +	u64	size;
>  };
>
>  enum {
> @@ -2319,6 +2320,7 @@ reader__read_event(struct reader *rd, struct perf_session *session,
>  	if (skip)
>  		size += skip;
>
> +	st->size += size;
>  	st->head += size;
>  	st->file_pos += size;
>
> @@ -2418,6 +2420,130 @@ static int __perf_session__process_events(struct perf_session *session)
>  	return err;
> }
>
> +/*
> + * This function reads, merge and process directory data.
> + * It assumens the version 1 of directory data, where each
> + * data file holds per-cpu data, already sorted by kernel.
> + */
> +static int __perf_session__process_dir_events(struct perf_session *session)
> +{
> +	struct perf_data *data = session->data;
> +	struct perf_tool *tool = session->tool;
> +	int i, ret = 0, readers = 1;
> +	struct ui_progress prog;
> +	u64 total_size = perf_data__size(session->data);
> +	struct reader *rd;
> +
> +	perf_tool__fill_defaults(tool);
> +
> +	ui_progress__init_size(&prog, total_size, "Sorting events...");
> +
> +	for (i = 0; i < data->dir.nr; i++) {
> +		if (data->dir.files[i].size)
> +			readers++;
> +	}
> +
> +	rd = session->readers = zalloc(readers * sizeof(struct reader));
> +	if (!rd)
> +		return -ENOMEM;
> +	session->nr_readers = readers;
> +	readers = 0;
> +
> +	rd[readers] = (struct reader) {
> +		.fd		 = perf_data__fd(session->data),
> +		.path		 = session->data->file.path,
> +		.data_size	 = session->header.data_size,
> +		.data_offset	 = session->header.data_offset,
> +		.in_place_update = session->data->in_place_update,
> +	};
> +	ret = reader__init(&rd[readers], NULL);
> +	if (ret)
> +		goto out_err;
> +	ret = reader__mmap(&rd[readers], session);
> +	if (ret != READER_OK) {
> +		if (ret == READER_EOF)
> +			ret = -EINVAL;
> +		goto out_err;
> +	}
> +	readers++;
> +
> +	for (i = 0; i < data->dir.nr; i++) {
> +		if (data->dir.files[i].size) {
> +			rd[readers] = (struct reader) {
> +				.fd		 = data->dir.files[i].fd,
> +				.path		 = data->dir.files[i].path,
> +				.data_size	 = data->dir.files[i].size,
> +				.data_offset	 = 0,
> +				.in_place_update = session->data->in_place_update,
> +			};
> +			ret = reader__init(&rd[readers], NULL);

zstd_fini() is never called on rd[readers].zstd_data. Maybe it can be
done in perf_session__delete(). For example, we could add a new
reader__fini() function to do the cleanup of the zstd data and of the
decompressed events (perf_decomp__release_events).

Thanks,
Riccardo

> +			if (ret)
> +				goto out_err;
> +			ret = reader__mmap(&rd[readers], session);
> +			if (ret != READER_OK) {
> +				if (ret == READER_EOF)
> +					ret = -EINVAL;
> +				goto out_err;
> +			}
> +			readers++;
> +		}
> +	}
> +
> +	i = 0;
> +
> +	while ((ret >= 0) && readers) {
> +		if (session_done())
> +			return 0;
> +
> +		if (rd[i].state.eof) {
> +			i = (i + 1) % session->nr_readers;
> +			continue;
> +		}
> +
> +		ret = reader__read_event(&rd[i], session, &prog);
> +		if (ret < 0)
> +			break;
> +		if (ret == READER_EOF) {
> +			ret = reader__mmap(&rd[i], session);
> +			if (ret < 0)
> +				goto out_err;
> +			if (ret == READER_EOF)
> +				readers--;
> +		}
> +
> +		/*
> +		 * Processing 10MBs of data from each reader in sequence,
> +		 * because that's the way the ordered events sorting works
> +		 * most efficiently.
> +		 */
> +		if (rd[i].state.size >= 10*1024*1024) {
> +			rd[i].state.size = 0;
> +			i = (i + 1) % session->nr_readers;
> +		}
> +	}
> +
> +	ret = ordered_events__flush(&session->ordered_events, OE_FLUSH__FINAL);
> +	if (ret)
> +		goto out_err;
> +
> +	ret = perf_session__flush_thread_stacks(session);
> +out_err:
> +	ui_progress__finish();
> +
> +	if (!tool->no_warn)
> +		perf_session__warn_about_errors(session);
> +
> +	/*
> +	 * We may switching perf.data output, make ordered_events
> +	 * reusable.
> +	 */
> +	ordered_events__reinit(&session->ordered_events);
> +
> +	session->one_mmap = false;
> +
> +	return ret;
> +}
> +
>  int perf_session__process_events(struct perf_session *session)
>  {
>  	if (perf_session__register_idle_thread(session) < 0)
> @@ -2426,6 +2552,9 @@ int perf_session__process_events(struct perf_session *session)
>  	if (perf_data__is_pipe(session->data))
>  		return __perf_session__process_pipe_events(session);
>
> +	if (perf_data__is_dir(session->data))
> +		return __perf_session__process_dir_events(session);
> +
>  	return __perf_session__process_events(session);
> }