使用 ValueError 构建自定义联合平均过程:层顺序需要 1 个输入,但它收到 3 个输入张量 [英] Build Custom Federated averaging process with ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors

查看:31
本文介绍了使用 ValueError 构建自定义联合平均过程:层顺序需要 1 个输入,但它收到 3 个输入张量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从 csv 加载数据集并对可用数据执行一些联合学习.

i am trying to load a dataset from csv and perform some federated learning on the available data.

我设法从给定的 csv 文件加载联合数据集并加载训练和测试数据.

i manage to load a federated dataset from a given csv file and load both the train and the test data.

我现在的问题是如何重现一个工作示例来构建一个迭代过程,该过程对这些数据执行自定义联合平均.

My question now is how to reproduce a working example to build an iterative process that performs a custom federated averaging on this data.

这是我的代码,但它不起作用:

Here is my code but it's not working:

import collections
import os

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from absl import app
from tensorflow.keras import layers

from src.main import Parameters

global input_spec


def main(args):
    working_dir = "D:/User/Documents/GitHub/TriaBaseMLBackup/input/fakehdfs/nms/ystr=2016/ymstr=1/ymdstr=26"
    client_id_colname = 'counter'
    SHUFFLE_BUFFER = 1000
    NUM_EPOCHS = 1

    for root, dirs, files in os.walk(working_dir):
        file_list = []

        for filename in files:
            if filename.endswith('.csv'):
                file_list.append(os.path.join(root, filename))
        df_list = []
        for file in file_list:
            df = pd.read_csv(file, delimiter="|", usecols=[1, 2, 6, 7], header=None, na_values=["NIL"],
                             na_filter=True, names=["time", "meas_info", "counter", "value"])
            # df_list.append(df[["value"]])

        if df_list:
            rawdata = pd.concat(df_list)

    client_ids = df.get(client_id_colname)
    train_client_ids = client_ids.sample(frac=0.5).tolist()

    # test_client_ids = [x for x in client_ids if x not in train_client_ids]
    example_dataset = train_data.create_tf_dataset_for_client(
    train_data.client_ids[0]
     )
    def create_tf_dataset_for_client_fn(client_id):
        # a function which takes a client_id and returns a
        # tf.data.Dataset for that client
        # target = df.pop('value')
        client_data = df[df['value'] == client_id]
        print(df.head())
        features = ['time', 'meas_info', 'value']
        LABEL_COLUMN = 'counter'
        dataset = tf.data.Dataset.from_tensor_slices(
            (collections.OrderedDict(df[features].to_dict('list')),
             df[LABEL_COLUMN].to_list())
        )
        global input_spec
        input_spec = dataset.element_spec
        dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
        return dataset

    train_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=train_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )

    # split client id into train and test clients
    loss_builder = tf.keras.losses.SparseCategoricalCrossentropy
    metrics_builder = lambda: [tf.keras.metrics.SparseCategoricalAccuracy()]

    def retrieve_model():
        initializer = tf.keras.initializers.GlorotNormal(seed=0)
        model = tf.keras.models.Sequential([
            tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
            tf.keras.layers.Dense(256, activation=tf.nn.relu),
            tf.keras.layers.Activation(tf.nn.softmax),
        ])

        return model

    print(input_spec)

    def tff_model_fn() -> tff.learning.Model:
        return tff.learning.from_keras_model(
            keras_model=retrieve_model(),
            input_spec=example_dataset.element_spec,
            loss=loss_builder(),
            metrics=metrics_builder())

    iterative_process = tff.learning.build_federated_averaging_process(
        tff_model_fn, Parameters.server_adam_optimizer_fn, Parameters.client_adam_optimizer_fn)
    server_state = iterative_process.initialize()

    for round_num in range(Parameters.FLAGS.total_rounds):
        sampled_clients = np.random.choice(
            train_data.client_ids,
            size=Parameters.FLAGS.train_clients_per_round,
            replace=False)
        sampled_train_data = [
            train_data.create_tf_dataset_for_client(client)
            for client in sampled_clients
        ]
        server_state, metrics = iterative_process.next(server_state, sampled_train_data)
        train_metrics = metrics['train']
        print(metrics)


if __name__ == '__main__':
    app.run(main)


def start():
    app.run(main)

这是 input_spec 输出

Here is the input_spec output

(OrderedDict([('time', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('meas_info', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('value', TensorSpec(shape=(), dtype=tf.int64, name=None))]), TensorSpec(shape=(), dtype=tf.float32, name=None))

这是我得到的错误

ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. Inputs received: [<tf.Tensor 'batch_input:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_1:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_2:0' shape=() dtype=int64>]

谁能帮我找出问题所在?

Can anyone help me to figure out the problem?

推荐答案

如错误信息:ValueError:Layer Sequential 需要 1 个输入,但它收到了 3 个输入张量. 说,Keras 模型只定义了一个输入(列表中的第一层):

As the error message: ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. says, the Keras model is defined with only a single input (the first layer in the list):

model = tf.keras.models.Sequential([
  tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
  tf.keras.layers.Dense(256, activation=tf.nn.relu),
  tf.keras.layers.Activation(tf.nn.softmax),
])

尝试检查 model.input_spec 以查看模型希望作为输入提供的对象.

Try inspecting model.input_spec to see what objects the model is expecting to be fed as input.

>>> [InputSpec(shape=(None, None, 2), ndim=3)]

数据集定义了输入特征的 3 个张量的 OrderedDict:

Where as the dataset defines and OrderedDict of 3 tensors for input features:

features = ['time', 'meas_info', 'value']
LABEL_COLUMN = 'counter'
dataset = tf.data.Dataset.from_tensor_slices(
   (collections.OrderedDict(df[features].to_dict('list')),
   df[LABEL_COLUMN].to_list())
)

尝试检查 dataset.element_spec 的值以查看数据集将为模型提供哪些对象.

Try inspecting the value of dataset.element_spec to see what objects the dataset will feed the model.

要使它们兼容,需要更改模型定义或数据集.我将假设需要数据集中的三个特征,在这种情况下,我们想告诉 Keras 我们有来自 OrderedDict 的三个特征.我们需要使用 Keras 的函数模型 API.

To make them compatible will require either changing the model definition, or the dataset. I'll assume the three features in the dataset are desired, in which case we want to tell Keras we have three features from the OrderedDict. We'll need to use the Functional model API from Keras.

SEQUENCE_LENGTH = 5
input_dict =  {f: tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, 1), name=f) for f in features}
concatenated_inputs = tf.keras.layers.Concatenate()(input_dict.values())
lstm_output = tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True)(concatenated_inputs)
logits = tf.keras.layers.Dense(256, activation=tf.nn.relu)(lstm_output)
predictions = tf.keras.layers.Activation(tf.nn.softmax)(logits)
model = tf.keras.models.Model(inputs=input_dict, outputs=predictions

请注意,对于 LSTM 层,我需要提供额外的 SEQUENCE_LENGTH 变量和维度.shape=(SEQUENCE_LENGTH, 1) 需要修改以适应来自数据集的特征的形状.

Note that for the LSTM layer, I needed to provide and extra SEQUENCE_LENGTH variable and dimension. The shape=(SEQUENCE_LENGTH, 1) will need to be modified to fit the shape of the features coming out of the dataset.

要快速测试模型和数据集是否兼容(不使用所有其他机器),请确保以下内容不会引发错误:

To test if the model and dataset and compatible quickly (without all the other machinery), ensure that the following doesn't raise an error:

model(next(iter(dataset))[0])

这篇关于使用 ValueError 构建自定义联合平均过程:层顺序需要 1 个输入,但它收到 3 个输入张量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆